# Google Bigquery Import Integration [Google BigQuery Export Integrationの詳細はこちら](/ja/int/google-bigquery-export-integration)。 Google BigQuery統合により、BigQueryテーブルまたはクエリ結果からTreasure Dataへデータをインポートできます。 インポートとエクスポートの両方で同じBigQuery接続を使用できますが、OAuthで認証された接続はエクスポートには使用できません。 ## 前提条件 - Treasure Dataの基本知識 - Google Cloud Platform(BigQuery、Cloud Storage、IAM)の基本知識 ## 制限事項 - このコネクタではOAuthはサポートされなくなりました。JSON keyfileのみがサポートされています。 - データセットが米国またはEUのマルチリージョン以外の場所にある場合は、場所を指定する必要があります。指定しない場合、TD内のジョブが「Cannot find job_id xxxxx」というエラーで失敗します。 - このコネクタは[外部テーブル](https://cloud.google.com/bigquery/external-data-sources)のインポートをサポートしていません。 ## 権限とロール このデータコネクタを使用するには、承認されたアカウント(サービスアカウント)が以下の権限またはIAMロールを持っている必要があります。 | **カテゴリ** | **必要な権限** | **最小限のIAMロール** | | --- | --- | --- | | **テーブルロードを使用する場合** | - bigquery.tables.get - bigquery.tables.getData | - BigQuery Data Viewer | | **クエリロードを使用する場合** | - bigquery.jobs.create | - BigQuery Job User | | **「大規模データセットのインポート」を使用する場合** | - bigquery.tables.export - bigquery.tables.delete - storage.buckets.get - storage.objects.list - storage.objects.create - storage.objects.delete - storage.objects.get | - BigQuery Data Editor - Storage Legacy Bucket Writer - Storage Legacy Object Reader | IAMの権限とロールの詳細については、Google Cloudのドキュメントを参照してください:[BigQuery](https://cloud.google.com/bigquery/docs/access-control)および[Cloud Storage](https://cloud.google.com/storage/docs/access-control/iam-roles)。 ## TD Consoleを使用する ### 新しい認証の作成 1. **Integrations Hub** > **Catalog**に移動します。 2. **Google BigQuery**を検索して選択します。 3. ダイアログが開くので、認証モードを選択します。**JSON keyfile**のみがサポートされています。![](/assets/image-20200306-213111.2f7b81cad307daf517ebc01a4f11e29bde5728083359fa0a683dce94107cdfdf.743d8bf4.png) 4. サービスアカウントキーのJSON文字列を「JSON keyfile」セクションに入力します。 5. 新しいサービスアカウントキーを作成する方法については、[Google Cloudのドキュメント](https://cloud.google.com/iam/docs/creating-managing-service-account-keys)を参照してください。 SQLの結果をロードする場合は、「Query statement」を選択し、「SQL statement」にSQLクエリを入力します。 転送を作成する前に、BigQuery Web UIでクエリが有効であることを確認してください。[https://cloud.google.com/bigquery/quickstart-web-ui](https://cloud.google.com/bigquery/quickstart-web-ui) ### 新しい転送の作成 接続を作成すると、自動的にAuthenticationsタブに移動します。作成した接続を探し、Sourceを選択します。 #### Fetch from インポートするデータソースを設定します。 Google Cloud PlatformプロジェクトのIDを「Project ID」に入力します。 ### インポートタイプ インポートのタイプを選択します。テーブル全体をロードする(**table loading**)か、SQLの結果をロードする(**query loading**)かを選択できます。 - Table Loading テーブル全体をロードする場合は、「Table」を選択し、エクスポートしたい「Dataset name」と「Table name」を入力します。 マテリアライズドビューをロードする場合は、代わりに「Query Statement」を選択してください。 ![](/assets/image-20191017-210451.48450469a4e71f457a4f4e09586b3ed17e1121d210307736f1298c2424bb64f4.743d8bf4.png) - Query Loading SQLの結果をロードする場合は、「Query statement」を選択し、「SQL statement」にSQLクエリを入力します。 ![](/assets/image-20191017-210506.8e88a9fcfe00fec730507e3846793d39fd798b55f5223e42895003c7c193d07a.743d8bf4.png) デフォルトのSQL方言は[Standard SQL](https://cloud.google.com/bigquery/docs/reference/standard-sql/)です。[Legacy SQL](https://cloud.google.com/bigquery/docs/reference/legacy-sql)を使用する場合は、**Use Legacy SQL**にチェックを入れます。 デフォルトでは、このコネクタは特定の条件下で[キャッシュされた結果](https://cloud.google.com/bigquery/docs/cached-results)を使用します。キャッシュを無効にする場合は、**Use Cached Results**のチェックを外します。 ### Data Location データが米国またはEUのマルチリージョン以外の場所にある場合は、場所を指定する必要があります。 データがasia-northeast1リージョンにある場合は、場所を指定する必要があります。 場所の詳細については、[Google Cloudのドキュメント](https://cloud.google.com/bigquery/docs/dataset-locations)を参照してください。 ### Incremental Loading Incremental Loadingは、自動増分IDカラムや作成日のタイムスタンプカラムなど、増加する一意のカラムを使用して、最後の実行以降の新しいレコードのみをロードできます。 これを有効にするには、**Incremental Loading**にチェックを入れ、増分するカラム名を「Incremental Column Names」に指定します。 増分カラムとしてサポートされるのは、数値型(INTEGERおよびFLOAT)とTIMESTAMP型のみです。 ![](/assets/image-20191017-210528.65b7be0bce035dff443851f92f3dca4efe7ae4691f8841f141881024578a1ddb.743d8bf4.png) #### 仕組み このコネクタは、増分カラムで順序付けられた最新のレコードである「last record」を記録します。次回の実行時には、last recordを使用して以下のルールに従って構築されたクエリを実行してレコードをロードします: table loadingの場合、すべてのフィールドがWHERE句で選択されます。 ```sql SELECT * FROM `${dataset}.${table}` WHERE ${incremental_column} > ${value_of_last_record} ``` query loadingの場合、元のクエリがWHERE句でラップされます。 ```sql SELECT * FROM (${query}) embulk_incremental_ WHERE ${incremental_column} > ${value_of_last_record} ``` 複数の増分カラムがある場合(例えば、c1、c2、c3)、WHERE句は以下のようになります: ```sql WHERE (c1 > 1) OR (c1 = 1 AND c2 > 2) OR (c1 = 1 AND c2 = 2 AND c3 > 3) ``` ### 大規模データセットのインポート 大規模データセット(目安として500MB以上)を読み込む場合は、この「大規模データセットのインポート」オプションを使用することをお勧めします。このオプションは、データをGCS(Google Cloud Storage)オブジェクトとしてエクスポートし、複数のタスクでデータを読み込みます。そのため、読み込みが高速化されます。 このオプションを有効にするには、**Import Large Dataset**をチェックしてから、「Temp dataset」、「Temp table」、「GCS bucket」、「GCS path prefix」を指定します。「Temp dataset」は事前に手動で作成する必要があります。 ![](/assets/image-20191017-210539.75a789acbdc72b1c80e71df4f292be9b96765913ef7ffd85e218da75f9ad8cd0.743d8bf4.png) - クエリを実行する場合(クエリ読み込みまたは増分読み込みを伴うテーブル読み込み)、クエリ結果は一時的なBigQueryテーブル「temp.temp_table」にエクスポートされます。 - 次に、一時テーブルは「*gs://my-bucket/data-connector/result-[12桁の数字].jsonl.gz*」として、gzip圧縮された[JSON Lines](http://jsonlines.org/)ファイルにエクスポートされます。ファイル数は結果データのサイズによって異なります。 - 増分読み込みなしのテーブル読み込みの場合、ソーステーブルのすべてのデータがGCSに直接エクスポートされます。 - 完了後、一時テーブルとGCSオブジェクトは削除されます。 一時テーブルは、クエリ対象のテーブルと同じロケーションに配置する必要があります。詳細については、Google Cloudの[一時テーブルと永続テーブルのドキュメント](https://cloud.google.com/bigquery/docs/writing-results#temporary_and_permanent_tables)を参照してください。 また、データセットが「US」に設定されていない限り、GCSバケットもテーブルと同じロケーションに配置する必要があります。米国ベースのデータセットから別のリージョンのCloud Storageバケットにデータをエクスポートできます。詳細については、Google Cloudの[エクスポート制限のドキュメント](https://cloud.google.com/bigquery/docs/exporting-data#export_limitations)を参照してください。 ### Data Preview インポートを実行する前に、Generate Preview を選択してデータの[プレビュー](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data)を表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。 1. **Next** を選択します。Data Preview ページが開きます。 2. データをプレビューする場合は、**Generate Preview** を選択します。 3. データを確認します。 ### Data Placement データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。 1. **Next** を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。 2. **Database** を選択 > **Select an existing** または **Create New Database** を選択します。 3. オプションで、database 名を入力します。 4. **Table** を選択 > **Select an existing** または **Create New Table** を選択します。 5. オプションで、table 名を入力します。 6. データをインポートする方法を選択します。 - **Append** (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。 - **Always Replace** - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。 - **Replace on New Data** - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。 7. **Timestamp-based Partition Key** 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。 8. データストレージの **Timezone** を選択します。 9. **Schedule** の下で、このクエリを実行するタイミングと頻度を選択できます。 #### 一度だけ実行 1. **Off** を選択します。 2. **Scheduling Timezone** を選択します。 3. **Create & Run Now** を選択します。 #### 定期的に繰り返す 1. **On** を選択します。 2. **Schedule** を選択します。UI では、*@hourly*、*@daily*、*@monthly*、またはカスタム *cron* の 4 つのオプションが提供されます。 3. **Delay Transfer** を選択して、実行時間の遅延を追加することもできます。 4. **Scheduling Timezone** を選択します。 5. **Create & Run Now** を選択します。 転送が実行された後、**Data Workbench** > **Databases** で転送の結果を確認できます。 ## データ変換 BigQueryのデータ型は、次の表に示すように、対応するTreasure Dataの型に自動的に変換されます。テーブルまたはクエリ結果のスキーマにサポートされていない型を含めると、エラーが発生します。 | **BigQuery** | **Treasure Data** | | --- | --- | | STRING | string | | BYTES | *サポートされていません* | | INTEGER | long | | FLOAT | double | | NUMERIC | *サポートされていません* | | BOOLEAN | long(trueは1、falseは0) | | TIMESTAMP | string(yyyy-MM-dd HH:mm:ss.SSS) | | DATE | *サポートされていません* | | TIME | *サポートされていません* | | DATETIME | *サポートされていません* | | RECORD | string(JSONとして) | | REPEATED (PRIMITIVE or RECORD) | string(JSONとして) | ## 入力と出力の接続の使用 Data Connector(入力)とResult Output(出力)の両方に同じBigQuery接続を使用できますが、現在、OAuthで認証された接続を出力に使用することはできません。 ## クォータと制限 [BigQueryとCloud Storage](https://cloud.google.com/bigquery/quotas)のすべてのクォータと制限がGCPプロジェクトに適用されます。 ## CLIを使用したBigQuery Connectorの使用 必要に応じて、[TD Toolbelt](https://toolbelt.treasuredata.com/)を介してコネクタを使用できます。 CLIで[TD Toolbelt](https://api-docs.treasuredata.com/en/tools/cli/quickstart/)をセットアップします。 ### 設定ファイルの作成 ここでは「config.yml」と呼ばれる設定YAMLファイルを作成します。 #### 例(config.yml) ```yaml in: type: bigquery project_id: my-project auth_method: json_key json_keyfile: content: | { "type": "service_account", "project_id": "xxxxxx", ... } import_type: table dataset: my_dataset table: my_table incremental: true incremental_columns: [id] export_to_gcs: true temp_dataset: temp temp_table: temp_table gcs_bucket: my-bucket gcs_path_prefix: data-connector/result- out: type: td ``` ### GCPの認証 #### JSONキー 「auth_method: json_key」を指定し、サービスアカウントキーのJSON内容を「json_keyfile**.**content」に配置します。 ```yaml auth_method: json_key json_keyfile: content: | { "type": "service_account", "project_id": "xxxxxx", ... } ``` #### OAuth OAuth 2アプリケーションで承認されたアカウントを使用する場合は、「auth_method: oauth2」、「client_id」、「client_secret」、「refresh_token」を指定します。 ```yaml auth_method: oauth2 client_id: 000000000000-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.apps.googleusercontent.com client_secret: yyyyyyyyyyyyyyyyyyyyyyyy refresh_token: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz ``` ### インポートタイプ #### テーブル読み込み テーブル読み込みの場合、「import_type: table」、「dataset」、「table」を指定します。 #### `import_type: table` `dataset: my_dataset` `table: my_table` #### クエリ読み込み クエリ読み込みの場合、「import_type: query」と「query」を指定します ```yaml import_type: query query: |- SELECT id, first_name, last_name, created_at FROM my_dataset.my_table WHERE first_name = "Treasure" ``` オプションで「query_option」を指定できます。「use_leagacy_sql」はデフォルトで**false**、「use_query_cache」はデフォルトで**true**です。 ```yaml query: SELECT ... query_option: use_legacy_sql: false use_query_cache: true ``` ### データロケーション 必要に応じて、「**location**」でロケーションを指定できます ``` location: asia-northeast1 ``` ### 増分読み込み 増分読み込みを有効にするには、「incremental: true」と「incremental_columns」を指定します ```yaml incremental: true incremental_columns: [id] ``` ### 大規模データセットのインポート 有効にするには、「**export_to_gcs: true**」を指定し、「temp_dataset」、「temp_table」、「gcs_bucket」、「gcs_path_prefix」を追加します ``` export_to_gcs: true temp_dataset: temp temp_table: temp_table gcs_bucket: my-bucket gcs_path_prefix: data-connector/result- ``` ## (オプション) プレビュー [td connector:preview](https://api-docs.treasuredata.com/en/tools/cli/api/#td-connector-preview) コマンドを実行して、設定ファイルを検証します ``` $ td connector:preview config.yml +---------+-------------------+------------------+-------------------------------+ | id:long | first_name:string | last_name:string | created_at:timestamp | +---------+-------------------+------------------+-------------------------------+ | 1 | "Treasure" | "Data" | "2018-05-21 12:00:00.111 UTC" | +---------+-------------------+------------------+-------------------------------+ 1 row in set Update config.yml and use 'td connector:preview config.yml' to preview again. Use 'td connector:issue config.yml' to run Server-side bulk load. ``` ## 新しいコネクタセッションの作成 [td connector:create](https://api-docs.treasuredata.com/en/tools/cli/api/#td-connector-create) を実行します。 以下の例では、BigQueryコネクタによる日次インポートセッションが作成されます。 ``` $ td connector:create daily_bigquery_import \ "10 0 * * *" td_sample_db td_sample_table config.yml Name : daily_bigquery_import Cron : 10 0 * * * Timezone : UTC Delay : 0 Database : td_sample_db Table : td_sample_table Config --- in: ... ``` ### データパーティションキー コネクタセッションでは、データパーティションキーとして使用するために、結果データに少なくとも1つのタイムスタンプカラムが必要です。デフォルトでは、最初のタイムスタンプカラムがキーとして選択されます。明示的にカラムを指定する場合は、「**--time-column**」オプションを使用します。 ``` $ td connector:create --time-column created_at \ daily_bigquery_import ... ``` 結果データにタイムスタンプカラムがない場合は、以下のようにフィルタ設定を追加して「**time**」カラムを追加します。 ``` in: type: bigquery ... filters: - type: add_time from_value: mode: upload_time to_column: name: time out: type: td ```