Skip to content
Last updated

Databricks Import Integration

このデータコネクタを使用すると、Databricksから Treasure Data にデータをインポートできます。

前提条件

  • Treasure Data の基本的な知識とTreasure Data アカウントへのアクセス権
  • Databricks の基本的な知識とDatabricks アカウントへのアクセス権
  • Databricks サーバーへのアクセス権

要件と制限事項

  • Databricks サーバーが休止モードの場合、起動に数分かかることがあり、コネクタがタイムアウトする可能性があります。

TD Console 経由での Databricks からのインポート

Databricks コネクタのホスト名とHTTP パスの取得

  1. Databricks の Web サイトにログインします。
  2. SQL タブに移動し、SQL Warehouses を選択します。

3. SQL Warehouse のインスタンスを選択します。

4. Connection details タブを選択します。 5. SQL Warehouse のServer hostname とHTTP path をメモします。

認証の作成

最初のステップは、認証情報のセットを使用して新しい認証を作成することです。

  1. Integrations Hub > Catalog を選択します。

  1. カタログで databricks を検索します。
  2. アイコンの上にマウスを移動し、Create Authentication を選択します。

  1. Credentials タブが選択されていることを確認し、統合の認証情報を入力します。

パラメータ説明
HostnameDatabricks インスタンスのホスト名です。
HTTP PathDatabricks インスタンスのHTTP パスです。
Authentication Method認証方法です。Basic または Personal Access Token を選択できます。
Usernameログインに使用するユーザー名です。Authentication Method が Basic の場合にのみ有効です。
Passwordログインに使用するパスワードです。Authentication Method が Basic の場合にのみ有効です。
TokenDatabricks パーソナルアクセストークンです。Authentication Method が Personal Access Token の場合にのみ有効です。
  1. Continue を選択します。
  2. 認証の名前を入力し、Done を選択します。

ソースの作成

  1. TD Console を開きます。
  2. Integrations Hub > Authentications に移動します。
  3. 新しい認証を見つけて、New Source を選択します。

接続の作成

  1. Data Transfer Name フィールドにソース名を入力します。
  2. Next を選択します。
パラメータ説明
Data Transfer Nameデータ転送ソースの名前です。
Authentication使用する認証の名前です。

Source Table タブが選択された状態で Create Source ページが表示されます。

ソーステーブルの識別

パラメータ説明
Catalogデータセットカタログです。
Schemaデータセットスキーマです。
Use custom SELECT query?カスタムクエリを構築する場合はチェックボックスを選択し、クエリを作成する場合はフィールドに入力します。
Skip validate query?クエリの構文検証をスキップします
Select Columnsデータセットから選択する列を定義します。screen-shotUse custom SELECT queryscreen-shot が選択されていない場合にのみ必要です。
Table選択するテーブルです。screen-shotUse custom SELECT queryscreen-shot が選択されていない場合にのみ必要です。
Filter Conditionsテーブルから取得するデータに追加の特定性が必要な場合は、WHERE 句の一部として指定できます。
Order by特定のフィールドでレコードを並べ替える必要がある場合に指定します。増分列は order by 条件に含める必要があります。
QuerySQL クエリを含むフィールドです。screen-shotUse custom SELECT queryscreen-shot が選択されている場合にのみ必要です。
Incremental Loading増分読み込みを有効にする場合はチェックボックスを選択します
Incremental Columns増分列を定義します。これらの列は order by フィルタに含める必要があります。
Start after value増分読み込みに使用される列の値です。
Invalid Value Handling Mode無効な値の処理方法を指定します。次のいずれかを選択できます。 - Fail Job - Ignore Invalid Value - Ignore Row

Next を選択します。

カスタムクエリの作成

  • シンプルなクエリ:
query: SELECT * FROM simple_data WHERE cut = 'Very Good' ORDER BY `c_custkey`
  • 増分とlast_records を使用したクエリ

incremental\_columns の値は、パラメータの位置に従い、列と同じ名前にする必要があります。

query: SELECT * FROM simple_data WHERE myid > :myid AND count > :count ORDER BY `myid`
incremental: true
incremental_columns: ['myid', 'count']
last_record: ['4', '5']

クエリには、last\_record に値が入力されたWHERE 条件が必要です。

Order by は必須ではありませんが、より正確な結果を得るために指定する必要があります。

screen-shotmyidscreen-shot が4より大きく、screen-shotcountscreen-shot が5より大きい実際のクエリは、次のようになります。

SELECT * FROM simple_data WHERE myid > 4 AND count > 5 ORDER BY `myid`

データ設定の定義

パラメータ説明
COLUMN OPTIONS特定のカラムのデータ型を定義します。
DEFAULT COLUMN_OPTIONS特定のデータ型の変換データ型を定義します。例: longを常にstringに変換する

Next を選択します。

データのプレビュー

データプレビューはオプションであり、次のダイアログページに進む場合は Next をクリックしても問題ありません。

  1. インポートを実行する前にデータのプレビューを表示するには、Generate Preview を選択します。

データプレビューに表示されるデータは、ソースから概算されたものです。これはインポートされる実際のデータではありません。

  1. データが期待通りに表示されていることを確認します。

3. Next を選択します。

Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

  1. Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。

  2. Database を選択 > Select an existing または Create New Database を選択します。

  3. オプションで、database 名を入力します。

  4. Table を選択 > Select an existing または Create New Table を選択します。

  5. オプションで、table 名を入力します。

  6. データをインポートする方法を選択します。

    • Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
    • Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
    • Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
  7. Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。

  8. データストレージの Timezone を選択します。

  9. Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。

一度だけ実行

  1. Off を選択します。
  2. Scheduling Timezone を選択します。
  3. Create & Run Now を選択します。

定期的に繰り返す

  1. On を選択します。
  2. Schedule を選択します。UI では、@hourly@daily@monthly、またはカスタム cron の 4 つのオプションが提供されます。
  3. Delay Transfer を選択して、実行時間の遅延を追加することもできます。
  4. Scheduling Timezone を選択します。
  5. Create & Run Now を選択します。

転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。

Workflowを使用したDatabricksからのインポート

Workflowの screen-shottd_load>:screen-shot オペレーターを使用して、Databricksからデータをインポートできます。既にSOURCEを作成している場合は実行できます。SOURCEを作成したくない場合は、screen-shot.ymlscreen-shot ファイルを使用してインポートできます。

Sourceの使用

  1. TD Consoleを開きます。
  2. Integrations Hub > Sourcesに移動します。
  3. 対象のSourceを特定します。
  4. Sourceの右端にあるmore icon (...)を選択し、Copy Unique IDを選択します。

Unique IDは次のようになります: screen-shotdatabricks_import_1234••••••screen-shot

  1. screen-shottd_load>:screen-shot オペレーターを使用してWorkflowタスクを定義します。
+load:
  td_load>: databricks_import_1234••••••
  database: ${td.dest_db}
  table: ${td.dest_table}
  1. Workflowを実行します。

ymlファイルの使用

  1. 対象の screen-shot.ymlscreen-shot ファイルを特定します。

screen-shot.ymlscreen-shot ファイルを作成する必要がある場合は、Amazon S3 Import Integration Using CLIを参照してください。

  1. screen-shottd_load>:screen-shot オペレーターを使用してWorkflowタスクを定義します。
+load:
  td_load>: config/daily_load.yml
  database: ${td.dest_db}
  table: ${td.dest_table}
  1. Workflowを実行します。

パラメーターリファレンス

名前説明デフォルト値必須
typedatabricksdatabricksTrue
auth_method認証方法です。basic または access_tokenbasic
host_nameDatabricksインスタンスのホスト名です。True
http_pathDatabricksインスタンスのHTTPパスです。True
user_nameログイン用のユーザー名です。True(auth_methodがbasicの場合)
passwordログイン用のパスワードです。True(auth_methodがbasicの場合)
access_tokenデータ取得に使用するaccess_tokenです。True(auth_methodがaccess_tokenの場合)
catalogデータセットのカタログです。True
schemaデータセットのスキーマです。True
use_custom_queryカスタムクエリを構築するか、フィールドに入力してカスタムクエリを作成できます。true/falsetrue
selectデータセットから選択するカラムを定義します。これはscreen-shotUse custom SELECT queryscreen-shotがfalseの場合のみ必須です。*True(use_custom_queryがfalseの場合)
table選択するテーブルです。これはscreen-shotUse custom SELECT queryscreen-shot がfalseの場合のみ必須です。True(use_custom_queryがfalseの場合)
whereテーブルから取得するデータの詳細な条件が必要な場合は、WHERE句の一部として指定できます。
order_by特定のフィールドでレコードを並べ替える必要がある場合は指定します。増分カラムはorder by条件に含める必要があります。
query独自のクエリを構築します。True(use_custom_queryがtrueの場合)
incremental増分読み込みを有効にします。true/falsefalse
incremental_columns特定のカラムのデータ型を定義します。
last_record増分読み込みの最後のレコード値を定義します。
default_column_options特定のデータ型の変換データ型を定義します。

サンプルWorkflowコード

サンプルWorkflowコードについては、Treasure Boxesを参照してください。

CLI (Toolbelt) を使用したDatabricksからのインポート

コネクタを設定する前に、最新のTD Toolbeltをインストールしてください。

Seed設定ファイル (seed.yml) の作成

in:
  type: databricks
  auth_method: BASIC
  host_name: "dbc-8297d0a7-2709.cloud.databricks.com"
  http_path: "/sql/1.0/warehouses/1b195ce3a8720eb9"
  username: ***
  password: ***
  access_token: ***
  catalog: hive_metastore
  schema: default
  select: "*"
  table: diamonds
  incremental_columns: [ price, x, y, z ]
  order_by: "`price`, `x`, `y`, `z`"
  incremental: true
  last_record: ['4']
  default_column_options: {"double": {"type": "string"}}
out:
  mode: append

パラメーターリファレンス

名前説明デフォルト値必須
typedatabricksdatabricksTrue
auth_method認証方法です。basic/access_tokenbasic
host_nameDatabricksインスタンスのホスト名です。True
http_pathDatabricksインスタンスのHTTPパスです。True
user_nameログインするためのユーザー名です。True(auth_methodがbasicの場合)
passwordログインするためのパスワードです。True(auth_methodがbasicの場合)
access_tokenデータ取得のためのaccess_tokenです。True(auth_methodがaccess_tokenの場合)
catalogデータセットのカタログです。True
schemaこれはデータセットのスキーマです。True
use_custom_queryカスタムクエリを作成するか、フィールドを入力してクエリを作成します。true/falsetrue
skip_validate_queryクエリの検証をスキップします。クエリの検証をスキップすると、プレビューモードは常にダミーデータで実行されます。true/falsefalse
selectデータセットから選択する列を定義します。カスタムSELECTクエリを使用がfalseの場合にのみ必須です。*use_custom_queryがfalseの場合はTrue
table選択するテーブルです。カスタムSELECTクエリを使用がfalseの場合にのみ必須です。use_custom_queryがfalseの場合はTrue
whereテーブルから取得するデータに追加の特定性が必要な場合は、WHERE句の一部として指定できます。
order_by特定のフィールドでレコードを並べ替える必要がある場合に指定します。incrementalカラムはorder by条件に含める必要があります。
query独自のクエリを作成します。use_custom_queryがtrueの場合はTrue
incrementalインクリメンタルローディングを有効にします。true/falsefalse
incremental_columns特定の列のデータ型を定義します。
last_recordインクリメンタルローディングの最終レコード値を定義します。
default_column_options特定のデータ型の変換データ型を定義します。

load.ymlの生成

connector:guessコマンドは、ソースファイルを自動的に読み取り、ロジックを使用してファイル形式、フィールド、列を推測します。

$ td connector:guess seed.yml -o load.yml

load.ymlを開いて、ファイル形式、エンコーディング、列名、型などの定義を確認できます。

in:
  type: databricks
  auth_method: BASIC
  host_name: "dbc-8297d0a7-2709.cloud.databricks.com"
  http_path: "/sql/1.0/warehouses/1b195ce3a8720eb9"
  username: ***
  password: ***
  access_token: ***
  catalog: hive_metastore
  schema: default
  select: "*"
  table: diamonds
  incremental_columns: [ price, x, y, z ]
  order_by: "`price`, `x`, `y`, `z`"
  query: SELECT * FROM simple_data WHERE myid > :myid
  incremental: true
  last_record: ['4']
  default_column_options: {"double": {"type": "string"}}
out:
  mode: append

データをプレビューするには、td connector:previewコマンドを使用します。

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

guessコマンドは、ソースデータファイル内に3行以上、2列以上が必要です。これは、コマンドがソースデータのサンプル行を使用して列定義を評価するためです。

列名または列型が予期せず検出された場合は、load.ymlファイルを変更して再度プレビューしてください。

ロードジョブの実行

ジョブの実行には、データのサイズに応じて数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを必ず指定してください。

Treasure Dataのストレージは時間によってパーティション分割されているため(データパーティショニングを参照)、--time-columnオプションを指定することをお勧めします。このオプションが指定されていない場合、データコネクタは最初のlongまたはtimestamp列をパーティショニング時間として選択します。--time-columnで指定する列の型は、longまたはtimestampのいずれかである必要があります。

データに時間列がない場合は、add_timeフィルタオプションを使用して時間列を追加できます。詳細については、add_timeフィルタプラグインを参照してください。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at

このconnector:issueのコマンド例では、すでにdatabase(td_sample_db)table(td_sample_table)を作成していることを前提としています。TDにデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成してください。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table
 --time-column created_at --auto-create-table

データコネクタは、サーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にファイル内のレコードをソートしてください。

timeというフィールドがある場合は、--time-columnオプションを指定する必要はありません。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

インポートモード

load.ymlファイルのoutセクションでファイルのインポートモードを指定できます。*out:*セクションは、データがTreasure Dataテーブルにインポートされる方法を制御します。たとえば、データを追加したり、Treasure Dataの既存のテーブル内のデータを置き換えたりすることができます。

モード説明
Appendレコードはターゲットテーブルに追加されます。in: ... out: mode: append
Always Replaceターゲットテーブルのデータを置き換えます。ターゲットテーブルに対して行われた手動のスキーマ変更はそのまま保持されます。in: ... out: mode: replace
Replace on new dataインポートする新しいデータがある場合にのみ、ターゲットテーブルのデータを置き換えます。in: ... out: mode: replace_on_new_data

実行のスケジューリング

インクリメンタルファイルインポートのために、定期的なデータコネクタの実行をスケジュールできます。高可用性を確保するために、スケジューラを慎重に設定してください。

スケジュールされたインポートの場合、指定されたプレフィックスに一致し、条件に応じて次のいずれかのフィールドを持つすべてのファイルをインポートできます。

  • use_modified_timeが無効になっている場合、次の実行のために最後のパスが保存されます。2回目以降の実行では、コネクタはアルファベット順で最後のパスの後に来るファイルのみをインポートします。
  • それ以外の場合、ジョブが実行された時刻が次の実行のために保存されます。2回目以降の実行では、コネクタはアルファベット順でその実行時刻以降に変更されたファイルのみをインポートします。

タイムゾーンを指定せずに指定されたすべての日付および日付/時刻形式は、常にデフォルトのUTCとして扱われます。

TD Toolbeltを使用したスケジュールの作成

新しいスケジュールは、td connector:createコマンドを使用して作成できます。

$ td connector:create daily_import "10 0 * * *" \
 td_sample_db td_sample_table load.yml

Treasure Dataのストレージは時間によってパーティション分割されているため(データパーティショニングも参照)、--time-columnオプションを指定することをお勧めします。

$ td connector:create daily_import "10 0 * * *" \
 td_sample_db td_sample_table load.yml \
 --time-column created_at

cronパラメータは、@hourly@daily@monthlyの3つの特別なオプションも受け付けます。

デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは*--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PSTやCSTなどのタイムゾーンの略語はサポートされておらず*、予期しないスケジュールになる可能性があります。