Google BigQuery Export Integrationの詳細はこちら。
Google BigQuery統合により、BigQueryテーブルまたはクエリ結果からTreasure Dataへデータをインポートできます。
インポートとエクスポートの両方で同じBigQuery接続を使用できますが、OAuthで認証された接続はエクスポートには使用できません。
- Treasure Dataの基本知識
- Google Cloud Platform(BigQuery、Cloud Storage、IAM)の基本知識
- このコネクタではOAuthはサポートされなくなりました。JSON keyfileのみがサポートされています。
- データセットが米国またはEUのマルチリージョン以外の場所にある場合は、場所を指定する必要があります。指定しない場合、TD内のジョブが「Cannot find job_id xxxxx」というエラーで失敗します。
- このコネクタは外部テーブルのインポートをサポートしていません。
このデータコネクタを使用するには、承認されたアカウント(サービスアカウント)が以下の権限またはIAMロールを持っている必要があります。
| カテゴリ | 必要な権限 | 最小限のIAMロール |
|---|---|---|
| テーブルロードを使用する場合 | - bigquery.tables.get - bigquery.tables.getData | - BigQuery Data Viewer |
| クエリロードを使用する場合 | - bigquery.jobs.create | - BigQuery Job User |
| 「大規模データセットのインポート」を使用する場合 | - bigquery.tables.export - bigquery.tables.delete - storage.buckets.get - storage.objects.list - storage.objects.create - storage.objects.delete - storage.objects.get | - BigQuery Data Editor - Storage Legacy Bucket Writer - Storage Legacy Object Reader |
IAMの権限とロールの詳細については、Google Cloudのドキュメントを参照してください:BigQueryおよびCloud Storage。
- Integrations Hub > Catalogに移動します。
- Google BigQueryを検索して選択します。
- ダイアログが開くので、認証モードを選択します。JSON keyfileのみがサポートされています。

- サービスアカウントキーのJSON文字列を「JSON keyfile」セクションに入力します。
- 新しいサービスアカウントキーを作成する方法については、Google Cloudのドキュメントを参照してください。
SQLの結果をロードする場合は、「Query statement」を選択し、「SQL statement」にSQLクエリを入力します。 転送を作成する前に、BigQuery Web UIでクエリが有効であることを確認してください。https://cloud.google.com/bigquery/quickstart-web-ui
接続を作成すると、自動的にAuthenticationsタブに移動します。作成した接続を探し、Sourceを選択します。
インポートするデータソースを設定します。
Google Cloud PlatformプロジェクトのIDを「Project ID」に入力します。
インポートのタイプを選択します。テーブル全体をロードする(table loading)か、SQLの結果をロードする(query loading)かを選択できます。
- Table Loading
テーブル全体をロードする場合は、「Table」を選択し、エクスポートしたい「Dataset name」と「Table name」を入力します。
マテリアライズドビューをロードする場合は、代わりに「Query Statement」を選択してください。

- Query Loading
SQLの結果をロードする場合は、「Query statement」を選択し、「SQL statement」にSQLクエリを入力します。

デフォルトのSQL方言はStandard SQLです。Legacy SQLを使用する場合は、Use Legacy SQLにチェックを入れます。
デフォルトでは、このコネクタは特定の条件下でキャッシュされた結果を使用します。キャッシュを無効にする場合は、Use Cached Resultsのチェックを外します。
データが米国またはEUのマルチリージョン以外の場所にある場合は、場所を指定する必要があります。
データがasia-northeast1リージョンにある場合は、場所を指定する必要があります。
場所の詳細については、Google Cloudのドキュメントを参照してください。
Incremental Loadingは、自動増分IDカラムや作成日のタイムスタンプカラムなど、増加する一意のカラムを使用して、最後の実行以降の新しいレコードのみをロードできます。
これを有効にするには、Incremental Loadingにチェックを入れ、増分するカラム名を「Incremental Column Names」に指定します。 増分カラムとしてサポートされるのは、数値型(INTEGERおよびFLOAT)とTIMESTAMP型のみです。

このコネクタは、増分カラムで順序付けられた最新のレコードである「last record」を記録します。次回の実行時には、last recordを使用して以下のルールに従って構築されたクエリを実行してレコードをロードします:
table loadingの場合、すべてのフィールドがWHERE句で選択されます。
SELECT
*
FROM
`${dataset}.${table}`
WHERE
${incremental_column} > ${value_of_last_record}query loadingの場合、元のクエリがWHERE句でラップされます。
SELECT
*
FROM
(${query}) embulk_incremental_
WHERE
${incremental_column} > ${value_of_last_record}複数の増分カラムがある場合(例えば、c1、c2、c3)、WHERE句は以下のようになります:
WHERE
(c1 > 1)
OR
(c1 = 1 AND c2 > 2)
OR
(c1 = 1 AND c2 = 2 AND c3 > 3)大規模データセット(目安として500MB以上)を読み込む場合は、この「大規模データセットのインポート」オプションを使用することをお勧めします。このオプションは、データをGCS(Google Cloud Storage)オブジェクトとしてエクスポートし、複数のタスクでデータを読み込みます。そのため、読み込みが高速化されます。
このオプションを有効にするには、Import Large Datasetをチェックしてから、「Temp dataset」、「Temp table」、「GCS bucket」、「GCS path prefix」を指定します。「Temp dataset」は事前に手動で作成する必要があります。

- クエリを実行する場合(クエリ読み込みまたは増分読み込みを伴うテーブル読み込み)、クエリ結果は一時的なBigQueryテーブル「temp.temp_table」にエクスポートされます。
- 次に、一時テーブルは「gs://my-bucket/data-connector/result-[12桁の数字].jsonl.gz」として、gzip圧縮されたJSON Linesファイルにエクスポートされます。ファイル数は結果データのサイズによって異なります。
- 増分読み込みなしのテーブル読み込みの場合、ソーステーブルのすべてのデータがGCSに直接エクスポートされます。
- 完了後、一時テーブルとGCSオブジェクトは削除されます。
一時テーブルは、クエリ対象のテーブルと同じロケーションに配置する必要があります。詳細については、Google Cloudの一時テーブルと永続テーブルのドキュメントを参照してください。 また、データセットが「US」に設定されていない限り、GCSバケットもテーブルと同じロケーションに配置する必要があります。米国ベースのデータセットから別のリージョンのCloud Storageバケットにデータをエクスポートできます。詳細については、Google Cloudのエクスポート制限のドキュメントを参照してください。
インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。
- Next を選択します。Data Preview ページが開きます。
- データをプレビューする場合は、Generate Preview を選択します。
- データを確認します。
データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。
Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。
Database を選択 > Select an existing または Create New Database を選択します。
オプションで、database 名を入力します。
Table を選択 > Select an existing または Create New Table を選択します。
オプションで、table 名を入力します。
データをインポートする方法を選択します。
- Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
- Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
- Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。
データストレージの Timezone を選択します。
Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。
- Off を選択します。
- Scheduling Timezone を選択します。
- Create & Run Now を選択します。
- On を選択します。
- Schedule を選択します。UI では、@hourly、@daily、@monthly、またはカスタム cron の 4 つのオプションが提供されます。
- Delay Transfer を選択して、実行時間の遅延を追加することもできます。
- Scheduling Timezone を選択します。
- Create & Run Now を選択します。
転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。
BigQueryのデータ型は、次の表に示すように、対応するTreasure Dataの型に自動的に変換されます。テーブルまたはクエリ結果のスキーマにサポートされていない型を含めると、エラーが発生します。
| BigQuery | Treasure Data |
|---|---|
| STRING | string |
| BYTES | サポートされていません |
| INTEGER | long |
| FLOAT | double |
| NUMERIC | サポートされていません |
| BOOLEAN | long(trueは1、falseは0) |
| TIMESTAMP | string(yyyy-MM-dd HH:mm:ss.SSS) |
| DATE | サポートされていません |
| TIME | サポートされていません |
| DATETIME | サポートされていません |
| RECORD | string(JSONとして) |
| REPEATED (PRIMITIVE or RECORD) | string(JSONとして) |
Data Connector(入力)とResult Output(出力)の両方に同じBigQuery接続を使用できますが、現在、OAuthで認証された接続を出力に使用することはできません。
BigQueryとCloud Storageのすべてのクォータと制限がGCPプロジェクトに適用されます。
必要に応じて、TD Toolbeltを介してコネクタを使用できます。
CLIでTD Toolbeltをセットアップします。
ここでは「config.yml」と呼ばれる設定YAMLファイルを作成します。
in:
type: bigquery
project_id: my-project
auth_method: json_key
json_keyfile:
content: |
{
"type": "service_account",
"project_id": "xxxxxx",
...
}
import_type: table
dataset: my_dataset
table: my_table
incremental: true
incremental_columns: [id]
export_to_gcs: true
temp_dataset: temp
temp_table: temp_table
gcs_bucket: my-bucket
gcs_path_prefix: data-connector/result-
out:
type: td「auth_method: json_key」を指定し、サービスアカウントキーのJSON内容を「json_keyfile**.**content」に配置します。
auth_method: json_key
json_keyfile:
content: |
{
"type": "service_account",
"project_id": "xxxxxx",
...
}OAuth 2アプリケーションで承認されたアカウントを使用する場合は、「auth_method: oauth2」、「client_id」、「client_secret」、「refresh_token」を指定します。
auth_method: oauth2
client_id: 000000000000-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.apps.googleusercontent.com
client_secret: yyyyyyyyyyyyyyyyyyyyyyyy
refresh_token: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzテーブル読み込みの場合、「import_type: table」、「dataset」、「table」を指定します。
クエリ読み込みの場合、「import_type: query」と「query」を指定します
import_type: query
query: |-
SELECT
id, first_name, last_name, created_at
FROM
my_dataset.my_table
WHERE first_name = "Treasure"オプションで「query_option」を指定できます。「use_leagacy_sql」はデフォルトでfalse、「use_query_cache」はデフォルトでtrueです。
query: SELECT ...
query_option:
use_legacy_sql: false
use_query_cache: true必要に応じて、「location」でロケーションを指定できます
location: asia-northeast1増分読み込みを有効にするには、「incremental: true」と「incremental_columns」を指定します
incremental: true
incremental_columns: [id]有効にするには、「export_to_gcs: true」を指定し、「temp_dataset」、「temp_table」、「gcs_bucket」、「gcs_path_prefix」を追加します
export_to_gcs: true
temp_dataset: temp
temp_table: temp_table
gcs_bucket: my-bucket
gcs_path_prefix: data-connector/result-td connector:preview コマンドを実行して、設定ファイルを検証します
$ td connector:preview config.yml
+---------+-------------------+------------------+-------------------------------+
| id:long | first_name:string | last_name:string | created_at:timestamp |
+---------+-------------------+------------------+-------------------------------+
| 1 | "Treasure" | "Data" | "2018-05-21 12:00:00.111 UTC" |
+---------+-------------------+------------------+-------------------------------+
1 row in set
Update config.yml and use 'td connector:preview config.yml' to preview again.
Use 'td connector:issue config.yml' to run Server-side bulk load.td connector:create を実行します。
以下の例では、BigQueryコネクタによる日次インポートセッションが作成されます。
$ td connector:create daily_bigquery_import \
"10 0 * * *" td_sample_db td_sample_table config.yml
Name : daily_bigquery_import
Cron : 10 0 * * *
Timezone : UTC
Delay : 0
Database : td_sample_db
Table : td_sample_table
Config
---
in:
...コネクタセッションでは、データパーティションキーとして使用するために、結果データに少なくとも1つのタイムスタンプカラムが必要です。デフォルトでは、最初のタイムスタンプカラムがキーとして選択されます。明示的にカラムを指定する場合は、「--time-column」オプションを使用します。
$ td connector:create --time-column created_at \
daily_bigquery_import ...結果データにタイムスタンプカラムがない場合は、以下のようにフィルタ設定を追加して「time」カラムを追加します。
in:
type: bigquery
...
filters:
- type: add_time
from_value:
mode: upload_time
to_column:
name: time
out:
type: td