hene

hene.dev

BigQuery へのインポート時に利用する SCHEMA_FILE を生成

BigQuery へのインポート時に利用する SCHEMA_FILE を生成

RDS のデータを BigQuery にインポートしている。

今回やること

BigQuery インポート時に bq load [FLAGS] DESTINATION_TABLE SOURCE_DATA [SCHEMA] を利用する。 上記コマンドの [SCHEMA] で指定する SCHEMA_FILE を生成する。

bq load [FLAGS] DESTINATION_TABLE SOURCE_DATA [SCHEMA]

SCHEMA DESTINATION_TABLE のスキーマ。

bq コマンドライン ツール リファレンス | BigQuery | Google Cloud

SCHEMA_FILE を生成

MySQL Server に接続する場合

$ ./generate_bigquery_schemas.sh "mysql -u ${USERNAME} -p${DB_PASSWORD}" $DATASET_NAME

Amazon Aurora MySQL に接続する場合

$ ./generate_bigquery_schemas.sh "mysql -u ${USERNAME} -p${DB_PASSWORD} -h ${RDS_HOST_NAME} -P ${DB_PORT}" $DATASET_NAME

生成した SCHEMA_FILE

articles.json

[
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "title",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "created_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "updated_at",
    "type": "TIMESTAMP"
  }
]

実装

MySQL で管理している全テーブルの名前とプライマリーキーを CSV に出力 - hene.dev で生成した、./tablenames/${DATASET_NAME}_tablenames.csv を利用する。

generate_bigquery_schemas.sh

#!/bin/bash -xe

MYSQL=$1
DATASET_NAME=$2

rm -rf /tmp/schemas
mkdir -p /tmp/schemas/${DATASET_NAME}
chmod 777 /tmp/schemas/${DATASET_NAME}

mkdir -p ./schemas/${DATASET_NAME}

touch ./tablenames/${DATASET_NAME}_tablenames.csv

# インポートする DATASET_NAME の schema を追加・更新
cut -d , -f 1 ./tablenames/${DATASET_NAME}_tablenames.csv | while read table_name; do
    # /tmp/schemas/${DATASET_NAME}/${table_name}.json を生成
    cat ./sql/generate_bigquery_schema.template.sql | \
        DATASET_NAME=$DATASET_NAME \
        TABLE_NAME=$table_name envsubst | \
        $MYSQL -B -r -N > "/tmp/schemas/${DATASET_NAME}/${table_name}.json"

    # /tmp/schemas/${DATASET_NAME}/${table_name}.json を整形して、./schemas/${DATASET_NAME}/${table_name}.json に反映
    cat /tmp/schemas/${DATASET_NAME}/${table_name}.json | jq . > ./schemas/${DATASET_NAME}/${table_name}.json
done

generate_bigquery_schema.template.sql

SET SESSION group_concat_max_len = 1000000;

SELECT
  CONCAT(
    '['
    , GROUP_CONCAT(
        JSON_OBJECT(
          -- mode
          'mode', 'NULLABLE'
          -- name
          , 'name', COLUMN_NAME
          -- type
          , 'type'
          , CASE
              -- WHEN COLUMN_TYPE = 'tinyint(1)' THEN 'BOOLEAN'
              -- * Rails の BOOLEAN は、INTEGER として扱う
              --   * (Rails の BOOLEAN) == (COLUMN_TYPE = 'tinyint(1)' => 0, 1 のみ) だが、MySQL の tinyint(1) には、0~255 の値を入れられる
              WHEN DATA_TYPE IN ('char', 'varchar', 'tinytext', 'text', 'mediumtext', 'longtext', 'blob', 'mediumblob', 'longblob', 'enum', 'set') THEN 'STRING'
              -- 'BYTES' => 利用していない
              WHEN DATA_TYPE IN ('tinyint', 'smallint', 'int', 'bigint') THEN 'INTEGER'
              WHEN DATA_TYPE IN ('float', 'double') THEN 'FLOAT64'
              WHEN DATA_TYPE IN ('decimal') THEN 'NUMERIC'
              -- 'BIGNUMERIC' => 利用していない('NUMERIC' で対応できるはず)
              WHEN DATA_TYPE IN ('datetime', 'timestamp') THEN 'TIMESTAMP'
              -- * 'DATETIME' => DATETIME と TIMESTAMP があると、比較しづらいため TIMESTAMP でインポート
              WHEN DATA_TYPE IN ('date') THEN 'DATE'
              WHEN DATA_TYPE IN ('time') THEN 'TIME'
              -- 'GEOGRAPHY' => 利用していない
              -- 'RECORD' => 利用していない
              -- MEMO: 該当の DATA_TYPE がなければ、インポート時にエラーを発生させる
              ELSE '========== NONE =========='
            END
        )
        ORDER BY
          ORDINAL_POSITION ASC
      )
    , ']'
  )
FROM INFORMATION_SCHEMA.COLUMNS
WHERE
  TABLE_SCHEMA = '${DATASET_NAME}'
  AND TABLE_NAME = '${TABLE_NAME}';

参考

関連記事