# Delta Sharing Import Integration This feature is in BETA version. For more information, contact your Customer Success Representative. Delta sharingは、社内の顧客データを整理・統合し、マーケティング目的に最適なデータに変換することを支援します。このインテグレーションにより、Delta sharingデータをTreasure Dataにインポートできます。 ## 前提条件 - Treasure Dataの基本知識 - [Delta Sharing](https://delta.io/sharing/) serverの基本知識 ## 要件と制限事項 - Delta Sharing server上の情報にアクセスするための十分な権限を持つユーザーIDが必要です。 - Delta Sharing metastoresのAPIエンドポイントが必要です。 - Delta Sharing serverへのAPI呼び出しを認証するためのbearer tokenが必要です。 ## Treasure DataのStatic IPアドレス Treasure DataのStatic IPアドレスは、このインテグレーションのアクセスポイントおよび連携元となります。Static IPアドレスを確認するには、Customer Successの担当者またはテクニカルサポートにお問い合わせください。 ## DatabricksからEndpointと認証Tokenを取得 このインテグレーションは、recipientsを使用したDelta Sharingオープン共有を使用してDatabricksのデータを読み取ることができます。 主要なエンドポイントは以下のとおりです: - 共有のための新しいrecipientを作成する — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing) - recipientにshareへのアクセス権を付与する — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient.html#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing) - エンドポイントとtokenを含むアクセス情報ファイルをダウンロードする — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient.html#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing) - tokenの有効期限を延長する — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient.html#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing) ## TD ConsoleからDelta Sharing Serverをインポート ### 認証の作成 最初のステップは、認証情報のセットを使用して新しい認証を作成することです。 1. **Integrations Hub**を選択します。 2. **Catalog**を選択します。 ![](/assets/integrationshub-catalog2.e33c0a4c7d81c40cc83dd056c2143b97b1406220e213cab14ef349d69412ffef.7705d4c2.png) 3. CatalogでDelta Sharingを検索し、アイコンの上にマウスを置いて**Create Authentication**を選択します。 ![](/assets/deltasharing.98f59c95c397048249583f946418382764540b40fbccdd143b278988d420afc7.94e6bd79.png) 4. **Credentials**タブが選択されていることを確認し、インテグレーションの認証情報を入力します。 ![](/assets/newauthdeltasharing.f72c2d2e18026ab8d3b79405c32e0b912c29c96511009ec92d588b59f5512283.7705d4c2.png) #### **新規認証のフィールド** | パラメータ | 説明 | | --- | --- | | Endpoint | delta-sharing serverへのAPIエンドポイント | | Bearer Token | delta-sharing server APIにアクセスするためのtoken | 1. **Continue**を選択します。 2. 認証の名前を入力し、**Done**を選択します。 ### Sourceの作成 1. TD Consoleを開きます。 2. **Integrations Hub** > **Authentications**に移動します。 3. 新しく作成した認証を見つけて、**New Source**を選択します。 Create Sourceページが表示され、Source Tableタブが選択された状態になります。 #### 接続の作成 1. Data Transfer Nameフィールドにソース名を入力します。 2. データ転送に使用する認証の名前を入力します。 3. **Next**を選択します。 ![](/assets/deltasharing_connection.1532b8ae5d891d8187702b1f33c9e2872aa8b930bcb4009b576888d9e81c9c44.7705d4c2.png) Create Sourceページが表示され、**Source Table**タブが選択された状態になります。 #### Sourceテーブルの指定 1. パラメータを編集します ![](/assets/deltasharing_sourcetable.b8907baf314bcfb81f5c247c0d55474171df1d9f2da57de79b2b48b85aca8f7e.7705d4c2.png) | パラメータ | 必須 | 説明 | | --- | --- | --- | | Share | はい | Share名 | | Schema | はい | Schema名 | | Table | はい | Table名 | | Default Timezone | はい | タイムスタンプ形式のデフォルトのタイムゾーン | 1. **Next**を選択します。 **Data Settings**タブが選択された状態でCreate Sourceページが表示されます。 #### Data Settingsの指定 1. パラメータを編集します。 ![](/assets/deltasharing_datasettings.c7efd5bb9e32e027d1d2f15a6bac15c29d300893ff9fb45d1917306d6c6645f3.7705d4c2.png) | パラメータ | 必須 | 説明 | | --- | --- | --- | | Retry Limit | いいえ | リトライ回数 | | Initial Retry Interval In Millis | いいえ | 初期リトライ間隔(ミリ秒) | | Max Retry Wait In Millis | いいえ | 最大リトライ待機時間(ミリ秒) | | HTTP Connect Timeout In Millis | いいえ | HTTPタイムアウト(ミリ秒) | | HTTP Read Timeout In Millis | いいえ | HTTP読み取りタイムアウト(ミリ秒) | | HTTP Write Timeout In Millis | いいえ | HTTP書き込みタイムアウト(ミリ秒) | 1. **Next**を選択します。 **Data Preview**タブが選択された状態でCreate Sourceページが表示されます。 ### Data Preview インポートを実行する前に、Generate Preview を選択してデータの[プレビュー](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data)を表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。 1. **Next** を選択します。Data Preview ページが開きます。 2. データをプレビューする場合は、**Generate Preview** を選択します。 3. データを確認します。 ### Data Placement データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。 1. **Next** を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。 2. **Database** を選択 > **Select an existing** または **Create New Database** を選択します。 3. オプションで、database 名を入力します。 4. **Table** を選択 > **Select an existing** または **Create New Table** を選択します。 5. オプションで、table 名を入力します。 6. データをインポートする方法を選択します。 - **Append** (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。 - **Always Replace** - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。 - **Replace on New Data** - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。 7. **Timestamp-based Partition Key** 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。 8. データストレージの **Timezone** を選択します。 9. **Schedule** の下で、このクエリを実行するタイミングと頻度を選択できます。 #### 一度だけ実行 1. **Off** を選択します。 2. **Scheduling Timezone** を選択します。 3. **Create & Run Now** を選択します。 #### 定期的に繰り返す 1. **On** を選択します。 2. **Schedule** を選択します。UI では、*@hourly*、*@daily*、*@monthly*、またはカスタム *cron* の 4 つのオプションが提供されます。 3. **Delay Transfer** を選択して、実行時間の遅延を追加することもできます。 4. **Scheduling Timezone** を選択します。 5. **Create & Run Now** を選択します。 転送が実行された後、**Data Workbench** > **Databases** で転送の結果を確認できます。 ## Workflowを使用してDelta Sharing Serverからインポート Workflowのtd_load>:オペレータを使用して、Delta Sharing serverからデータをインポートできます。すでにSourceを作成している場合は実行できます。Sourceを作成したくない場合は、.ymlファイルを使用してインポートできます。 ### Sourceの実行 1. Sourceを特定します。 2. 一意のIDを取得するには、Sourceリストを開き、**Delta Sharing**でフィルタリングします。 3. メニューを開き、**Copy Unique ID**を選択します。 ![](/assets/image2021-10-12_12-26-58.09d9b84b0f1f752c7c95b0bc1c2d8e8b7302e5b91c6a3cb5f01309dadf53a604.5f7e1b55.png) 4. td_load>:オペレータを使用してworkflowタスクを定義します。 ```yaml +load: td_load>: unique_id_of_your_source database: ${td.dest_db} table: ${td.dest_table} ``` 1. Workflowを実行します。 **パラメータリファレンス** | Name | Description | Value | Default Value | Required | | --- | --- | --- | --- | --- | | endpoint | Delta Sharing serverのエンドポイント | | | true | | token | API連携を認証するためのBearerトークン | | | true | | share | 接続するShare名 | | | true | | schema | 接続するSchema名 | | | true | | table | 接続するTable名 | | | true | | default_timezone | タイムスタンプ形式のデフォルトのタイムゾーン | | UTC | true | | retry_limit | 最大リトライ回数 | | 6 | false | | retry_initial_wait_msecs | リトライの初期待機時間(ミリ秒) | | 30000 | false | | max_retry_wait_msecs | 最大リトライ待機時間(ミリ秒) | | 120000 | false | | connection_timeout_msecs | HTTP接続タイムアウト(ミリ秒) | | 1800000 | false | | write_timeout_msecs | HTTP読み取り接続タイムアウト(ミリ秒) | | 1800000 | false | | read_timeout_msecs | HTTP書き込み接続タイムアウト(ミリ秒) | | 1800000 | false | #### Workflowコードのサンプル Workflowコードのサンプルについては、[Treasure Boxes](https://github.com/treasure-data/treasure-boxes/tree/master/td_load/s3)をご覧ください。 ## CLI(Toolbelt)を使用してDelta Sharing serverからインポート 統合を設定する前に、最新の[TD Toolbelt](https://toolbelt.treasuredata.com/)をインストールしてください。 ### シード設定ファイル (seed.yml) の作成 ```yaml in: type: delta_sharing endpoint: http://endpoint.com token: *** schema: schema table: table share: share retry_limit: 7 retry_initial_wait_msecs: 30000 max_retry_wait_msecs: 120000 connection_timeout_msecs: 1800000 write_timeout_msecs: 1800000 read_timeout_msecs: 1800000 out: mode: append ``` **パラメータリファレンス** | 名前 | 説明 | 値 | デフォルト値 | 必須 | | --- | --- | --- | --- | --- | | endpoint | Delta Sharingエンドポイント | | | true | | token | API連携の認証に使用するBearer token | | | true | | share | 接続するshare名 | | | true | | schema | 接続するschema名 | | | true | | table | 接続するtable名 | | | true | | default_timezone | タイムスタンプ形式のデフォルトタイムゾーン。短縮形とフルゾーンIDの両方をサポート | | UTC | true | | retry_limit | 最大リトライ回数 | | 6 | false | | retry_initial_wait_msecs | リトライの初期待機時間(ミリ秒) | | 30000 | false | | max_retry_wait_msecs | 最大リトライ待機時間(ミリ秒) | | 120000 | false | | connection_timeout_msecs | HTTP接続タイムアウト(ミリ秒) | | 1800000 | false | | write_timeout_msecs | HTTP読み取り接続タイムアウト(ミリ秒) | | 1800000 | false | | read_timeout_msecs | HTTP書き込み接続タイムアウト(ミリ秒) | | 1800000 | false | Delta Sharing連携は、指定されたprefixに一致するすべてのファイルをインポートします。 **例** path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz ### load.yml の生成 connector:guessを使用します。このコマンドは、ソースファイルを自動的に読み取り、ロジックを使用してファイル形式とそのフィールドおよびカラムを推測します。 ``` $ td connector:guess seed.yml -o load.yml ``` load.ymlを開いて、ファイル形式、エンコーディング、カラム名、型などのファイル形式定義を確認できます。 **例** ```yaml in: type: delta_sharing endpoint: http://endpoint.com token: *** schema: schema table: table share: share retry_limit: 7 retry_initial_wait_msecs: 30000 max_retry_wait_msecs: 120000 connection_timeout_msecs: 1800000 write_timeout_msecs: 1800000 read_timeout_msecs: 1800000 out: mode: append ``` データをプレビューするには、td connector:previewコマンドを使用します。 ``` $ td connector:preview load.yml +-------+---------+----------+---------------------+ | id | company | customer | created_at | +-------+---------+----------+---------------------+ | 11200 | AA Inc. | David | 2015-03-31 06:12:37 | | 20313 | BB Imc. |Tom | 2015-04-01 01:00:07 | | 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 | | 40133 | DD Inc. | Cesar | 2015-04-02 05:12:32 | | 93133 | EE Inc. | Jake | 2015-04-02 14:11:13 | +-------+---------+----------+---------------------+ ``` guessコマンドは、ソースデータファイルに3行以上、2列以上のデータが必要です。これは、コマンドがソースデータのサンプル行を使用して列定義にアクセスするためです。 システムが予期しない列名または列タイプを検出した場合は、load.ymlファイルを修正して再度プレビューしてください。 ### ロードジョブの実行 データのサイズによっては、ジョブの実行に数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを必ず指定してください。 Treasure Dataのストレージは時間によってパーティション分割されているため([data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)を参照)、*--time-column*オプションの指定を推奨します。このオプションが指定されていない場合、データ統合は最初の*long*または*timestamp*列をパーティション分割時間として選択します。*--time-column*で指定する列のタイプは、*long*型または*timestamp*型である必要があります。 データに時間列がない場合は、*add_time*フィルタオプションを使用して時間列を追加できます。詳細については、[add_time filter plugin](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。 ```bash $ td connector:issue load.yml --database td_sample_db --table td_sample_table \ --time-column created_at ``` connector:issueコマンドは、データベース(*td_sample_db)*とテーブル(td_sample_table)が既に作成されていることを前提としています。TDにデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成してください。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table ``` データ統合はサーバー側でレコードをソートしません。時間ベースのパーティション分割を効果的に使用するには、事前にファイル内のレコードをソートしてください。 timeという名前のフィールドがある場合は、--time-columnオプションを指定する必要はありません。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table ``` ### インポートモード load.ymlファイルのout:セクションでファイルインポートモードを指定できます。out:セクションは、Treasure Dataテーブルへのデータのインポート方法を制御します。たとえば、Treasure Dataの既存のテーブルにデータを追加するか、データを置き換えるかを選択できます。 | **モード** | **説明** | **例** | | --- | --- | --- | | Append | レコードがターゲットテーブルに追加されます。 | in: ... out: mode: append | | AlwaysReplace | ターゲットテーブル内のデータを置き換えます。ターゲットテーブルに対して手動で行われたスキーマ変更はそのまま保持されます。 | in: ... out: mode: replace | | Replace on new data | インポートする新しいデータがある場合にのみ、ターゲットテーブル内のデータを置き換えます。 | in: ... out: mode: replace_on_new_data | ### 実行のスケジュール設定 定期的なデータ統合の実行をスケジュール設定できます。 Treasure Dataは、高可用性を確保するためにスケジューラーを慎重に設定しています。 ### TD Toolbeltを使用したスケジュールの作成 td connector:createコマンドを使用して新しいスケジュールを作成できます。 ``` $ td connector:create daily_import "10 0 * * *" \ td_sample_db td_sample_table load.yml ``` Treasure Dataのストレージは時間によってパーティション分割されているため([data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)も参照)、--time-columnオプションの指定を推奨します。 ``` $ td connector:create daily_import "10 0 * * *" \ td_sample_db td_sample_table load.yml \ --time-column created_at ``` cronパラメータは、@hourly、@daily、@monthlyの3つの特殊オプションも受け入れます。 デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PST、CSTなどのタイムゾーン略語はサポートされておらず、予期しないスケジュールにつながる可能性があります。