This feature is in BETA version. For more information, contact your Customer Success Representative.
Delta sharingは、社内の顧客データを整理・統合し、マーケティング目的に最適なデータに変換することを支援します。このインテグレーションにより、Delta sharingデータをTreasure Dataにインポートできます。
- Treasure Dataの基本知識
- Delta Sharing serverの基本知識
- Delta Sharing server上の情報にアクセスするための十分な権限を持つユーザーIDが必要です。
- Delta Sharing metastoresのAPIエンドポイントが必要です。
- Delta Sharing serverへのAPI呼び出しを認証するためのbearer tokenが必要です。
Treasure DataのStatic IPアドレスは、このインテグレーションのアクセスポイントおよび連携元となります。Static IPアドレスを確認するには、Customer Successの担当者またはテクニカルサポートにお問い合わせください。
このインテグレーションは、recipientsを使用したDelta Sharingオープン共有を使用してDatabricksのデータを読み取ることができます。
主要なエンドポイントは以下のとおりです:
- 共有のための新しいrecipientを作成する — https://docs.databricks.com/en/delta-sharing/create-recipient.html
- recipientにshareへのアクセス権を付与する — https://docs.databricks.com/en/delta-sharing/create-recipient.html
- エンドポイントとtokenを含むアクセス情報ファイルをダウンロードする — https://docs.databricks.com/en/delta-sharing/create-recipient.html
- tokenの有効期限を延長する — https://docs.databricks.com/en/delta-sharing/create-recipient.html
最初のステップは、認証情報のセットを使用して新しい認証を作成することです。
- Integrations Hubを選択します。
- Catalogを選択します。
3. CatalogでDelta Sharingを検索し、アイコンの上にマウスを置いてCreate Authenticationを選択します。
4. Credentialsタブが選択されていることを確認し、インテグレーションの認証情報を入力します。

| パラメータ | 説明 |
|---|---|
| Endpoint | delta-sharing serverへのAPIエンドポイント |
| Bearer Token | delta-sharing server APIにアクセスするためのtoken |
- Continueを選択します。
- 認証の名前を入力し、Doneを選択します。
- TD Consoleを開きます。
- Integrations Hub > Authenticationsに移動します。
- 新しく作成した認証を見つけて、New Sourceを選択します。
Create Sourceページが表示され、Source Tableタブが選択された状態になります。
- Data Transfer Nameフィールドにソース名を入力します。
- データ転送に使用する認証の名前を入力します。
- Nextを選択します。

Create Sourceページが表示され、Source Tableタブが選択された状態になります。
- パラメータを編集します

| パラメータ | 必須 | 説明 |
|---|---|---|
| Share | はい | Share名 |
| Schema | はい | Schema名 |
| Table | はい | Table名 |
| Default Timezone | はい | タイムスタンプ形式のデフォルトのタイムゾーン |
- Nextを選択します。
Data Settingsタブが選択された状態でCreate Sourceページが表示されます。
- パラメータを編集します。

| パラメータ | 必須 | 説明 |
|---|---|---|
| Retry Limit | いいえ | リトライ回数 |
| Initial Retry Interval In Millis | いいえ | 初期リトライ間隔(ミリ秒) |
| Max Retry Wait In Millis | いいえ | 最大リトライ待機時間(ミリ秒) |
| HTTP Connect Timeout In Millis | いいえ | HTTPタイムアウト(ミリ秒) |
| HTTP Read Timeout In Millis | いいえ | HTTP読み取りタイムアウト(ミリ秒) |
| HTTP Write Timeout In Millis | いいえ | HTTP書き込みタイムアウト(ミリ秒) |
- Nextを選択します。
Data Previewタブが選択された状態でCreate Sourceページが表示されます。
インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。
- Next を選択します。Data Preview ページが開きます。
- データをプレビューする場合は、Generate Preview を選択します。
- データを確認します。
データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。
Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。
Database を選択 > Select an existing または Create New Database を選択します。
オプションで、database 名を入力します。
Table を選択 > Select an existing または Create New Table を選択します。
オプションで、table 名を入力します。
データをインポートする方法を選択します。
- Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
- Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
- Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。
データストレージの Timezone を選択します。
Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。
- Off を選択します。
- Scheduling Timezone を選択します。
- Create & Run Now を選択します。
- On を選択します。
- Schedule を選択します。UI では、@hourly、@daily、@monthly、またはカスタム cron の 4 つのオプションが提供されます。
- Delay Transfer を選択して、実行時間の遅延を追加することもできます。
- Scheduling Timezone を選択します。
- Create & Run Now を選択します。
転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。
Workflowのtd_load>:オペレータを使用して、Delta Sharing serverからデータをインポートできます。すでにSourceを作成している場合は実行できます。Sourceを作成したくない場合は、.ymlファイルを使用してインポートできます。
- Sourceを特定します。
- 一意のIDを取得するには、Sourceリストを開き、Delta Sharingでフィルタリングします。
- メニューを開き、Copy Unique IDを選択します。
4. td_load>:オペレータを使用してworkflowタスクを定義します。
+load:
td_load>: unique_id_of_your_source
database: ${td.dest_db}
table: ${td.dest_table}- Workflowを実行します。
パラメータリファレンス
| Name | Description | Value | Default Value | Required |
|---|---|---|---|---|
| endpoint | Delta Sharing serverのエンドポイント | true | ||
| token | API連携を認証するためのBearerトークン | true | ||
| share | 接続するShare名 | true | ||
| schema | 接続するSchema名 | true | ||
| table | 接続するTable名 | true | ||
| default_timezone | タイムスタンプ形式のデフォルトのタイムゾーン | UTC | true | |
| retry_limit | 最大リトライ回数 | 6 | false | |
| retry_initial_wait_msecs | リトライの初期待機時間(ミリ秒) | 30000 | false | |
| max_retry_wait_msecs | 最大リトライ待機時間(ミリ秒) | 120000 | false | |
| connection_timeout_msecs | HTTP接続タイムアウト(ミリ秒) | 1800000 | false | |
| write_timeout_msecs | HTTP読み取り接続タイムアウト(ミリ秒) | 1800000 | false | |
| read_timeout_msecs | HTTP書き込み接続タイムアウト(ミリ秒) | 1800000 | false |
Workflowコードのサンプルについては、Treasure Boxesをご覧ください。
統合を設定する前に、最新のTD Toolbeltをインストールしてください。
in:
type: delta_sharing
endpoint: http://endpoint.com
token: ***
schema: schema
table: table
share: share
retry_limit: 7
retry_initial_wait_msecs: 30000
max_retry_wait_msecs: 120000
connection_timeout_msecs: 1800000
write_timeout_msecs: 1800000
read_timeout_msecs: 1800000
out:
mode: appendパラメータリファレンス
| 名前 | 説明 | 値 | デフォルト値 | 必須 |
|---|---|---|---|---|
| endpoint | Delta Sharingエンドポイント | true | ||
| token | API連携の認証に使用するBearer token | true | ||
| share | 接続するshare名 | true | ||
| schema | 接続するschema名 | true | ||
| table | 接続するtable名 | true | ||
| default_timezone | タイムスタンプ形式のデフォルトタイムゾーン。短縮形とフルゾーンIDの両方をサポート | UTC | true | |
| retry_limit | 最大リトライ回数 | 6 | false | |
| retry_initial_wait_msecs | リトライの初期待機時間(ミリ秒) | 30000 | false | |
| max_retry_wait_msecs | 最大リトライ待機時間(ミリ秒) | 120000 | false | |
| connection_timeout_msecs | HTTP接続タイムアウト(ミリ秒) | 1800000 | false | |
| write_timeout_msecs | HTTP読み取り接続タイムアウト(ミリ秒) | 1800000 | false | |
| read_timeout_msecs | HTTP書き込み接続タイムアウト(ミリ秒) | 1800000 | false |
Delta Sharing連携は、指定されたprefixに一致するすべてのファイルをインポートします。
例
path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz
connector:guessを使用します。このコマンドは、ソースファイルを自動的に読み取り、ロジックを使用してファイル形式とそのフィールドおよびカラムを推測します。
$ td connector:guess seed.yml -o load.ymlload.ymlを開いて、ファイル形式、エンコーディング、カラム名、型などのファイル形式定義を確認できます。
例
in:
type: delta_sharing
endpoint: http://endpoint.com
token: ***
schema: schema
table: table
share: share
retry_limit: 7
retry_initial_wait_msecs: 30000
max_retry_wait_msecs: 120000
connection_timeout_msecs: 1800000
write_timeout_msecs: 1800000
read_timeout_msecs: 1800000
out:
mode: appendデータをプレビューするには、td connector:previewコマンドを使用します。
$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id | company | customer | created_at |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. | David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. | Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. | Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+guessコマンドは、ソースデータファイルに3行以上、2列以上のデータが必要です。これは、コマンドがソースデータのサンプル行を使用して列定義にアクセスするためです。
システムが予期しない列名または列タイプを検出した場合は、load.ymlファイルを修正して再度プレビューしてください。
データのサイズによっては、ジョブの実行に数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを必ず指定してください。
Treasure Dataのストレージは時間によってパーティション分割されているため(data partitioningを参照)、--time-columnオプションの指定を推奨します。このオプションが指定されていない場合、データ統合は最初のlongまたはtimestamp列をパーティション分割時間として選択します。--time-columnで指定する列のタイプは、long型またはtimestamp型である必要があります。
データに時間列がない場合は、add_timeフィルタオプションを使用して時間列を追加できます。詳細については、add_time filter pluginを参照してください。
$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
--time-column created_atconnector:issueコマンドは、データベース(*td_sample_db)*とテーブル(td_sample_table)が既に作成されていることを前提としています。TDにデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成してください。
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
--time-column created_at --auto-create-tableデータ統合はサーバー側でレコードをソートしません。時間ベースのパーティション分割を効果的に使用するには、事前にファイル内のレコードをソートしてください。
timeという名前のフィールドがある場合は、--time-columnオプションを指定する必要はありません。
$ td connector:issue load.yml --database td_sample_db --table td_sample_tableload.ymlファイルのout:セクションでファイルインポートモードを指定できます。out:セクションは、Treasure Dataテーブルへのデータのインポート方法を制御します。たとえば、Treasure Dataの既存のテーブルにデータを追加するか、データを置き換えるかを選択できます。
| モード | 説明 | 例 |
|---|---|---|
| Append | レコードがターゲットテーブルに追加されます。 | in: ... out: mode: append |
| AlwaysReplace | ターゲットテーブル内のデータを置き換えます。ターゲットテーブルに対して手動で行われたスキーマ変更はそのまま保持されます。 | in: ... out: mode: replace |
| Replace on new data | インポートする新しいデータがある場合にのみ、ターゲットテーブル内のデータを置き換えます。 | in: ... out: mode: replace_on_new_data |
定期的なデータ統合の実行をスケジュール設定できます。
Treasure Dataは、高可用性を確保するためにスケジューラーを慎重に設定しています。
td connector:createコマンドを使用して新しいスケジュールを作成できます。
$ td connector:create daily_import "10 0 * * *" \
td_sample_db td_sample_table load.ymlTreasure Dataのストレージは時間によってパーティション分割されているため(data partitioningも参照)、--time-columnオプションの指定を推奨します。
$ td connector:create daily_import "10 0 * * *" \
td_sample_db td_sample_table load.yml \
--time-column created_atcronパラメータは、@hourly、@daily、@monthlyの3つの特殊オプションも受け入れます。
デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PST、CSTなどのタイムゾーン略語はサポートされておらず、予期しないスケジュールにつながる可能性があります。