hene

hene.dev

(Last updated on )

dbt の macro で、BigQuery の ユーザー定義関数(UDF) を管理

dbt の macro で、BigQuery の ユーザー定義関数(UDF) を管理

背景

BigQuery で、エンコードされた URL の文字列をデコードしたい。

仕様

  • dbt でテーブルを作成する
  • URL をデコードしたい
  • デコードする処理を再利用したい

実装

  • URL をデコードする macro の作成
  • ユーザー定義関数(UDF) を作成する macro の作成

URL をデコードする macro

デコードする処理を SQL で行うのは難しそうなので、JavaScriptdecodeURIComponent()ユーザー定義関数(UDF) で参照する。 この処理を再利用できるように、dbtmacro で簡単に呼び出せるようにした。

macros/udf/udf_decode_uri_component.sql

引数で create_udftrue を渡したとき、ユーザー定義関数(UDF) を作成する。 decodeURIComponent() - JavaScript | MDN を利用して、URL をデコード。

create_udf の引数を渡さなかったときは、作成済みの ユーザー定義関数(UDF) を呼び出す。

#standardSQL

{% macro udf_decode_uri_component(uri, create_udf=false) -%}
  {% if create_udf -%}
    create or replace function `{{ target.project }}`.`{{ target.dataset }}_udfs`.`udf_decode_uri_component`(uri string)
    returns string
    language js as r"""
        try {
          return decodeURIComponent(uri);
        } catch(e) {
          return e;
        }
    """;
  {%- else -%}
    nullif(
      `{{ target.project }}`.`{{ target.dataset }}_udfs`.`udf_decode_uri_component`({{ uri }})
      , 'null'
    )
  {%- endif %}
{%- endmacro %}

macros/udf/udf_decode_uri_component.yml

udf_decode_uri_component のドキュメントを記載。

version: 2

macros:
  - name: udf_decode_uri_component
    description: >
      {{ doc('udf_decode_uri_component_description') }}
    arguments:
      - name: uri
        type: uri
        description: エンコードされた URI の構成要素

      - name: create_udf
        type: boolean
        description: dbt run-operation run_operation_create_udfs で、udf_decode_uri_component を作るか?

macros/udf/_udf.md

{% docs udf_decode_uri_component_description %}

# URI (Uniform Resource Identifier; 統一資源識別子) の構成要素をデコード

## 参照している関数

- [decodeURIComponent() - JavaScript | MDN](https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Global_Objects/decodeURIComponent)

{% enddocs %}

ユーザー定義関数(UDF) を作成する macro

dbt run-operationユーザー定義関数(UDF) を作成する macro を作成する。

dbt_project.yml

varsudfs に作成したい ユーザー定義関数(UDF)vars で管理する。

vars:
  # MEMO: `macros/udf/*` で管理している `ユーザー定義関数(UDF)` を `$ dbt run-operation run_operation_create_udfs` で作成
  udfs:
    - 'udf_decode_uri_component'

macros/run_operation/run_operation_create_udfs.sql

create_udf_dataset_query で、ユーザー定義関数(UDF) を管理するデータセットを作成。 作成したデータセットの中に、var('udfs') で指定した ユーザー定義関数(UDF) を作成する。

#standardSQL

{% macro run_operation_create_udfs() -%}
  -- データセットを作成
  {% set create_udf_dataset_query -%}
    create schema if not exists `{{ target.project }}`.`{{ target.dataset }}_udfs`
    options (
      description = 'udf を管理するデータセット',
      location = '{{ target.location }}'
    );
  {%- endset %}
  {%- do run_query(create_udf_dataset_query) -%}

  -- ユーザー定義関数(UDF) を作成
  {% for udf in var('udfs') -%}
    {%- do run_query(context[udf](create_udf=true)) -%}
  {% endfor %}s
{%- endmacro %}

jinja2 - How to dynamically call dbt macros using jinja? - Stack Overflow で、context[macro_name](params) のように macro を動的に呼び出せることを知りました。

macros/run_operation/run_operation_create_udfs.yml

run_operation_create_udfs のドキュメントを記載。

version: 2

macros:
  - name: run_operation_create_udfs
    description: >
      {{ doc('run_operation_create_udfs_description') }}

macros/run_operation/_run_operation.md

{% docs run_operation_create_udfs_description %}

# macros/udf/* で管理している udf を `$ dbt run-operation run_operation_create_udfs` で作成

## 運用方法

下記を実行して、`var('udfs')` で指定した `ユーザー定義関数(UDF)` を作成する。

```shell
$ dbt run-operation run_operation_create_udfs
```

{% enddocs %}

利用フロー

  • ユーザー定義関数(UDF) を作成
  • ユーザー定義関数(UDF) を参照したクエリを実行

ユーザー定義関数(UDF) を作成

$ dbt run-operation run_operation_create_udfs
HH:MM:SS  Running with dbt=1.5.3
HH:MM:SS  Unable to do partial parsing because a project config has changed
HH:MM:SS  Found 999 models, 9999 tests, 999 snapshots, 999 analyses, 999 macros, 0 operations, 99 seed files, 999 sources, 99 exposures, 99 metrics, 0 groups

ユーザー定義関数(UDF) を参照したクエリを実行

下記の decode_test.sql を実行して、

#standardSQL

with records as (
  select
    'https://example.com/%41' as valid_uri
    , 'https://example.com/%4' as invalid_uri
)

select
  valid_uri
  , {{ udf_decode_uri_component('valid_uri') }} as decode_valid_uri
  , invalid_uri
  , {{ udf_decode_uri_component('invalid_uri') }} as decode_invalid_uri
from records

テーブルを作成する。

$ dbt run -s decode_test
HH:MM:SS  Running with dbt=1.5.3
HH:MM:SS  Found 999 models, 9999 tests, 999 snapshots, 999 analyses, 999 macros, 0 operations, 99 seed files, 999 sources, 99 exposures, 99 metrics, 0 groups
HH:MM:SS
HH:MM:SS  Concurrency: 4 threads (target='dev')
HH:MM:SS
HH:MM:SS  1 of 1 START sql table model dataset_id.decode_test ........ [RUN]
HH:MM:SS  1 of 1 OK created sql table model dataset_id.decode_test ... [SCRIPT (0 processed) in 4.30s]
HH:MM:SS
HH:MM:SS  Finished running 1 table model in 0 hours 0 minutes and 12.99 seconds (12.99s).
HH:MM:SS
HH:MM:SS  Completed successfully
HH:MM:SS
HH:MM:SS  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

実行結果。

実行結果

他に試した方法

set_sql_header で、一時的な ユーザー定義関数(UDF) を作成。

#standardSQL

{% macro udf_decode_uri_component(uri) -%}
  {% call set_sql_header(config) %}
    create temp function udf_decode_uri_component(uri string)
    returns string
    language js as r"""
        try {
          return decodeURIComponent(uri);
        } catch(e) {
          return e;
        }
    """;
  {%- endcall %}

  nullif(
    udf_decode_uri_component({{ uri }})
    , 'null'
  )
{%- endmacro %}

$ dbt run を実行してみると、table だと問題なく実行できる。

$ dbt run -s decode_test
HH:MM:SS  Running with dbt=1.5.3
HH:MM:SS  Found 999 models, 9999 tests, 999 snapshots, 999 analyses, 999 macros, 0 operations, 99 seed files, 999 sources, 99 exposures, 99 metrics, 0 groups
HH:MM:SS
HH:MM:SS  Concurrency: 4 threads (target='dev')
HH:MM:SS
HH:MM:SS  1 of 1 START sql table model dataset_id.decode_test ........ [RUN]
HH:MM:SS  1 of 1 OK created sql table model dataset_id.decode_test ... [CREATE TABLE (1.0 rows, 0 processed) in 3.40s]
HH:MM:SS
HH:MM:SS  Finished running 1 table model in 0 hours 0 minutes and 10.30 seconds (10.30s).
HH:MM:SS
HH:MM:SS  Completed successfully
HH:MM:SS
HH:MM:SS  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

view だと

#standardSQL

{{
  config(
    materialized='view'
  )
}}

-- 略

Creating views with temporary user-defined functions is not supported エラーが発生する。

$ dbt run -s decode_test
HH:MM:SS  Running with dbt=1.5.3
HH:MM:SS  Found 999 models, 9999 tests, 999 snapshots, 999 analyses, 999 macros, 0 operations, 99 seed files, 999 sources, 99 exposures, 99 metrics, 0 groups
HH:MM:SS
HH:MM:SS  Concurrency: 4 threads (target='dev')
HH:MM:SS
HH:MM:SS  1 of 1 START sql view model dataset_id.decode_test ......... [RUN]
HH:MM:SS  BigQuery adapter: https://console.cloud.google.com/bigquery?project=project_id&j=bq:asia-northeast1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx&page=queryresults
HH:MM:SS  1 of 1 ERROR creating sql view model dataset_id.decode_test  [ERROR in 3.30s]
HH:MM:SS
HH:MM:SS  Finished running 1 view model in 0 hours 0 minutes and 10.50 seconds (10.50s).
HH:MM:SS
HH:MM:SS  Completed with 1 error and 0 warnings:
HH:MM:SS
HH:MM:SS  Database Error in model decode_test (models/tests/decode_test.sql)
HH:MM:SS    Creating views with temporary user-defined functions is not supported
HH:MM:SS    compiled Code at target/run/dbt_project_name/models/tests/decode_test.sql
HH:MM:SS
HH:MM:SS  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

上記を試したときに、table を作って、view を作成しようとすると既にテーブルが存在するのでエラーになった。 $ dbt run -s decode_test --full-refresh するか、BigQuery 側でテーブルを削除すれば実行できる。

$ dbt run -s decode_test
HH:MM:SS  Running with dbt=1.5.3
HH:MM:SS  Found 999 models, 9999 tests, 999 snapshots, 999 analyses, 999 macros, 0 operations, 99 seed files, 999 sources, 99 exposures, 99 metrics, 0 groups
HH:MM:SS
HH:MM:SS  Concurrency: 4 threads (target='dev')
HH:MM:SS
HH:MM:SS  1 of 1 START sql view model dataset_id.decode_test ......... [RUN]
HH:MM:SS  1 of 1 ERROR creating sql view model dataset_id.decode_test  [ERROR in 0.31s]
HH:MM:SS
HH:MM:SS  Finished running 1 view model in 0 hours 0 minutes and 7.84 seconds (7.84s).
HH:MM:SS
HH:MM:SS  Completed with 1 error and 0 warnings:
HH:MM:SS
HH:MM:SS  Compilation Error in model decode_test (models/tests/decode_test.sql)
HH:MM:SS    Trying to create view `project_id`.`dataset_id`.`decode_test`, but it currently exists as a table. Either drop `project_id`.`dataset_id`.`decode_test` manually, or run dbt with `--full-refresh` and dbt will drop it for you.
HH:MM:SS
HH:MM:SS    > in macro bigquery__handle_existing_table (macros/materializations/view.sql)
HH:MM:SS    > called by macro handle_existing_table (macros/materializations/models/view/helpers.sql)
HH:MM:SS    > called by macro create_or_replace_view (macros/materializations/models/view/create_or_replace_view.sql)
HH:MM:SS    > called by macro materialization_view_bigquery (macros/materializations/view.sql)
HH:MM:SS    > called by model decode_test (models/tests/decode_test.sql)
HH:MM:SS
HH:MM:SS  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

まとめ

dbtmacro で、ユーザー定義関数(UDF) を管理する事ができた。

参考

関連記事