# Riak CS Import Connector Riak CS用のData Connectorは、Riak CSバケットに保存されている*.tsvおよび*.csvファイルの内容のインポートを可能にします。 ## 前提条件 - Treasure Dataの基本知識 ## TD Consoleの使用 ### 接続を設定 Riak CSの接続を設定するには: 1. Treasure Data consoleで、Integration Hub > Catalog**に移動します。 2. Catalog画面の右端にある検索アイコンをクリックし、**Riak**と入力します。 3. RiakCS connectorの上にカーソルを置き、**Create Authentication**を選択します。 ![](/assets/riak.4dee70c291f2f35752204f7e3f348b1410f4ffd77a8f584c9b31363a9fc665db.ec09b8e3.png) 4. 以下のパラメータを設定します: - **Endpoint** - **Authentication Method** - **Access key ID** - **Secret access key** ![](/assets/riaknewauthentication.333a440dd8fb0540e5713d1225a9df7035090e0376b14bb69a043342a5752531.ec09b8e3.png) 1. 必要な接続の詳細を入力した後、**Continue**を選択します。 2. 接続に名前を付けて、後で接続の詳細を変更する必要がある場合に見つけられるようにします。 3. この接続を組織内の他のユーザーと共有したい場合は、**Share with others**チェックボックスをチェックします。このボックスがチェックされていない場合、この接続は自分だけに表示されます。 4. **Create Connection**を選択して接続を完了します。 作成した接続が、指定した名前で接続のリストに表示されます。 ### Data Preview You can see a [preview](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data) of your data before running the import by selecting Generate Preview. Data preview is optional and you can safely skip to the next page of the dialog if you choose to. 1. Select **Next**. The Data Preview page opens. 2. If you want to preview your data, select **Generate Preview**. 3. Verify the data. ### Data Placement For data placement, select the target database and table where you want your data placed and indicate how often the import should run. 1. Select **Next.** Under Storage, you will create a new or select an existing database and create a new or select an existing table for where you want to place the imported data. 2. Select a **Database** > **Select an existing** or **Create New Database**. 3. Optionally, type a database name. 4. Select a **Table**> **Select an existing** or **Create New Table**. 5. Optionally, type a table name. 6. Choose the method for importing the data. - **Append** (default)-Data import results are appended to the table. If the table does not exist, it will be created. - **Always Replace**-Replaces the entire content of an existing table with the result output of the query. If the table does not exist, a new table is created. - **Replace on New Data**-Only replace the entire content of an existing table with the result output when there is new data. 7. Select the **Timestamp-based Partition Key** column. If you want to set a different partition key seed than the default key, you can specify the long or timestamp column as the partitioning time. As a default time column, it uses upload_time with the add_time filter. 8. Select the **Timezone** for your data storage. 9. Under **Schedule**, you can choose when and how often you want to run this query. #### Run once 1. Select **Off**. 2. Select **Scheduling Timezone**. 3. Select **Create & Run Now**. #### Repeat Regularly 1. Select **On**. 2. Select the **Schedule**. The UI provides these four options: *@hourly*, *@daily* and *@monthly* or custom *cron*. 3. You can also select **Delay Transfer** and add a delay of execution time. 4. Select **Scheduling Timezone**. 5. Select **Create & Run Now**. After your transfer has run, you can see the results of your transfer in **Data Workbench** > **Databases.** ## TD Toolbeltの使用 ### 'td' Command v0.11.9以降をインストール 最新の[Treasure Data Toolbelt](https://toolbelt.treasuredata.com/)をインストールします。 ``` $ td --version 0.11.10 ``` ### Seed Config File(seed.yml)を作成 以下のようにseed.ymlを準備し、AWSアクセスキーとシークレットアクセスキーを設定します。バケット名とターゲットファイル名(または複数ファイルのプレフィックス)も指定する必要があります。 ```yaml in: type: riak_cs access_key_id: XXXXXXXXXX secret_access_key: YYYYYYYYYY bucket: sample_bucket path_prefix: path/to/sample_file # path the the *.csv or *.tsv file on your Riak CS bucket endpoint: host out: mode: append ``` Riak CS用のData Connectorは、指定されたプレフィックスと一致するすべてのファイルをインポートします。(例: path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) 利用可能なoutモードの詳細については、[Appendix](/ja/int/riak-cs-import-connector#h1__1835053169)を参照してください。 ### フィールドを推測(load.ymlを生成) connector:guessを使用します。このコマンドは自動的にターゲットファイルを読み取り、ファイル形式をインテリジェントに推測します。 ``` $ td connector:guess seed.yml -o load.yml ``` load.ymlを開くと、ファイル形式、エンコーディング、列名、および型を含む推測されたファイル形式定義が表示されます。 ```yaml in: type: riak_cs access_key_id: XXXXXXXXXX secret_access_key: YYYYYYYYYY bucket: sample_bucket path_prefix: path/to/sample_file endpoint: host parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '' skip_header_lines: 1 columns: - name: id type: long - name: company type: string - name: customer type: string - name: created_at type: timestamp format: '%Y-%m-%d %H:%M:%S' out: mode: append ``` 次に、previewコマンドを使用して、システムがファイルをどのように解析するかをプレビューできます。 ``` $ td connector:preview load.yml +-------+---------+----------+---------------------+ | id | company | customer | created_at | +-------+---------+----------+---------------------+ | 11200 | AA Inc. | David | 2015-03-31 06:12:37 | | 20313 | BB Imc. |Tom | 2015-04-01 01:00:07 | | 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 | | 40133 | DD Inc. | Cesar | 2015-04-02 05:12:32 | | 93133 | EE Inc. | Jake | 2015-04-02 14:11:13 | +-------+---------+----------+---------------------+ ``` guessコマンドは、ソースデータファイルに3行以上、2列以上が必要です。ソースデータのサンプル行を使用して列定義を推測するためです。 | システムが列名または列タイプを予期せず検出した場合は、`load.yml`を直接変更して再度プレビューしてください。 Data Connectorは、"boolean"、"long"、"double"、"string"、および"timestamp"タイプの解析をサポートしています。 また、ロードジョブを実行する前に、ローカルデータベースとテーブルを作成しておく必要があります。 これを行うには、以下のコマンドを実行します。 ``` $ td database:create td_sample_db $ td table:create td_sample_db td_sample_table ``` ### ロードジョブを実行 ロードジョブを送信します。データのサイズによっては数時間かかる場合があります。ユーザーは、データが保存されているデータベースとテーブルを指定する必要があります。 Treasure Dataのストレージは時間でパーティション化されているため([data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)も参照)、--time-columnオプションを指定することをお勧めします。オプションが指定されていない場合、Data Connectorは最初のlongまたはtimestamp列をパーティション化時刻として選択します。--time-columnで指定された列のタイプは、longまたはtimestampタイプのいずれかである必要があります。 データに時刻列がない場合は、add_timeフィルターオプションを使用して追加できます。詳細については、[add_time filter plugin](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。 ```bash $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at ``` 上記のコマンドは、すでに*database(td_sample_db)*と*table(td_sample_table)*を作成していることを前提としています。データベースまたはテーブルがTDに存在しない場合、このコマンドは成功しないため、データベースとテーブルを[手動で](https://docs.treasuredata.com/smart/project-product-documentation/data-management)作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成します: ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table ``` 現在、Data Connectorはサーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にファイル内のレコードをソートしてください。 timeというフィールドがある場合は、--time-columnオプションを指定する必要はありません。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table ``` ## Mode(append/replace) seed.ymlのoutセクションでファイルインポートモードを指定できます。 ### append(デフォルト) ``` in: ... out: mode: append ``` これはデフォルトモードです。インポートされたレコードはターゲットテーブルに追加されます。 ### replace(td 0.11.10以降) ``` in: ... out: mode: replace ``` ターゲットテーブルがすでに存在する場合、既存のテーブルの行はインポートされたレコードで置き換えられます。 ### スケジュール実行 インクリメンタルRiak CSファイルインポートのために、定期的なData Connector実行をスケジュールできます。高可用性を確保するため、スケジューラーは慎重に管理されています。この機能を使用することで、ローカルデータセンターでcronデーモンを実行する必要がなくなります。 スケジュールされたインポートの場合、Riak CS用のData Connectorは、最初に指定されたプレフィックスと一致するすべてのファイルをインポート(例: path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz)し、次回の実行のために最後のパス(path/to/sample_201505.csv.gz)を記憶します。 2回目以降の実行では、アルファベット順(辞書順)で最後のパスの後に来るファイルのみをインポートします。(path/to/sample_201506.csv.gz, …) ### スケジュールを作成 新しいスケジュールは、td connector:createコマンドを使用して作成できます。以下が必要です: スケジュールの名前、cronスタイルのスケジュール、データが保存されるデータベースとテーブル、およびデータコネクタ設定ファイル。 ``` $ td connector:create \ daily_import \ "10 0 * * *" \ td_sample_db \ td_sample_table \ load.yml ``` TDストレージは時間でパーティション化されているため([data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)も参照)、--time-columnオプションを指定することをお勧めします。 ``` $ td connector:create \ daily_import \ "10 0 * * *" \ td_sample_db \ td_sample_table \ load.yml \ --time-column created_at ``` `cron`パラメータは、3つの特別なオプションも受け入れます: `@hourly`、`@daily`、`@monthly`。 | デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。`--timezone`オプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートします。PST、CSTなどのタイムゾーンの略語は*サポートされておらず*、予期しないスケジュールにつながる可能性があります。 ### スケジュールをリスト コマンドtd connector:listを実行することで、現在スケジュールされているエントリのリストを確認できます。 ``` $ td connector:list ``` ## 設定とスケジュール履歴を表示 td connector:showは、スケジュールエントリの実行設定を表示します。 ``` td connector:show daily_import ``` td connector:historyは、スケジュールエントリの実行履歴を表示します。個々の実行の結果を調査するには、td job jobidを使用します。 ``` td connector:history daily_import ``` ## スケジュールを削除 td connector:deleteは、スケジュールを削除します。 ``` $ td connector:delete daily_import ``` # 付録 ## Out Pluginのモード seed.ymlのoutセクションでファイルインポートモードを指定できます。 ### append(デフォルト) これはデフォルトモードで、レコードはターゲットテーブルに追加されます。 ``` in: ... out: mode: append ``` ### replace(td 0.11.10以降) このモードは、ターゲットテーブルのデータを置き換えます。ターゲットテーブルに対して行われた手動のスキーマ変更は、このモードでもそのまま残ります。 ``` in: ... out: mode: replace ```