# Amazon Redshift Import Integration Using The CLI Treasure Data Connector CLIを使用して、Amazon Redshift clusterからTreasure Dataに直接datasetを取得します。このガイドでは、toolbeltのセットアップ、connector設定の作成、アドホックとスケジュールされたインポートの両方の実行を順を追って説明し、RedshiftデータをTDと同期させる方法を学びます。 # Treasure Data Toolbeltのインストール ターミナルを開き、次のコマンドを実行して最新の[TD Toolbelt](https://toolbelt.treasuredata.com/)をインストールします。 ```bash $ td —version 0.14.1 ``` ## Configuration File (config.yml)の作成 設定ファイルには、integrationからconnectorに入力されるものを指定するin:セクションと、connectorがTreasure Dataのdatabaseに出力するものを指定するout:セクションが含まれています。利用可能なoutモードの詳細については、Appendixを参照してください。 次の例に示すように、設定ファイル(例:config.yml)を準備します。Redshiftインスタンスのアクセス情報を提供します。 次の例に示すように、master userとmaster passwordを使用して設定ファイル(例:load.yml)を準備します。 ``` in: type: redshift host: redshift_endpoint port: 5439 user: master_user password: master_password database: dev table: example select: "*" out: mode: append ``` この例では、table内のすべてのレコードをダンプします。[追加パラメータ](https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-redshift)を使用して、より詳細な制御を行うことができます。 Redshiftインスタンスのサイズによっては、次のエラーが発生する場合があります。エラーを解決するには、load.ymlでfetch_rowsを設定できます。 ``` Error: 422: BulkLoad job preview failed: org.postgresql.util.PSQLException: ERROR: Fetch size 10000 exceeds the limit of 1000 for a single node configuration. Reduce the client fetch/cache size or upgrade to a multi node installation. ``` ## インポートするデータのプレビュー(オプション) td connector:previewコマンドを使用して、インポートするデータをプレビューできます。 ``` $ td connector:preview load.yml +---------+--------------+----------------------------------+------------+---------------------------+ | id:long | name:string | description:string | price:long | created_at:timestamp | +---------+--------------+----------------------------------+------------+---------------------------+ | 1 | "item name1" | "26e3c3625366591bc2ffc6e262976e" | 2419 | "2014-02-16 13:01:06 UTC" | | 2 | "item name2" | "3e9dd9474dacb78afd607f9e0a3366" | 1298 | "2014-05-24 13:59:26 UTC" | | 3 | "item name3" | "9b6c9e4a140284d3951681e9e047f6" | 9084 | "2014-06-21 00:18:21 UTC" | | 4 | "item name4" | "a11faf5e63c1b02a3d4c2b5cbb7331" | 669 | "2014-05-02 03:44:08 UTC" | | 6 | "item name6" | "6aa15471c373ddc8a6469e1c918f98" | 3556 | "2014-03-29 08:30:23 UTC" | +---------+--------------+----------------------------------+------------+---------------------------+ ``` ## Load Jobの実行 td connector:issueを使用してjobを実行します。 Treasure Dataのストレージは時間でパーティション分割されているため、--time-columnオプションを指定することをお勧めします。オプションが指定されていない場合、data connectorは最初のlongまたはtimestampカラムをpartitioning timeとして選択します。--time-columnで指定されるカラムのタイプは、longまたはtimestampタイプのいずれかである必要があります(利用可能なカラム名とタイプを確認するには、Preview結果を使用します。一般的に、ほとんどのデータタイプにはlast_modified_dateカラムがあります)。 データにtimeカラムがない場合は、add_timeフィルターオプションを使用してカラムを追加できます。詳細については、[add_time filter](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)プラグインを参照してください。 load jobを送信します。データサイズによっては、数時間かかる場合があります。データが保存されるdatabaseとtableを指定する必要があります。 td connector:issueコマンドを使用してimport jobを送信します。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at ``` 前のコマンドは、*database(td_sample_db)*と*table(td_sample_table)*がすでに作成されていることを前提としています。databaseまたはtableがTDに存在しない場合、このコマンドは成功しないため、databaseとtableを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してdatabaseとtableを自動作成します: ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table ``` timeというフィールドがある場合、--time-columnオプションを指定する必要はありません。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample ``` incremental_columnsとlast_recordオプションを使用してtable内のカラムを指定することにより、レコードを段階的にロードできます。 ``` in: type: redshift host: redshift_endpoint port: 5439 user: master_user password: master_password database: devlp table: example incremental: true incremental_columns: [id, sub_id] last_record: [10000, 300] out: mode: append exec: {} ``` connectorは、queryとソート値を内部的に自動的に再作成します。 ``` # when last_record wasn't given SELECT * FROM( ...original query is here ) ORDER BY id, sub_id ::: terminal # when last_record was given SELECT * FROM( ...original query is here ) WHERE id > 10000 OR (id = 10000 AND sub_id > 300) ORDER BY id, sub_id ``` [scheduled execution](https://docs.treasuredata.com/smart/project-product-documentation/scheduling-jobs-using-td-console)を使用している場合、connectorは自動的にlast_recordを生成し、内部的に保持します。次にスケジュールされた実行で使用できます。 incremental_columnsとして、strings、timestamp、integersのみがサポートされています。 `incremental: true`を設定すると、`query`オプションは使用できません。 ``` in: type: redshift ... out: ... Config Diff --- in: last_record: - 20000 - 400 ``` ## Scheduled Execution 定期的なintegrationインポートのために、定期的なdata connector実行をスケジュールできます。高可用性を確保するために、schedulerを慎重に設定しています。この機能を使用することで、ローカルデータセンターでcron daemonを必要としなくなります。 スケジュールされたインポートの場合、integrationのdata connectorは、指定されたターゲットに一致するすべてのオブジェクトをインポートします。 Scheduled executionは、integrationからデータを取得する際のdata connectorの動作を制御する追加の設定パラメータをサポートしています: - incremental この設定は、各オブジェクトに関連付けられたネイティブtimestampフィールドの1つに基づいて、data connectorがintegrationからデータを取得する方法を管理するload modeを制御するために使用されます。 - incremental: true(デフォルト) このモードでは、data connectorは、connectorの前回の実行以降に更新された指定されたintegrationオブジェクトタイプのレコードのみを取得します。このモードは、ユーザーが前回のスケジュールされた実行以降に変更されたオブジェクトターゲットのみを取得したい場合に便利です。このモードは通常、'append'モードを使用してデータを宛先tableに書き込むことと組み合わせて使用されます。 - incremental_columns(必須) このオプションは、integrationから必要なデータのみをロードするために、incrementalモードで必要です。 - incremental: false このモードでは、data connectorは、最後に更新されたタイミングに関係なく、指定されたintegrationオブジェクトタイプのすべてのレコードを取得します。このモードは、'replace'モードを使用してデータを宛先tableに書き込むことと組み合わせるのが最適です。 - columns この設定は、Treasure Dataにインポートするデータのカスタムschemaを定義するために使用されます。ここでは関心のあるカラムのみを定義できますが、取得しているオブジェクトに存在することを確認してください。それ以外の場合、これらのカラムは結果で使用できません。 - last_record この設定は、前回のload jobからの最後のレコードを制御するために使用されます。オブジェクトには、カラム名のkeyとカラムの値の値が含まれている必要があります。keyは、integrationカラム名と一致する必要があります。 次は、出力に'append'モードと組み合わせたincrementalモードを使用したseedファイルの例です。 ``` in: type: redshift host: redshift_endpoint port: 5439 user: master_user password: master_password database: dev table: example incremental: true incremental_columns: [id] last_record: [10000] out: mode: append exec: {} ``` `incremental\_columns:`オプションを最適に使用するには、フルtableスキャンを回避するために関連するカラムにSORTKEYを設定します。この例では、次のindexを作成する必要があります: ``` CREATE TABLE dev (...) sortkey(id); ``` connectorは自動的にqueryとソート値を作成します。 ``` # when last_record wasn't given SELECT * FROM( ...original query is here ) ORDER BY id ::: terminal # when last_record was given SELECT * FROM( ...original query is here ) WHERE id > 10000 ORDER BY id ``` connectorは自動的にlast_recordを生成し、次のスケジュールされた実行で使用します。 ``` in: type: redshift ... out: ... Config Diff --- in: last_record: - 20000 ``` ### Scheduleの作成 td connector:createコマンドを使用して新しいscheduleを作成できます。scheduleの名前、cron形式のschedule、データが保存されるdatabaseとtable、およびdata connector設定ファイルが必要です。 `cron`パラメータは、`@hourly`、`@daily`、`@monthly`のオプションを受け入れます。 | | | --- | | デフォルトでは、scheduleはUTC timezoneで設定されます。-tまたは--timezoneオプションを使用して、timezoneでscheduleを設定できます。`--timezone`オプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張timezone形式のみをサポートします。PST、CSTなどのtimezone略語は*サポートされておらず*、予期しないscheduleにつながる可能性があります。 | ### Schedulesのリスト表示 td connector:listで現在スケジュールされているエントリのリストを確認できます。 ``` $ td connector:list +-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+ | Name | Cron | Timezone | Delay | Database | Table | Config | +-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+ | daily_redshift_import | 10 0 * * * | UTC | 0 | td_sample_db | td_sample_table | {"type"=>"redshift", ... } | +-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+ ``` ### Schedulesの設定と履歴の表示 td connector:showは、scheduleエントリの実行設定を表示します。 ``` % td connector:show daily_redshift_import Name : daily_redshift_import Cron : 10 0 * * * Timezone : UTC Delay : 0 Database : td_sample_db Table : td_sample_table ``` td connector:historyは、scheduleエントリの実行履歴を表示します。個々の実行の結果を調査するには、td job jobidを使用します。 ``` % td connector:history daily_redshift_import +--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+ | JobID | Status | Records | Database | Table | Priority | Started | Duration | +--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+ | 578066 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-18 00:10:05 +0000 | 160 | | 577968 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-17 00:10:07 +0000 | 161 | | 577914 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-16 00:10:03 +0000 | 152 | | 577872 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-15 00:10:04 +0000 | 163 | | 577810 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-14 00:10:04 +0000 | 164 | | 577766 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-13 00:10:04 +0000 | 155 | | 577710 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-12 00:10:05 +0000 | 156 | | 577610 | success | 10000 | td_sample_db | td_sample_table | 0 | 2015-04-11 00:10:04 +0000 | 157 | +--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+ 8 rows in set % td job:show xxxxx JobID : 24903 Status : success Type : bulkload Database : td_sample_db Use '-v' option to show detailed messages. ``` ### Scheduleの削除 td connector:deleteはscheduleを削除します。 ``` $ td connector:delete daily_redshift_import ``` ## Data ExtensionsのIncremental Loading Treasure Dataは、dateフィールドを持つ**Data Extensions**のincremental loadingをサポートしています。 incremental: trueが設定されている場合、data connectorは、指定されたdateフィールドのfrom_dateとfetch_daysによって指定された範囲に従ってレコードをロードします。 # Appendix ## Out Pluginのモード ### Import Modes load.ymlファイルのoutセクションでファイルimportモードを指定できます。 out:セクションは、データがTreasure Data tableにインポートされる方法を制御します。 例えば、Treasure Dataの既存のtableにデータを追加したり、データを置き換えたりすることができます。 | **Mode** | **Description** | **Examples** | | --- | --- | --- | | Append | レコードはターゲットtableに追加されます。 | `in: ... out: mode: append` | | Always Replace | ターゲットtableのデータを置き換えます。ターゲットtableに加えられた手動のschema変更はそのまま残ります。 | `in: ... out: mode: replace` | | Replace on new data | インポートする新しいデータがある場合にのみ、ターゲットtableのデータを置き換えます。 | `in: ... out: mode: replace_on_new_data` | 次のリストは、使用可能なすべてのオプションの詳細を提供します: **Database name:** データを転送するdatabaseの名前。(例:your_database_name) **Use custom SELECT query?:** シンプルなSELECT (columns) FROM table WHERE (condition)以上のものが必要な場合に使用します。 **Schema:** データを転送するschema。 **SELECT columns:** データを取得したい特定のカラムのみがある場合は、ここにリストします。それ以外の場合、すべてのカラムが転送されます。 **Table:** データをインポートするtable。 **WHERE condition:** tableから取得したデータに追加の特定性が必要な場合は、WHERE句の一部としてここで指定できます。 **ORDER BY:** 特定のフィールドでレコードを並べ替える必要がある場合に指定します。