The Data Connector for Riak CS enables the import of the contents of .tsv and .csv files stored in your Riak CS bucket.


Prerequisites

  • Basic knowledge of Treasure Data


Configure the Connection

To configure a connection for Riak CS:

  1. In the Treasure Data console, navigate to Integration Hub > Catalog.
  2. Click the search icon on the far-right of the Catalog screen, and enter Riak.
  3. Hover over the RiakCS connector and select Create Authentication.
  4. Set the following parameters:
    Endpoint
    Authentication Method
    Access key ID
    Secret access key
  5. Select Continue after entering the required connection details.
  6. Name the connection so you can find it later should you need to modify any of the connection details.
  7. If you would like to share this connection with other users in your organization, check the Share with others checkbox. If this box is unchecked, the connection is visible only to you.
  8. Select Create Connection to complete the connection.

The connection you just created appears in your list of connections with the name you provided.

Using the TD Toolbelt

Install ‘td’ Command v0.11.9 or Later

Install the newest Treasure Data Toolbelt.

$ td --version
0.11.10


Create Seed Config File (seed.yml)

Prepare seed.yml as below, with your Riak CS access key ID and secret access key. You must also specify the endpoint, the bucket name, and the target file name (or a prefix for multiple files).

in:
  type: riak_cs
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file                # path to the *.csv or *.tsv file in your Riak CS bucket
  endpoint: host
out:
  mode: append

The Data Connector for Riak CS imports all files that match the specified prefix. For example, path_prefix: path/to/sample_ matches path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz.
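
The prefix rule can be illustrated with a short Python sketch. It only models the selection rule described here and is not the connector's implementation. The function name select_by_prefix and the list of object keys are hypothetical.

```python
def select_by_prefix(keys, path_prefix):
    """Return the object keys that start with path_prefix, in sorted order."""
    return sorted(k for k in keys if k.startswith(path_prefix))


# Hypothetical object keys in a bucket (illustration only).
keys = [
    "path/to/sample_201502.csv.gz",
    "path/to/sample_201501.csv.gz",
    "path/to/other_201501.csv.gz",
]

matched = select_by_prefix(keys, "path/to/sample_")
print(matched)
```

Only the two keys beginning with path/to/sample_ are selected; path/to/other_201501.csv.gz is ignored because it does not start with the prefix.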

For more details on available out modes, see Appendix.

Guess Fields (Generate load.yml)

Use connector:guess. This command automatically reads the target file, and intelligently guesses the file format.

$ td connector:guess seed.yml -o load.yml

If you open up load.yml, you’ll see the guessed file format definitions including file formats, encodings, column names, and types.

in:
  type: riak_cs
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  endpoint: host
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - name: id
      type: long
    - name: company
      type: string
    - name: customer
      type: string
    - name: created_at
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
out:
  mode: append

Then, you can preview how the system parses the file by using the preview command.

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Inc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

The guess command needs more than 3 rows and 2 columns in the source data file, because it guesses the column definitions from sample rows of the source data.

If the system detects a column name or column type incorrectly, edit `load.yml` directly and preview again.

The Data Connector supports parsing of "boolean", "long", "double", "string", and "timestamp" types.
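
To make the column types concrete, here is a minimal Python sketch of how one CSV row could be converted using column definitions like those in the guessed load.yml. It is an approximation for illustration, not the connector's parser. The names COLUMNS, convert, and convert_row are hypothetical, and the boolean handling is an assumption.

```python
from datetime import datetime

# Column definitions mirroring the guessed load.yml shown earlier.
COLUMNS = [
    {"name": "id", "type": "long"},
    {"name": "company", "type": "string"},
    {"name": "customer", "type": "string"},
    {"name": "created_at", "type": "timestamp", "format": "%Y-%m-%d %H:%M:%S"},
]


def convert(value, column):
    """Convert one CSV field to a Python value according to its column type."""
    kind = column["type"]
    if kind == "long":
        return int(value)
    if kind == "double":
        return float(value)
    if kind == "boolean":
        # Assumption: only the literals "true"/"false" are handled here.
        return value.strip().lower() == "true"
    if kind == "timestamp":
        return datetime.strptime(value, column["format"])
    return value  # "string" is passed through unchanged


def convert_row(fields, columns=COLUMNS):
    """Pair each field with its column definition and convert it."""
    return {c["name"]: convert(v, c) for v, c in zip(fields, columns)}


row = convert_row(["11200", "AA Inc.", "David", "2015-03-31 06:12:37"])
print(row)
```

If a column was guessed with the wrong type, a conversion like int(value) would fail in this sketch. That is the kind of mismatch that editing load.yml and previewing again is meant to catch.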

You must also create the database and table in Treasure Data before executing the load job. To do this:

$ td database:create td_sample_db
$ td table:create td_sample_db td_sample_table


Execute Load Job

Submit the load job. It may take a couple of hours depending on the size of the data. You need to specify the database and table where the data will be stored.

It’s also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time (see also data partitioning). If the option is not given, the Data Connector chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be of either long or timestamp type.
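
The default selection rule can be sketched in Python as follows. This models only the rule as stated here ("the first long or timestamp column"). The function name default_time_column is hypothetical, and the actual connector may apply additional logic.

```python
def default_time_column(columns):
    """Return the name of the first long or timestamp column, or None."""
    for column in columns:
        if column["type"] in ("long", "timestamp"):
            return column["name"]
    return None


# Column order taken from the guessed load.yml shown earlier.
columns = [
    {"name": "id", "type": "long"},
    {"name": "company", "type": "string"},
    {"name": "customer", "type": "string"},
    {"name": "created_at", "type": "timestamp"},
]

print(default_time_column(columns))
```

If the rule is applied literally to the sample columns, the fallback is id (the first long column) rather than created_at, which is one reason to pass --time-column explicitly.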

If your data doesn’t have a time column, you can add one using the add_time filter option. For more details, see add_time filter plugin.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The above command assumes you have already created the database (td_sample_db) and table (td_sample_table). If the database or the table does not exist in TD, this command will not succeed. Either create the database and table manually, or use the --auto-create-table option with the td connector:issue command to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

At present, the Data Connector does not sort records server-side. To use time-based partitioning effectively, sort records in files beforehand.
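
One way to pre-sort a file before uploading it to Riak CS is sketched below in Python. It assumes a CSV file with a header row and a created_at column in the '%Y-%m-%d %H:%M:%S' format used in the sample load.yml. The function name sort_csv_by_time is hypothetical.

```python
import csv
import io
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def sort_csv_by_time(text, time_column="created_at"):
    """Return CSV text with the data rows sorted ascending by time_column."""
    reader = csv.DictReader(io.StringIO(text))
    rows = sorted(
        reader,
        key=lambda r: datetime.strptime(r[time_column], TIME_FORMAT),
    )
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()


unsorted = (
    "id,created_at\n"
    "40133,2015-04-02 05:12:32\n"
    "11200,2015-03-31 06:12:37\n"
)
sorted_text = sort_csv_by_time(unsorted)
print(sorted_text)
```

The sketch loads the whole file into memory, which is fine for small files. Larger files would need an external sort.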

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Mode (append/replace)

You can specify the file import mode in the out section of seed.yml.

append (default)

in:
  ...
out:
  mode: append

This is the default mode. The imported records are appended to the target table.

replace (In td 0.11.10 and later)

in:
  ...
out:
  mode: replace

If the target table already exists, the rows of the existing table are replaced with imported records.
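
The difference between the two modes can be modeled with a small Python sketch in which a table is just a list of rows. This is a conceptual illustration only. The function name apply_import is hypothetical and says nothing about how Treasure Data stores data.

```python
def apply_import(existing_rows, imported_rows, mode="append"):
    """Return the table contents after an import in the given mode."""
    if mode == "append":
        # Existing rows are kept; imported rows are added after them.
        return list(existing_rows) + list(imported_rows)
    if mode == "replace":
        # Existing rows are discarded; only the imported rows remain.
        return list(imported_rows)
    raise ValueError(f"unsupported mode: {mode!r}")


existing = [{"id": 1}, {"id": 2}]
imported = [{"id": 3}]

appended = apply_import(existing, imported, mode="append")
replaced = apply_import(existing, imported, mode="replace")
print(len(appended), len(replaced))
```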

Scheduled Execution

You can schedule periodic Data Connector execution for incremental Riak CS file import. We manage our scheduler carefully to ensure high availability. By using this feature, you no longer need a cron daemon in your local data center.

For a scheduled import, the Data Connector for Riak CS first imports all files that match the specified prefix (e.g. path_prefix: path/to/sample_ matches path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) and remembers the last path (path/to/sample_201505.csv.gz) for the next execution.

On the second and subsequent runs, it imports only files that come after the last path in alphabetical (lexicographic) order (path/to/sample_201506.csv.gz, …).
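
The incremental behavior can be sketched in Python as a prefix filter followed by a lexicographic comparison against the remembered last path. This is a model of the described behavior, not the connector's code. The function name files_to_import and the key list are hypothetical.

```python
def files_to_import(keys, path_prefix, last_path=None):
    """Return matching keys that sort strictly after last_path."""
    matching = sorted(k for k in keys if k.startswith(path_prefix))
    if last_path is not None:
        matching = [k for k in matching if k > last_path]
    return matching


# Hypothetical bucket contents at the time of the first run.
keys = [f"path/to/sample_2015{month:02d}.csv.gz" for month in range(1, 6)]

first_run = files_to_import(keys, "path/to/sample_")
last_path = first_run[-1]  # remembered for the next execution

# A new file arrives before the second run.
keys.append("path/to/sample_201506.csv.gz")
second_run = files_to_import(keys, "path/to/sample_", last_path)
print(second_run)
```

Because the comparison is lexicographic rather than numeric, file names should sort in upload order. For example, "path/to/sample_10.csv" sorts before "path/to/sample_9.csv", so under this rule a file named sample_10 added after sample_9 would be skipped. Zero-padded or date-based suffixes such as 201506 avoid this.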


Create the Schedule

Create a new schedule using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file.

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml
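
The schedule string in the command above, "10 0 * * *", can be read field by field. The Python sketch below assumes the conventional five-field cron layout (minute, hour, day of month, month, day of week). The function name describe_cron is hypothetical and is not part of the td toolbelt.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


schedule = describe_cron("10 0 * * *")
print(schedule)
```

Under that assumption, the schedule runs at minute 10 of hour 0 every day.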

It’s also recommended to specify the --time-column option, since TD Storage is partitioned by time (see also data partitioning).

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.

By default, the schedule is set up in the UTC timezone. You can set the schedule in another timezone using the -t or --timezone option. The `--timezone` option supports only extended timezone formats such as 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations such as PST and CST are *not* supported and may lead to unexpected schedules.


List the Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                        |
+--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"riak_cs", "access_key_id"... |
+--------------+--------------+----------+-------+--------------+-----------------+-----------------------------------------------+


Show the Setting and Schedule History

td connector:show shows the execution setting of a schedule entry.

$ td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: riak_cs
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  endpoint: host
  bucket: sample_bucket
  path_prefix: path/to/sample_
  parser:
    charset: UTF-8
    ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, use td job <jobid>.

$ td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set


Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_import


Appendix

Modes for Out Plugin

You can specify the file import mode in the out section of seed.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace


