Skip to content
Last updated

Google Bigquery Import Integration

Google BigQuery Export Integrationの詳細はこちら

Google BigQuery統合により、BigQueryテーブルまたはクエリ結果からTreasure Dataへデータをインポートできます。

インポートとエクスポートの両方で同じBigQuery接続を使用できますが、OAuthで認証された接続はエクスポートには使用できません。

前提条件

  • Treasure Dataの基本知識
  • Google Cloud Platform(BigQuery、Cloud Storage、IAM)の基本知識

制限事項

  • このコネクタではOAuthはサポートされなくなりました。JSON keyfileのみがサポートされています。
  • データセットが米国またはEUのマルチリージョン以外の場所にある場合は、場所を指定する必要があります。指定しない場合、TD内のジョブが「Cannot find job_id xxxxx」というエラーで失敗します。
  • このコネクタは外部テーブルのインポートをサポートしていません。

権限とロール

このデータコネクタを使用するには、承認されたアカウント(サービスアカウント)が以下の権限またはIAMロールを持っている必要があります。

カテゴリ必要な権限最小限のIAMロール
テーブルロードを使用する場合- bigquery.tables.get - bigquery.tables.getData- BigQuery Data Viewer
クエリロードを使用する場合- bigquery.jobs.create- BigQuery Job User
「大規模データセットのインポート」を使用する場合- bigquery.tables.export - bigquery.tables.delete - storage.buckets.get - storage.objects.list - storage.objects.create - storage.objects.delete - storage.objects.get- BigQuery Data Editor - Storage Legacy Bucket Writer - Storage Legacy Object Reader

IAMの権限とロールの詳細については、Google Cloudのドキュメントを参照してください:BigQueryおよびCloud Storage

TD Consoleを使用する

新しい認証の作成

  1. Integrations Hub > Catalogに移動します。
  2. Google BigQueryを検索して選択します。
  3. ダイアログが開くので、認証モードを選択します。JSON keyfileのみがサポートされています。
  4. サービスアカウントキーのJSON文字列を「JSON keyfile」セクションに入力します。
  5. 新しいサービスアカウントキーを作成する方法については、Google Cloudのドキュメントを参照してください。

SQLの結果をロードする場合は、「Query statement」を選択し、「SQL statement」にSQLクエリを入力します。 転送を作成する前に、BigQuery Web UIでクエリが有効であることを確認してください。https://cloud.google.com/bigquery/quickstart-web-ui

新しい転送の作成

接続を作成すると、自動的にAuthenticationsタブに移動します。作成した接続を探し、Sourceを選択します。

Fetch from

インポートするデータソースを設定します。

Google Cloud PlatformプロジェクトのIDを「Project ID」に入力します。

インポートタイプ

インポートのタイプを選択します。テーブル全体をロードする(table loading)か、SQLの結果をロードする(query loading)かを選択できます。

  • Table Loading

テーブル全体をロードする場合は、「Table」を選択し、エクスポートしたい「Dataset name」と「Table name」を入力します。

マテリアライズドビューをロードする場合は、代わりに「Query Statement」を選択してください。

  • Query Loading

SQLの結果をロードする場合は、「Query statement」を選択し、「SQL statement」にSQLクエリを入力します。

デフォルトのSQL方言はStandard SQLです。Legacy SQLを使用する場合は、Use Legacy SQLにチェックを入れます。

デフォルトでは、このコネクタは特定の条件下でキャッシュされた結果を使用します。キャッシュを無効にする場合は、Use Cached Resultsのチェックを外します。

Data Location

データが米国またはEUのマルチリージョン以外の場所にある場合は、場所を指定する必要があります。

データがasia-northeast1リージョンにある場合は、場所を指定する必要があります。

場所の詳細については、Google Cloudのドキュメントを参照してください。

Incremental Loading

Incremental Loadingは、自動増分IDカラムや作成日のタイムスタンプカラムなど、増加する一意のカラムを使用して、最後の実行以降の新しいレコードのみをロードできます。

これを有効にするには、Incremental Loadingにチェックを入れ、増分するカラム名を「Incremental Column Names」に指定します。 増分カラムとしてサポートされるのは、数値型(INTEGERおよびFLOAT)とTIMESTAMP型のみです。

仕組み

このコネクタは、増分カラムで順序付けられた最新のレコードである「last record」を記録します。次回の実行時には、last recordを使用して以下のルールに従って構築されたクエリを実行してレコードをロードします:

table loadingの場合、すべてのフィールドがWHERE句で選択されます。

SELECT
  *
FROM
  `${dataset}.${table}`
WHERE
  ${incremental_column} > ${value_of_last_record}

query loadingの場合、元のクエリがWHERE句でラップされます。

SELECT
  *
FROM
  (${query}) embulk_incremental_
WHERE
  ${incremental_column} > ${value_of_last_record}

複数の増分カラムがある場合(例えば、c1、c2、c3)、WHERE句は以下のようになります:

WHERE
  (c1 > 1)
OR
  (c1 = 1 AND c2 > 2)
OR
  (c1 = 1 AND c2 = 2 AND c3 > 3)

大規模データセットのインポート

大規模データセット(目安として500MB以上)を読み込む場合は、この「大規模データセットのインポート」オプションを使用することをお勧めします。このオプションは、データをGCS(Google Cloud Storage)オブジェクトとしてエクスポートし、複数のタスクでデータを読み込みます。そのため、読み込みが高速化されます。

このオプションを有効にするには、Import Large Datasetをチェックしてから、「Temp dataset」、「Temp table」、「GCS bucket」、「GCS path prefix」を指定します。「Temp dataset」は事前に手動で作成する必要があります。

  • クエリを実行する場合(クエリ読み込みまたは増分読み込みを伴うテーブル読み込み)、クエリ結果は一時的なBigQueryテーブル「temp.temp_table」にエクスポートされます。
  • 次に、一時テーブルは「gs://my-bucket/data-connector/result-[12桁の数字].jsonl.gz」として、gzip圧縮されたJSON Linesファイルにエクスポートされます。ファイル数は結果データのサイズによって異なります。
  • 増分読み込みなしのテーブル読み込みの場合、ソーステーブルのすべてのデータがGCSに直接エクスポートされます。
  • 完了後、一時テーブルとGCSオブジェクトは削除されます。

一時テーブルは、クエリ対象のテーブルと同じロケーションに配置する必要があります。詳細については、Google Cloudの一時テーブルと永続テーブルのドキュメントを参照してください。 また、データセットが「US」に設定されていない限り、GCSバケットもテーブルと同じロケーションに配置する必要があります。米国ベースのデータセットから別のリージョンのCloud Storageバケットにデータをエクスポートできます。詳細については、Google Cloudのエクスポート制限のドキュメントを参照してください。

Data Preview

インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

  1. Next を選択します。Data Preview ページが開きます。
  2. データをプレビューする場合は、Generate Preview を選択します。
  3. データを確認します。

Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

  1. Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。

  2. Database を選択 > Select an existing または Create New Database を選択します。

  3. オプションで、database 名を入力します。

  4. Table を選択 > Select an existing または Create New Table を選択します。

  5. オプションで、table 名を入力します。

  6. データをインポートする方法を選択します。

    • Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
    • Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
    • Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
  7. Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。

  8. データストレージの Timezone を選択します。

  9. Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。

一度だけ実行

  1. Off を選択します。
  2. Scheduling Timezone を選択します。
  3. Create & Run Now を選択します。

定期的に繰り返す

  1. On を選択します。
  2. Schedule を選択します。UI では、@hourly@daily@monthly、またはカスタム cron の 4 つのオプションが提供されます。
  3. Delay Transfer を選択して、実行時間の遅延を追加することもできます。
  4. Scheduling Timezone を選択します。
  5. Create & Run Now を選択します。

転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。

データ変換

BigQueryのデータ型は、次の表に示すように、対応するTreasure Dataの型に自動的に変換されます。テーブルまたはクエリ結果のスキーマにサポートされていない型を含めると、エラーが発生します。

BigQueryTreasure Data
STRINGstring
BYTESサポートされていません
INTEGERlong
FLOATdouble
NUMERICサポートされていません
BOOLEANlong(trueは1、falseは0)
TIMESTAMPstring(yyyy-MM-dd HH:mm:ss.SSS)
DATEサポートされていません
TIMEサポートされていません
DATETIMEサポートされていません
RECORDstring(JSONとして)
REPEATED (PRIMITIVE or RECORD)string(JSONとして)

入力と出力の接続の使用

Data Connector(入力)とResult Output(出力)の両方に同じBigQuery接続を使用できますが、現在、OAuthで認証された接続を出力に使用することはできません。

クォータと制限

BigQueryとCloud Storageのすべてのクォータと制限がGCPプロジェクトに適用されます。

CLIを使用したBigQuery Connectorの使用

必要に応じて、TD Toolbeltを介してコネクタを使用できます。

CLIでTD Toolbeltをセットアップします。

設定ファイルの作成

ここでは「config.yml」と呼ばれる設定YAMLファイルを作成します。

例(config.yml)

in:
  type: bigquery
  project_id: my-project
  auth_method: json_key
  json_keyfile:
    content: |
      {
        "type": "service_account",
        "project_id": "xxxxxx",
        ...
       }
  import_type: table
  dataset: my_dataset
  table: my_table
  incremental: true
  incremental_columns: [id]
  export_to_gcs: true
  temp_dataset: temp
  temp_table: temp_table
  gcs_bucket: my-bucket
  gcs_path_prefix: data-connector/result-
out:
  type: td

GCPの認証

JSONキー

「auth_method: json_key」を指定し、サービスアカウントキーのJSON内容を「json_keyfile**.**content」に配置します。

auth_method: json_key
json_keyfile:
  content: |
    {
      "type": "service_account",
      "project_id": "xxxxxx",
      ...
     }

OAuth

OAuth 2アプリケーションで承認されたアカウントを使用する場合は、「auth_method: oauth2」、「client_id」、「client_secret」、「refresh_token」を指定します。

auth_method: oauth2
client_id: 000000000000-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.apps.googleusercontent.com
client_secret: yyyyyyyyyyyyyyyyyyyyyyyy
refresh_token: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz

インポートタイプ

テーブル読み込み

テーブル読み込みの場合、「import_type: table」、「dataset」、「table」を指定します。

import_type: table dataset: my_dataset table: my_table

クエリ読み込み

クエリ読み込みの場合、「import_type: query」と「query」を指定します

import_type: query
query: |-
  SELECT
    id, first_name, last_name, created_at
  FROM
    my_dataset.my_table
  WHERE first_name = "Treasure"

オプションで「query_option」を指定できます。「use_leagacy_sql」はデフォルトでfalse、「use_query_cache」はデフォルトでtrueです。

query: SELECT ...
query_option:
  use_legacy_sql: false
  use_query_cache: true

データロケーション

必要に応じて、「location」でロケーションを指定できます

location: asia-northeast1

増分読み込み

増分読み込みを有効にするには、「incremental: true」と「incremental_columns」を指定します

incremental: true
incremental_columns: [id]

大規模データセットのインポート

有効にするには、「export_to_gcs: true」を指定し、「temp_dataset」、「temp_table」、「gcs_bucket」、「gcs_path_prefix」を追加します

export_to_gcs: true
temp_dataset: temp
temp_table: temp_table
gcs_bucket: my-bucket
gcs_path_prefix: data-connector/result-

(オプション) プレビュー

td connector:preview コマンドを実行して、設定ファイルを検証します

$ td connector:preview config.yml
+---------+-------------------+------------------+-------------------------------+
| id:long | first_name:string | last_name:string | created_at:timestamp          |
+---------+-------------------+------------------+-------------------------------+
| 1       | "Treasure"        | "Data"           | "2018-05-21 12:00:00.111 UTC" |
+---------+-------------------+------------------+-------------------------------+
1 row in set
Update config.yml and use 'td connector:preview config.yml' to preview again.
Use 'td connector:issue config.yml' to run Server-side bulk load.

新しいコネクタセッションの作成

td connector:create を実行します。

以下の例では、BigQueryコネクタによる日次インポートセッションが作成されます。

$ td connector:create daily_bigquery_import \
    "10 0 * * *" td_sample_db td_sample_table config.yml
Name     : daily_bigquery_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  ...

データパーティションキー

コネクタセッションでは、データパーティションキーとして使用するために、結果データに少なくとも1つのタイムスタンプカラムが必要です。デフォルトでは、最初のタイムスタンプカラムがキーとして選択されます。明示的にカラムを指定する場合は、「--time-column」オプションを使用します。

$ td connector:create --time-column created_at \
    daily_bigquery_import ...

結果データにタイムスタンプカラムがない場合は、以下のようにフィルタ設定を追加して「time」カラムを追加します。

in:
  type: bigquery
  ...
filters:
- type: add_time
  from_value:
    mode: upload_time
  to_column:
    name: time
out:
  type: td