Data Connector for Amazon Redshift

Prerequisites

  • Redshift instance created
  • Treasure Data Toolbelt installed
  • Basic knowledge of Amazon Redshift
  • Basic knowledge of Treasure Data

Step 0: Install ‘td’ command v0.11.9 or later

Install the latest Treasure Data Toolbelt.

$ td --version
0.14.1

Step 1: Create Configuration File

Prepare a configuration file (for example, load.yml) like the one below, using your master user name and master password.

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  select: "*"
out:
  mode: append

This example dumps all records inside the table. You can have more detailed control with additional parameters.

For more details on available out modes, see Appendix.

Depending on your Redshift instance size, you may hit the error shown below. To resolve it, set fetch_rows in the in section of load.yml to a value within the limit reported in the error message (1000 in this example).
Error: 422: BulkLoad job preview failed: org.postgresql.util.PSQLException:
ERROR: Fetch size 10000 exceeds the limit of 1000 for a single node configuration.
Reduce the client fetch/cache size or upgrade to a multi node installation.

Step 2 (Optional): Preview Data to Be Imported

You can preview data to be imported using the command td connector:preview.

$ td connector:preview load.yml
+---------+--------------+----------------------------------+------------+---------------------------+
| id:long | name:string  | description:string               | price:long | created_at:timestamp      |
+---------+--------------+----------------------------------+------------+---------------------------+
| 1       | "item name1" | "26e3c3625366591bc2ffc6e262976e" | 2419       | "2014-02-16 13:01:06 UTC" |
| 2       | "item name2" | "3e9dd9474dacb78afd607f9e0a3366" | 1298       | "2014-05-24 13:59:26 UTC" |
| 3       | "item name3" | "9b6c9e4a140284d3951681e9e047f6" | 9084       | "2014-06-21 00:18:21 UTC" |
| 4       | "item name4" | "a11faf5e63c1b02a3d4c2b5cbb7331" | 669        | "2014-05-02 03:44:08 UTC" |
| 6       | "item name6" | "6aa15471c373ddc8a6469e1c918f98" | 3556       | "2014-03-29 08:30:23 UTC" |
+---------+--------------+----------------------------------+------------+---------------------------+

Step 3: Execute Load Job

Finally, submit the load job.

We recommend specifying the --time-column option, since Treasure Data’s storage is partitioned by time (see also architecture). If the option is not given, the Data Connector chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be of either long or timestamp type.
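
The fallback rule described above can be sketched in a few lines of Python. This is only an illustration of the stated rule, not the connector's actual code: default_time_column is a hypothetical helper, and the sketch does not model any special handling of a column named time.

```python
from typing import Optional, Sequence, Tuple

# Column types that the text above says are valid for the partitioning time.
TIME_CAPABLE_TYPES = {"long", "timestamp"}


def default_time_column(schema: Sequence[Tuple[str, str]]) -> Optional[str]:
    """Return the first column whose type is long or timestamp, or None."""
    for name, column_type in schema:
        if column_type in TIME_CAPABLE_TYPES:
            return name
    return None


# Schema copied from the td connector:preview output in Step 2.
preview_schema = [
    ("id", "long"),
    ("name", "string"),
    ("description", "string"),
    ("price", "long"),
    ("created_at", "timestamp"),
]

print(default_time_column(preview_schema))  # id
```

Under this reading, the preview schema would fall back to id rather than created_at, which is one reason to pass --time-column created_at explicitly.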

If your data doesn’t have a time column, you can add one using the add_time filter option. For more details, see the add_time filter plugin.

Use the td connector:issue command to submit the import job.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The above command assumes you have already created the database (td_sample_db) and the table (td_sample_table). If the database or the table does not exist in TD, the command will not succeed. Either create the database and table manually, or use the --auto-create-table option with the td connector:issue command to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

If you have a field called time, you do not have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample

It may take a few minutes to several hours for the import job to complete, depending on the size of your data.

You can load records incrementally by naming columns of your table in the incremental_columns option and, optionally, giving their starting values in the last_record option.

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  incremental: true
  incremental_columns: [id, sub_id]
  last_record: [10000, 300]
out:
  mode: append
  exec: {}

The plugin then automatically rewrites the query internally and sorts the result by the incremental columns.

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id, sub_id

# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000 OR (id = 10000 AND sub_id > 300)
ORDER BY id, sub_id
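
The WHERE clause above compares the incremental columns in order, the way a multi-part sort key is compared. A minimal Python sketch of how such a clause could be built for any number of columns follows. The function names are hypothetical and this is not the plugin's implementation; values are rendered as SQL literals only for readability.

```python
from typing import Sequence, Union

Value = Union[int, str]  # the text states only integers and strings are supported


def _literal(value: Value) -> str:
    """Render an integer as-is and a string as a single-quoted SQL literal."""
    if isinstance(value, int):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def incremental_where(columns: Sequence[str], last_record: Sequence[Value]) -> str:
    """Build the condition selecting rows that sort after last_record."""
    if len(columns) != len(last_record):
        raise ValueError("last_record needs one value per incremental column")
    terms = []
    for i, column in enumerate(columns):
        # All earlier columns equal their last value, this column is greater.
        parts = [f"{c} = {_literal(v)}" for c, v in zip(columns[:i], last_record[:i])]
        parts.append(f"{column} > {_literal(last_record[i])}")
        term = " AND ".join(parts)
        terms.append(term if i == 0 else f"({term})")
    return " OR ".join(terms)


print(incremental_where(["id", "sub_id"], [10000, 300]))
# id > 10000 OR (id = 10000 AND sub_id > 300)
```

With a single incremental column the same function reduces to a plain comparison such as id > 10000.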

If you are using scheduled execution, the plugin automatically generates last_record and holds it internally, then uses it at the next scheduled execution.

in:
  type: redshift
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000
  - 400
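
One way to picture where the last_record values in the Config Diff above come from is sketched below in Python. It assumes, without confirmation from this article, that the stored value is simply the incremental column values of the final row in sorted order. next_last_record and the sample rows are hypothetical.

```python
from typing import List, Mapping, Optional, Sequence


def next_last_record(
    rows: Sequence[Mapping[str, object]],
    incremental_columns: Sequence[str],
) -> Optional[List[object]]:
    """Return the incremental column values of the final row, or None if empty.

    Assumes rows are already sorted by the incremental columns, as the
    generated ORDER BY clause would guarantee.
    """
    if not rows:
        return None  # nothing new was loaded, so there is nothing to record
    last_row = rows[-1]
    return [last_row[column] for column in incremental_columns]


# Hypothetical rows, already ordered by (id, sub_id).
fetched = [
    {"id": 10000, "sub_id": 301, "name": "a"},
    {"id": 15000, "sub_id": 12, "name": "b"},
    {"id": 20000, "sub_id": 400, "name": "c"},
]

print(next_last_record(fetched, ["id", "sub_id"]))  # [20000, 400]
```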

The `query` option is not available when you set `incremental: true`.

Currently, only strings and integers are supported as incremental_columns.

Scheduled Execution

You can schedule Data Connector jobs to periodically import data from Redshift. We take great care in distributing and operating our scheduler in order to achieve high availability. By using this feature, you no longer need a cron daemon in your local data center.

Incremental load

You can load records incrementally by naming columns of your table in the incremental_columns parameter. Optionally, you can give initial values in the last_record parameter.

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  incremental: true
  incremental_columns: [id]
  last_record: [10000]
out:
  mode: append
  exec: {}

To get the best performance from the `incremental_columns:` option, set a SORTKEY on the relevant columns to avoid full table scans. For this example, the table should be created with the following sort key:
CREATE TABLE example (...) sortkey(id);

The connector will automatically create the query and sort value.

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id

# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000
ORDER BY id

The connector will automatically generate last_record and use it at the next scheduled execution.

in:
  type: redshift
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000

The `query` option is not available when you set `incremental: true`.

Currently, only strings and integers are supported as incremental_columns.

Create the Schedule

A new schedule can be created by using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file.

  $ td connector:create \
  daily_redshift_import \
  "10 0 * * *" \
  td_sample_db \
  td_sample_table \
  load.yml

The `cron` parameter also accepts these three options: `@hourly`, `@daily` and `@monthly`.

By default, the schedule is set up in the UTC timezone. You can set the schedule in another timezone using the -t or --timezone option. Please note that the `--timezone` option only supports extended timezone formats like 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.
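
To read the cron-style schedule used in the td connector:create example, it can help to label its fields. The Python sketch below assumes the usual five-field cron layout (minute, hour, day of month, month, day of week). describe_cron is a hypothetical helper, not part of the td toolbelt, and it does not handle the `@hourly`-style shortcuts.

```python
from typing import Dict

# Field order of a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> Dict[str, str]:
    """Split a five-field cron expression and label each field."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("10 0 * * *"))
# {'minute': '10', 'hour': '0', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
```

Read this way, "10 0 * * *" means minute 10 of hour 0 on every day, that is, 00:10 daily in the schedule's timezone.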

List the Schedules

You can see the list of currently scheduled entries with td connector:list.

$ td connector:list
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| Name                  | Cron        | Timezone | Delay | Database     | Table           | Config                     |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| daily_redshift_import | 10 0 * * *  | UTC      | 0     | td_sample_db | td_sample_table | {"type"=>"redshift", ... } |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+

Show the Setting and History of Schedules

td connector:show shows the execution setting of a schedule entry.

$ td connector:show daily_redshift_import
Name     : daily_redshift_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual execution, use td job:show <jobid>.

$ td connector:history daily_redshift_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set


$ td job:show xxxxx
JobID       : 24903
Status      : success
Type        : bulkload
Database    : td_sample_db
Use '-v' option to show detailed messages.

Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_redshift_import

Appendix

A) Modes for out plugin

You can specify the import mode in the out section of load.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Please note that any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace

Last modified: Feb 24 2017 09:27:52 UTC
