Data Connector for Riak CS

The Data Connector for Riak CS enables import of the contents of .tsv and .csv files stored in your Riak CS bucket.

Table of Contents

Prerequisites

  • Basic knowledge of Treasure Data

Step 0: Install ‘td’ command v0.11.9 or later

Install the newest Treasure Data Toolbelt.

$ td --version
0.11.10

Step 1: Create Seed Config File (seed.yml)

First, prepare seed.yml as below, with your AWS access key and secret access key. You must also specify bucket name, and target file name (or prefix for multiple files).

in:
  type: riak_cs
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file                # path the the *.csv or *.tsv file on your Riak CS bucket
  endpoint: host
out:
  mode: append

The Data Connector for Riak CS imports all files that match a specified prefix. (e.g. path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz)

For more details on available out modes, see Appendix

Step 2: Guess Fields (Generate load.yml)

Second, use connector:guess. This command automatically reads the target file, and intelligently guesses the file format.

$ td connector:guess seed.yml -o load.yml

If you open up load.yml, you’ll see the guessed file format definitions including file formats, encodings, column names, and types.

in:
  type: riak_cs
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  endpoint: host
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - name: id
      type: long
    - name: company
      type: string
    - name: customer
      type: string
    - name: created_at
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
out:
  mode: append

Then, you can preview how the system will parse the file by using the preview command.

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+
Untitled-3
guess command needs over 3 rows and 2 columns in source data file, because it guesses column definition using sample rows from source data.
Untitled-3
If the system detects your column name or column type unexpectedly, modify `load.yml` directly and preview again.
Untitled-3
Currently, the Data Connector supports parsing of "boolean", "long", "double", "string", and "timestamp" types.
Untitled-3
You will also need to have created the local database and table, prior to executing the load job. To do this:
$ td database:create td_sample_db
$ td table:create td_sample_db td_sample_table

Step 3: Execute Load Job

Finally, submit the load job. It may take a couple of hours depending on the size of the data. Users need to specify the database and table where their data is stored.

It’s also recommended to specify --time-column option, since Treasure Data’s storage is partitioned by time (see also architecture) If the option is not given, the Data Connector will choose the first long or timestamp column as the partitioning time. The type of the column specified by --time-column must be either of long and timestamp type.

If your data doesn’t have a time column you may add it using add_time filter option. More details at add_time filter plugin

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The above command assumes you have already created database(td_sample_db) and table(td_sample_table). If the database or the table do not exist in TD this command will not succeed, so create the database and table manually or use --auto-create-table option with td connector:issue command to auto create the database and table:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table
Untitled-3
At present, the Data Connector does not sort records server-side. To use time-based partitioning effectively, please sort records in files beforehand. This restriction will be solved in the near future.

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Mode (append/replace)

You can specify file import mode in out section of seed.yml.

append (default)

in:
  ...
out:
  mode: append

This is the default mode. The imported records are appended to the target table.

replace (In td 0.11.10 and later)

in:
  ...
out:
  mode: replace

If the target table already exists, the rows of the existing table are replaced with imported records.

Scheduled execution

You can schedule periodic Data Connector execution for incremental Riak CS file import. We take great care in distributing and operating our scheduler in order to achieve high availability. By using this feature, you no longer need a cron daemon on your local datacenter.

For the scheduled import, the Data Connector for Riak CS imports all files that match with the specified prefix (e.g. path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) at first and remembers the last path (path/to/sample_201505.csv.gz) for the next execution.

On the second and subsequent runs, it only imports files that comes after the last path in alphabetical (lexicographic) order. (path/to/sample_201506.csv.gz, …)

Create the schedule

A new schedule can be created using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where their data will be stored, and the Data Connector configuration file.

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

It’s also recommended to specify the --time-column option, since Treasure Data’s storage is partitioned by time (see also architecture)

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at
Untitled-3
The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.
Untitled-3
By default, schedule is setup in UTC timezone. You can set the schedule in a timezone using -t or --timezone option. Please note that `--timezone` option only supports extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles' etc. Timezone abbreviations like PST, CST are *not* supported and may lead to unexpected schedules.

List the Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                        |
+--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"riak_cs", "access_key_id"... |
+--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+

Show the Setting and Schedule History

td connector:show shows the execution setting of a schedule entry.

% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: riak_cs
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  endpoint: host
  bucket: sample_bucket
  path_prefix: path/to/sample_
  parser:
    charset: UTF-8
    ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, please use td job <jobid>.

% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set

Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_import

Appendix

A) Modes for out plugin

You can specify file import mode in out section of seed.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Please note that any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace

Last modified: Aug 26 2016 18:42:59 UTC

If this article is incorrect or outdated, or omits critical information, please let us know. For all other issues, please see our support channels.