Skip to content
Last updated

Delta Sharing Import Integration

This feature is in BETA version. For more information, contact your Customer Success Representative.

Delta sharingは、社内の顧客データを整理・統合し、マーケティング目的に最適なデータに変換することを支援します。このインテグレーションにより、Delta sharingデータをTreasure Dataにインポートできます。

前提条件

要件と制限事項

  • Delta Sharing server上の情報にアクセスするための十分な権限を持つユーザーIDが必要です。
  • Delta Sharing metastoresのAPIエンドポイントが必要です。
  • Delta Sharing serverへのAPI呼び出しを認証するためのbearer tokenが必要です。

Treasure DataのStatic IPアドレス

Treasure DataのStatic IPアドレスは、このインテグレーションのアクセスポイントおよび連携元となります。Static IPアドレスを確認するには、Customer Successの担当者またはテクニカルサポートにお問い合わせください。

DatabricksからEndpointと認証Tokenを取得

このインテグレーションは、recipientsを使用したDelta Sharingオープン共有を使用してDatabricksのデータを読み取ることができます。

主要なエンドポイントは以下のとおりです:

TD ConsoleからDelta Sharing Serverをインポート

認証の作成

最初のステップは、認証情報のセットを使用して新しい認証を作成することです。

  1. Integrations Hubを選択します。
  2. Catalogを選択します。

3. CatalogでDelta Sharingを検索し、アイコンの上にマウスを置いてCreate Authenticationを選択します。

4. Credentialsタブが選択されていることを確認し、インテグレーションの認証情報を入力します。

新規認証のフィールド

パラメータ説明
Endpointdelta-sharing serverへのAPIエンドポイント
Bearer Tokendelta-sharing server APIにアクセスするためのtoken
  1. Continueを選択します。
  2. 認証の名前を入力し、Doneを選択します。

Sourceの作成

  1. TD Consoleを開きます。
  2. Integrations Hub > Authenticationsに移動します。
  3. 新しく作成した認証を見つけて、New Sourceを選択します。

Create Sourceページが表示され、Source Tableタブが選択された状態になります。

接続の作成

  1. Data Transfer Nameフィールドにソース名を入力します。
  2. データ転送に使用する認証の名前を入力します。
  3. Nextを選択します。

Create Sourceページが表示され、Source Tableタブが選択された状態になります。

Sourceテーブルの指定

  1. パラメータを編集します

パラメータ必須説明
ShareはいShare名
SchemaはいSchema名
TableはいTable名
Default Timezoneはいタイムスタンプ形式のデフォルトのタイムゾーン
  1. Nextを選択します。

Data Settingsタブが選択された状態でCreate Sourceページが表示されます。

Data Settingsの指定

  1. パラメータを編集します。

パラメータ必須説明
Retry Limitいいえリトライ回数
Initial Retry Interval In Millisいいえ初期リトライ間隔(ミリ秒)
Max Retry Wait In Millisいいえ最大リトライ待機時間(ミリ秒)
HTTP Connect Timeout In MillisいいえHTTPタイムアウト(ミリ秒)
HTTP Read Timeout In MillisいいえHTTP読み取りタイムアウト(ミリ秒)
HTTP Write Timeout In MillisいいえHTTP書き込みタイムアウト(ミリ秒)
  1. Nextを選択します。

Data Previewタブが選択された状態でCreate Sourceページが表示されます。

Data Preview

インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

  1. Next を選択します。Data Preview ページが開きます。
  2. データをプレビューする場合は、Generate Preview を選択します。
  3. データを確認します。

Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

  1. Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。

  2. Database を選択 > Select an existing または Create New Database を選択します。

  3. オプションで、database 名を入力します。

  4. Table を選択 > Select an existing または Create New Table を選択します。

  5. オプションで、table 名を入力します。

  6. データをインポートする方法を選択します。

    • Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
    • Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
    • Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
  7. Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。

  8. データストレージの Timezone を選択します。

  9. Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。

一度だけ実行

  1. Off を選択します。
  2. Scheduling Timezone を選択します。
  3. Create & Run Now を選択します。

定期的に繰り返す

  1. On を選択します。
  2. Schedule を選択します。UI では、@hourly@daily@monthly、またはカスタム cron の 4 つのオプションが提供されます。
  3. Delay Transfer を選択して、実行時間の遅延を追加することもできます。
  4. Scheduling Timezone を選択します。
  5. Create & Run Now を選択します。

転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。

Workflowを使用してDelta Sharing Serverからインポート

Workflowのtd_load>:オペレータを使用して、Delta Sharing serverからデータをインポートできます。すでにSourceを作成している場合は実行できます。Sourceを作成したくない場合は、.ymlファイルを使用してインポートできます。

Sourceの実行

  1. Sourceを特定します。
  2. 一意のIDを取得するには、Sourceリストを開き、Delta Sharingでフィルタリングします。
  3. メニューを開き、Copy Unique IDを選択します。

4. td_load>:オペレータを使用してworkflowタスクを定義します。

+load:
   td_load>: unique_id_of_your_source
   database: ${td.dest_db}
   table: ${td.dest_table}
  1. Workflowを実行します。

パラメータリファレンス

NameDescriptionValueDefault ValueRequired
endpointDelta Sharing serverのエンドポイントtrue
tokenAPI連携を認証するためのBearerトークンtrue
share接続するShare名true
schema接続するSchema名true
table接続するTable名true
default_timezoneタイムスタンプ形式のデフォルトのタイムゾーンUTCtrue
retry_limit最大リトライ回数6false
retry_initial_wait_msecsリトライの初期待機時間(ミリ秒)30000false
max_retry_wait_msecs最大リトライ待機時間(ミリ秒)120000false
connection_timeout_msecsHTTP接続タイムアウト(ミリ秒)1800000false
write_timeout_msecsHTTP読み取り接続タイムアウト(ミリ秒)1800000false
read_timeout_msecsHTTP書き込み接続タイムアウト(ミリ秒)1800000false

Workflowコードのサンプル

Workflowコードのサンプルについては、Treasure Boxesをご覧ください。

CLI(Toolbelt)を使用してDelta Sharing serverからインポート

統合を設定する前に、最新のTD Toolbeltをインストールしてください。

シード設定ファイル (seed.yml) の作成

in:
  type: delta_sharing
  endpoint: http://endpoint.com
  token: ***
  schema: schema
  table: table
  share: share
  retry_limit: 7
  retry_initial_wait_msecs: 30000
  max_retry_wait_msecs: 120000
  connection_timeout_msecs: 1800000
  write_timeout_msecs: 1800000
  read_timeout_msecs: 1800000
out:
  mode: append

パラメータリファレンス

名前説明デフォルト値必須
endpointDelta Sharingエンドポイントtrue
tokenAPI連携の認証に使用するBearer tokentrue
share接続するshare名true
schema接続するschema名true
table接続するtable名true
default_timezoneタイムスタンプ形式のデフォルトタイムゾーン。短縮形とフルゾーンIDの両方をサポートUTCtrue
retry_limit最大リトライ回数6false
retry_initial_wait_msecsリトライの初期待機時間(ミリ秒)30000false
max_retry_wait_msecs最大リトライ待機時間(ミリ秒)120000false
connection_timeout_msecsHTTP接続タイムアウト(ミリ秒)1800000false
write_timeout_msecsHTTP読み取り接続タイムアウト(ミリ秒)1800000false
read_timeout_msecsHTTP書き込み接続タイムアウト(ミリ秒)1800000false

Delta Sharing連携は、指定されたprefixに一致するすべてのファイルをインポートします。

path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz

load.yml の生成

connector:guessを使用します。このコマンドは、ソースファイルを自動的に読み取り、ロジックを使用してファイル形式とそのフィールドおよびカラムを推測します。

$ td connector:guess seed.yml -o load.yml

load.ymlを開いて、ファイル形式、エンコーディング、カラム名、型などのファイル形式定義を確認できます。

in:
  type: delta_sharing
  endpoint: http://endpoint.com
  token: ***
  schema: schema
  table: table
  share: share
  retry_limit: 7
  retry_initial_wait_msecs: 30000
  max_retry_wait_msecs: 120000
  connection_timeout_msecs: 1800000
  write_timeout_msecs: 1800000
  read_timeout_msecs: 1800000
out:
  mode: append

データをプレビューするには、td connector:previewコマンドを使用します。

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id | company | customer | created_at |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. | David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. | Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |  Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

guessコマンドは、ソースデータファイルに3行以上、2列以上のデータが必要です。これは、コマンドがソースデータのサンプル行を使用して列定義にアクセスするためです。

システムが予期しない列名または列タイプを検出した場合は、load.ymlファイルを修正して再度プレビューしてください。

ロードジョブの実行

データのサイズによっては、ジョブの実行に数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを必ず指定してください。

Treasure Dataのストレージは時間によってパーティション分割されているため(data partitioningを参照)、--time-columnオプションの指定を推奨します。このオプションが指定されていない場合、データ統合は最初のlongまたはtimestamp列をパーティション分割時間として選択します。--time-columnで指定する列のタイプは、long型またはtimestamp型である必要があります。

データに時間列がない場合は、add_timeフィルタオプションを使用して時間列を追加できます。詳細については、add_time filter pluginを参照してください。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
--time-column created_at

connector:issueコマンドは、データベース(*td_sample_db)*とテーブル(td_sample_table)が既に作成されていることを前提としています。TDにデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成してください。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table
 --time-column created_at --auto-create-table

データ統合はサーバー側でレコードをソートしません。時間ベースのパーティション分割を効果的に使用するには、事前にファイル内のレコードをソートしてください。

timeという名前のフィールドがある場合は、--time-columnオプションを指定する必要はありません。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

インポートモード

load.ymlファイルのout:セクションでファイルインポートモードを指定できます。out:セクションは、Treasure Dataテーブルへのデータのインポート方法を制御します。たとえば、Treasure Dataの既存のテーブルにデータを追加するか、データを置き換えるかを選択できます。

モード説明
Appendレコードがターゲットテーブルに追加されます。in: ... out: mode: append
AlwaysReplaceターゲットテーブル内のデータを置き換えます。ターゲットテーブルに対して手動で行われたスキーマ変更はそのまま保持されます。in: ... out: mode: replace
Replace on new dataインポートする新しいデータがある場合にのみ、ターゲットテーブル内のデータを置き換えます。in: ... out: mode: replace_on_new_data

実行のスケジュール設定

定期的なデータ統合の実行をスケジュール設定できます。

Treasure Dataは、高可用性を確保するためにスケジューラーを慎重に設定しています。

TD Toolbeltを使用したスケジュールの作成

td connector:createコマンドを使用して新しいスケジュールを作成できます。

$ td connector:create daily_import "10 0 * * *" \
 td_sample_db td_sample_table load.yml

Treasure Dataのストレージは時間によってパーティション分割されているため(data partitioningも参照)、--time-columnオプションの指定を推奨します。

$ td connector:create daily_import "10 0 * * *" \
 td_sample_db td_sample_table load.yml \
 --time-column created_at

cronパラメータは、@hourly、@daily、@monthlyの3つの特殊オプションも受け入れます。

デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PST、CSTなどのタイムゾーン略語はサポートされておらず、予期しないスケジュールにつながる可能性があります。