Skip to content
Last updated

Amazon Redshift Import Integration Using The CLI

Treasure Data Connector CLIを使用して、Amazon Redshift clusterからTreasure Dataに直接datasetを取得します。このガイドでは、toolbeltのセットアップ、connector設定の作成、アドホックとスケジュールされたインポートの両方の実行を順を追って説明し、RedshiftデータをTDと同期させる方法を学びます。

Treasure Data Toolbeltのインストール

ターミナルを開き、次のコマンドを実行して最新のTD Toolbeltをインストールします。

$ td —version
0.14.1

Configuration File (config.yml)の作成

設定ファイルには、integrationからconnectorに入力されるものを指定するin:セクションと、connectorがTreasure Dataのdatabaseに出力するものを指定するout:セクションが含まれています。利用可能なoutモードの詳細については、Appendixを参照してください。

次の例に示すように、設定ファイル(例:config.yml)を準備します。Redshiftインスタンスのアクセス情報を提供します。

次の例に示すように、master userとmaster passwordを使用して設定ファイル(例:load.yml)を準備します。

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  select: "*"
out:
  mode: append

この例では、table内のすべてのレコードをダンプします。追加パラメータを使用して、より詳細な制御を行うことができます。

Redshiftインスタンスのサイズによっては、次のエラーが発生する場合があります。エラーを解決するには、load.ymlでfetch_rowsを設定できます。

Error: 422: BulkLoad job preview failed: org.postgresql.util.PSQLException:
ERROR: Fetch size 10000 exceeds the limit of 1000 for a single node configuration.
Reduce the client fetch/cache size or upgrade to a multi node installation.

インポートするデータのプレビュー(オプション)

td connector:previewコマンドを使用して、インポートするデータをプレビューできます。

$ td connector:preview load.yml
+---------+--------------+----------------------------------+------------+---------------------------+
| id:long | name:string  | description:string               | price:long | created_at:timestamp      |
+---------+--------------+----------------------------------+------------+---------------------------+
| 1       | "item name1" | "26e3c3625366591bc2ffc6e262976e" | 2419       | "2014-02-16 13:01:06 UTC" |
| 2       | "item name2" | "3e9dd9474dacb78afd607f9e0a3366" | 1298       | "2014-05-24 13:59:26 UTC" |
| 3       | "item name3" | "9b6c9e4a140284d3951681e9e047f6" | 9084       | "2014-06-21 00:18:21 UTC" |
| 4       | "item name4" | "a11faf5e63c1b02a3d4c2b5cbb7331" | 669        | "2014-05-02 03:44:08 UTC" |
| 6       | "item name6" | "6aa15471c373ddc8a6469e1c918f98" | 3556       | "2014-03-29 08:30:23 UTC" |
+---------+--------------+----------------------------------+------------+---------------------------+

Load Jobの実行

td connector:issueを使用してjobを実行します。

Treasure Dataのストレージは時間でパーティション分割されているため、--time-columnオプションを指定することをお勧めします。オプションが指定されていない場合、data connectorは最初のlongまたはtimestampカラムをpartitioning timeとして選択します。--time-columnで指定されるカラムのタイプは、longまたはtimestampタイプのいずれかである必要があります(利用可能なカラム名とタイプを確認するには、Preview結果を使用します。一般的に、ほとんどのデータタイプにはlast_modified_dateカラムがあります)。

データにtimeカラムがない場合は、add_timeフィルターオプションを使用してカラムを追加できます。詳細については、add_time filterプラグインを参照してください。

load jobを送信します。データサイズによっては、数時間かかる場合があります。データが保存されるdatabaseとtableを指定する必要があります。

td connector:issueコマンドを使用してimport jobを送信します。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

前のコマンドは、*database(td_sample_db)table(td_sample_table)*がすでに作成されていることを前提としています。databaseまたはtableがTDに存在しない場合、このコマンドは成功しないため、databaseとtableを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してdatabaseとtableを自動作成します:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

timeというフィールドがある場合、--time-columnオプションを指定する必要はありません。

$ td connector:issue load.yml --database td_sample_db --table td_sample

incremental_columnsとlast_recordオプションを使用してtable内のカラムを指定することにより、レコードを段階的にロードできます。

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: devlp
  table: example
  incremental: true
  incremental_columns: [id, sub_id]
  last_record: [10000, 300]
out:
  mode: append
  exec: {}

connectorは、queryとソート値を内部的に自動的に再作成します。

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id, sub_id

::: terminal
# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000 OR (id = 10000 AND sub_id > 300)
ORDER BY id, sub_id

scheduled executionを使用している場合、connectorは自動的にlast_recordを生成し、内部的に保持します。次にスケジュールされた実行で使用できます。

incremental_columnsとして、strings、timestamp、integersのみがサポートされています。

incremental: trueを設定すると、queryオプションは使用できません。

in:
  type: redshift
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000
  - 400

Scheduled Execution

定期的なintegrationインポートのために、定期的なdata connector実行をスケジュールできます。高可用性を確保するために、schedulerを慎重に設定しています。この機能を使用することで、ローカルデータセンターでcron daemonを必要としなくなります。

スケジュールされたインポートの場合、integrationのdata connectorは、指定されたターゲットに一致するすべてのオブジェクトをインポートします。

Scheduled executionは、integrationからデータを取得する際のdata connectorの動作を制御する追加の設定パラメータをサポートしています:

  • incremental この設定は、各オブジェクトに関連付けられたネイティブtimestampフィールドの1つに基づいて、data connectorがintegrationからデータを取得する方法を管理するload modeを制御するために使用されます。

    • incremental: true(デフォルト) このモードでは、data connectorは、connectorの前回の実行以降に更新された指定されたintegrationオブジェクトタイプのレコードのみを取得します。このモードは、ユーザーが前回のスケジュールされた実行以降に変更されたオブジェクトターゲットのみを取得したい場合に便利です。このモードは通常、'append'モードを使用してデータを宛先tableに書き込むことと組み合わせて使用されます。

      • incremental_columns(必須) このオプションは、integrationから必要なデータのみをロードするために、incrementalモードで必要です。
    • incremental: false このモードでは、data connectorは、最後に更新されたタイミングに関係なく、指定されたintegrationオブジェクトタイプのすべてのレコードを取得します。このモードは、'replace'モードを使用してデータを宛先tableに書き込むことと組み合わせるのが最適です。

  • columns この設定は、Treasure Dataにインポートするデータのカスタムschemaを定義するために使用されます。ここでは関心のあるカラムのみを定義できますが、取得しているオブジェクトに存在することを確認してください。それ以外の場合、これらのカラムは結果で使用できません。

  • last_record この設定は、前回のload jobからの最後のレコードを制御するために使用されます。オブジェクトには、カラム名のkeyとカラムの値の値が含まれている必要があります。keyは、integrationカラム名と一致する必要があります。

次は、出力に'append'モードと組み合わせたincrementalモードを使用したseedファイルの例です。

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  incremental: true
  incremental_columns: [id]
  last_record: [10000]
out:
  mode: append
  exec: {}

incremental\_columns:オプションを最適に使用するには、フルtableスキャンを回避するために関連するカラムにSORTKEYを設定します。この例では、次のindexを作成する必要があります:

CREATE TABLE dev (...) sortkey(id);

connectorは自動的にqueryとソート値を作成します。

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id

::: terminal
# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000
ORDER BY id

connectorは自動的にlast_recordを生成し、次のスケジュールされた実行で使用します。

in:
  type: redshift
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000

Scheduleの作成

td connector:createコマンドを使用して新しいscheduleを作成できます。scheduleの名前、cron形式のschedule、データが保存されるdatabaseとtable、およびdata connector設定ファイルが必要です。

cronパラメータは、@hourly@daily@monthlyのオプションを受け入れます。

デフォルトでは、scheduleはUTC timezoneで設定されます。-tまたは--timezoneオプションを使用して、timezoneでscheduleを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張timezone形式のみをサポートします。PST、CSTなどのtimezone略語は*サポートされておらず*、予期しないscheduleにつながる可能性があります。

Schedulesのリスト表示

td connector:listで現在スケジュールされているエントリのリストを確認できます。

$ td connector:list
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| Name                  | Cron        | Timezone | Delay | Database     | Table           | Config                     |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| daily_redshift_import | 10 0 * * *  | UTC      | 0     | td_sample_db | td_sample_table | {"type"=>"redshift", ... } |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+

Schedulesの設定と履歴の表示

td connector:showは、scheduleエントリの実行設定を表示します。

% td connector:show daily_redshift_import
Name     : daily_redshift_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table

td connector:historyは、scheduleエントリの実行履歴を表示します。個々の実行の結果を調査するには、td job jobidを使用します。

% td connector:history daily_redshift_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set


% td job:show xxxxx
JobID       : 24903
Status      : success
Type        : bulkload
Database    : td_sample_db
Use '-v' option to show detailed messages.

Scheduleの削除

td connector:deleteはscheduleを削除します。

$ td connector:delete daily_redshift_import

Data ExtensionsのIncremental Loading

Treasure Dataは、dateフィールドを持つData Extensionsのincremental loadingをサポートしています。

incremental: trueが設定されている場合、data connectorは、指定されたdateフィールドのfrom_dateとfetch_daysによって指定された範囲に従ってレコードをロードします。

Appendix

Out Pluginのモード

Import Modes

load.ymlファイルのoutセクションでファイルimportモードを指定できます。

out:セクションは、データがTreasure Data tableにインポートされる方法を制御します。 例えば、Treasure Dataの既存のtableにデータを追加したり、データを置き換えたりすることができます。

ModeDescriptionExamples
Appendレコードはターゲットtableに追加されます。in: ... out: mode: append
Always Replaceターゲットtableのデータを置き換えます。ターゲットtableに加えられた手動のschema変更はそのまま残ります。in: ... out: mode: replace
Replace on new dataインポートする新しいデータがある場合にのみ、ターゲットtableのデータを置き換えます。in: ... out: mode: replace_on_new_data

次のリストは、使用可能なすべてのオプションの詳細を提供します:

Database name: データを転送するdatabaseの名前。(例:your_database_name) Use custom SELECT query?: シンプルなSELECT (columns) FROM table WHERE (condition)以上のものが必要な場合に使用します。 Schema: データを転送するschema。 SELECT columns: データを取得したい特定のカラムのみがある場合は、ここにリストします。それ以外の場合、すべてのカラムが転送されます。 Table: データをインポートするtable。 WHERE condition: tableから取得したデータに追加の特定性が必要な場合は、WHERE句の一部としてここで指定できます。 ORDER BY: 特定のフィールドでレコードを並べ替える必要がある場合に指定します。