Treasure Data Connector CLIを使用して、Amazon Redshift clusterからTreasure Dataに直接datasetを取得します。このガイドでは、toolbeltのセットアップ、connector設定の作成、アドホックとスケジュールされたインポートの両方の実行を順を追って説明し、RedshiftデータをTDと同期させる方法を学びます。
ターミナルを開き、次のコマンドを実行して最新のTD Toolbeltをインストールします。
$ td —version
0.14.1設定ファイルには、integrationからconnectorに入力されるものを指定するin:セクションと、connectorがTreasure Dataのdatabaseに出力するものを指定するout:セクションが含まれています。利用可能なoutモードの詳細については、Appendixを参照してください。
次の例に示すように、設定ファイル(例:config.yml)を準備します。Redshiftインスタンスのアクセス情報を提供します。
次の例に示すように、master userとmaster passwordを使用して設定ファイル(例:load.yml)を準備します。
in:
type: redshift
host: redshift_endpoint
port: 5439
user: master_user
password: master_password
database: dev
table: example
select: "*"
out:
mode: appendこの例では、table内のすべてのレコードをダンプします。追加パラメータを使用して、より詳細な制御を行うことができます。
Redshiftインスタンスのサイズによっては、次のエラーが発生する場合があります。エラーを解決するには、load.ymlでfetch_rowsを設定できます。
Error: 422: BulkLoad job preview failed: org.postgresql.util.PSQLException:
ERROR: Fetch size 10000 exceeds the limit of 1000 for a single node configuration.
Reduce the client fetch/cache size or upgrade to a multi node installation.td connector:previewコマンドを使用して、インポートするデータをプレビューできます。
$ td connector:preview load.yml
+---------+--------------+----------------------------------+------------+---------------------------+
| id:long | name:string | description:string | price:long | created_at:timestamp |
+---------+--------------+----------------------------------+------------+---------------------------+
| 1 | "item name1" | "26e3c3625366591bc2ffc6e262976e" | 2419 | "2014-02-16 13:01:06 UTC" |
| 2 | "item name2" | "3e9dd9474dacb78afd607f9e0a3366" | 1298 | "2014-05-24 13:59:26 UTC" |
| 3 | "item name3" | "9b6c9e4a140284d3951681e9e047f6" | 9084 | "2014-06-21 00:18:21 UTC" |
| 4 | "item name4" | "a11faf5e63c1b02a3d4c2b5cbb7331" | 669 | "2014-05-02 03:44:08 UTC" |
| 6 | "item name6" | "6aa15471c373ddc8a6469e1c918f98" | 3556 | "2014-03-29 08:30:23 UTC" |
+---------+--------------+----------------------------------+------------+---------------------------+td connector:issueを使用してjobを実行します。
Treasure Dataのストレージは時間でパーティション分割されているため、--time-columnオプションを指定することをお勧めします。オプションが指定されていない場合、data connectorは最初のlongまたはtimestampカラムをpartitioning timeとして選択します。--time-columnで指定されるカラムのタイプは、longまたはtimestampタイプのいずれかである必要があります(利用可能なカラム名とタイプを確認するには、Preview結果を使用します。一般的に、ほとんどのデータタイプにはlast_modified_dateカラムがあります)。
データにtimeカラムがない場合は、add_timeフィルターオプションを使用してカラムを追加できます。詳細については、add_time filterプラグインを参照してください。
load jobを送信します。データサイズによっては、数時間かかる場合があります。データが保存されるdatabaseとtableを指定する必要があります。
td connector:issueコマンドを使用してimport jobを送信します。
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at前のコマンドは、*database(td_sample_db)とtable(td_sample_table)*がすでに作成されていることを前提としています。databaseまたはtableがTDに存在しない場合、このコマンドは成功しないため、databaseとtableを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してdatabaseとtableを自動作成します:
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-tabletimeというフィールドがある場合、--time-columnオプションを指定する必要はありません。
$ td connector:issue load.yml --database td_sample_db --table td_sampleincremental_columnsとlast_recordオプションを使用してtable内のカラムを指定することにより、レコードを段階的にロードできます。
in:
type: redshift
host: redshift_endpoint
port: 5439
user: master_user
password: master_password
database: devlp
table: example
incremental: true
incremental_columns: [id, sub_id]
last_record: [10000, 300]
out:
mode: append
exec: {}connectorは、queryとソート値を内部的に自動的に再作成します。
# when last_record wasn't given
SELECT * FROM(
...original query is here
)
ORDER BY id, sub_id
::: terminal
# when last_record was given
SELECT * FROM(
...original query is here
)
WHERE id > 10000 OR (id = 10000 AND sub_id > 300)
ORDER BY id, sub_idscheduled executionを使用している場合、connectorは自動的にlast_recordを生成し、内部的に保持します。次にスケジュールされた実行で使用できます。
incremental_columnsとして、strings、timestamp、integersのみがサポートされています。
incremental: trueを設定すると、queryオプションは使用できません。
in:
type: redshift
...
out:
...
Config Diff
---
in:
last_record:
- 20000
- 400定期的なintegrationインポートのために、定期的なdata connector実行をスケジュールできます。高可用性を確保するために、schedulerを慎重に設定しています。この機能を使用することで、ローカルデータセンターでcron daemonを必要としなくなります。
スケジュールされたインポートの場合、integrationのdata connectorは、指定されたターゲットに一致するすべてのオブジェクトをインポートします。
Scheduled executionは、integrationからデータを取得する際のdata connectorの動作を制御する追加の設定パラメータをサポートしています:
incremental この設定は、各オブジェクトに関連付けられたネイティブtimestampフィールドの1つに基づいて、data connectorがintegrationからデータを取得する方法を管理するload modeを制御するために使用されます。
incremental: true(デフォルト) このモードでは、data connectorは、connectorの前回の実行以降に更新された指定されたintegrationオブジェクトタイプのレコードのみを取得します。このモードは、ユーザーが前回のスケジュールされた実行以降に変更されたオブジェクトターゲットのみを取得したい場合に便利です。このモードは通常、'append'モードを使用してデータを宛先tableに書き込むことと組み合わせて使用されます。
- incremental_columns(必須) このオプションは、integrationから必要なデータのみをロードするために、incrementalモードで必要です。
incremental: false このモードでは、data connectorは、最後に更新されたタイミングに関係なく、指定されたintegrationオブジェクトタイプのすべてのレコードを取得します。このモードは、'replace'モードを使用してデータを宛先tableに書き込むことと組み合わせるのが最適です。
columns この設定は、Treasure Dataにインポートするデータのカスタムschemaを定義するために使用されます。ここでは関心のあるカラムのみを定義できますが、取得しているオブジェクトに存在することを確認してください。それ以外の場合、これらのカラムは結果で使用できません。
last_record この設定は、前回のload jobからの最後のレコードを制御するために使用されます。オブジェクトには、カラム名のkeyとカラムの値の値が含まれている必要があります。keyは、integrationカラム名と一致する必要があります。
次は、出力に'append'モードと組み合わせたincrementalモードを使用したseedファイルの例です。
in:
type: redshift
host: redshift_endpoint
port: 5439
user: master_user
password: master_password
database: dev
table: example
incremental: true
incremental_columns: [id]
last_record: [10000]
out:
mode: append
exec: {}incremental\_columns:オプションを最適に使用するには、フルtableスキャンを回避するために関連するカラムにSORTKEYを設定します。この例では、次のindexを作成する必要があります:
CREATE TABLE dev (...) sortkey(id);connectorは自動的にqueryとソート値を作成します。
# when last_record wasn't given
SELECT * FROM(
...original query is here
)
ORDER BY id
::: terminal
# when last_record was given
SELECT * FROM(
...original query is here
)
WHERE id > 10000
ORDER BY idconnectorは自動的にlast_recordを生成し、次のスケジュールされた実行で使用します。
in:
type: redshift
...
out:
...
Config Diff
---
in:
last_record:
- 20000td connector:createコマンドを使用して新しいscheduleを作成できます。scheduleの名前、cron形式のschedule、データが保存されるdatabaseとtable、およびdata connector設定ファイルが必要です。
cronパラメータは、@hourly、@daily、@monthlyのオプションを受け入れます。
デフォルトでは、scheduleはUTC timezoneで設定されます。-tまたは--timezoneオプションを使用して、timezoneでscheduleを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張timezone形式のみをサポートします。PST、CSTなどのtimezone略語は*サポートされておらず*、予期しないscheduleにつながる可能性があります。 |
td connector:listで現在スケジュールされているエントリのリストを確認できます。
$ td connector:list
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| Name | Cron | Timezone | Delay | Database | Table | Config |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| daily_redshift_import | 10 0 * * * | UTC | 0 | td_sample_db | td_sample_table | {"type"=>"redshift", ... } |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+td connector:showは、scheduleエントリの実行設定を表示します。
% td connector:show daily_redshift_import
Name : daily_redshift_import
Cron : 10 0 * * *
Timezone : UTC
Delay : 0
Database : td_sample_db
Table : td_sample_tabletd connector:historyは、scheduleエントリの実行履歴を表示します。個々の実行の結果を調査するには、td job jobidを使用します。
% td connector:history daily_redshift_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID | Status | Records | Database | Table | Priority | Started | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-18 00:10:05 +0000 | 160 |
| 577968 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-17 00:10:07 +0000 | 161 |
| 577914 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-16 00:10:03 +0000 | 152 |
| 577872 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-15 00:10:04 +0000 | 163 |
| 577810 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-14 00:10:04 +0000 | 164 |
| 577766 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-13 00:10:04 +0000 | 155 |
| 577710 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-12 00:10:05 +0000 | 156 |
| 577610 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-11 00:10:04 +0000 | 157 |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set
% td job:show xxxxx
JobID : 24903
Status : success
Type : bulkload
Database : td_sample_db
Use '-v' option to show detailed messages.td connector:deleteはscheduleを削除します。
$ td connector:delete daily_redshift_importTreasure Dataは、dateフィールドを持つData Extensionsのincremental loadingをサポートしています。
incremental: trueが設定されている場合、data connectorは、指定されたdateフィールドのfrom_dateとfetch_daysによって指定された範囲に従ってレコードをロードします。
load.ymlファイルのoutセクションでファイルimportモードを指定できます。
out:セクションは、データがTreasure Data tableにインポートされる方法を制御します。 例えば、Treasure Dataの既存のtableにデータを追加したり、データを置き換えたりすることができます。
| Mode | Description | Examples |
|---|---|---|
| Append | レコードはターゲットtableに追加されます。 | in: ... out: mode: append |
| Always Replace | ターゲットtableのデータを置き換えます。ターゲットtableに加えられた手動のschema変更はそのまま残ります。 | in: ... out: mode: replace |
| Replace on new data | インポートする新しいデータがある場合にのみ、ターゲットtableのデータを置き換えます。 | in: ... out: mode: replace_on_new_data |
次のリストは、使用可能なすべてのオプションの詳細を提供します:
Database name: データを転送するdatabaseの名前。(例:your_database_name) Use custom SELECT query?: シンプルなSELECT (columns) FROM table WHERE (condition)以上のものが必要な場合に使用します。 Schema: データを転送するschema。 SELECT columns: データを取得したい特定のカラムのみがある場合は、ここにリストします。それ以外の場合、すべてのカラムが転送されます。 Table: データをインポートするtable。 WHERE condition: tableから取得したデータに追加の特定性が必要な場合は、WHERE句の一部としてここで指定できます。 ORDER BY: 特定のフィールドでレコードを並べ替える必要がある場合に指定します。