Amazon Redshift Import Integration Using The CLI

Use the Treasure Data Connector CLI to pull datasets from your Amazon Redshift cluster directly into Treasure Data. This guide walks you through setting up the toolbelt, creating the connector configuration, and running both ad-hoc and scheduled imports so you can keep your Redshift data in sync with TD.

Install the Treasure Data Toolbelt

Install the newest TD Toolbelt. Then open a terminal and run the following command to verify the installation.

$ td --version
0.14.1

Create a Configuration File (load.yml)

The configuration file includes an in: section where you specify what comes into the connector from your integration and an out: section where you specify what the connector puts out to the database in Treasure Data. For more details on available out modes, see the Appendix.

Prepare a configuration file (for example, load.yml) as shown in the following example. Provide your Redshift instance access information, including your master user and master password.

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  select: "*"
out:
  mode: append

This example dumps all records inside the table. You can have more detailed control with additional parameters.

Depending on your Redshift instance size, you may encounter the following error. To resolve it, set fetch_rows in the in: section of load.yml to a value no larger than the limit reported in the message (for example, fetch_rows: 1000).

Error: 422: BulkLoad job preview failed: org.postgresql.util.PSQLException:
ERROR: Fetch size 10000 exceeds the limit of 1000 for a single node configuration.
Reduce the client fetch/cache size or upgrade to a multi node installation.

Preview the Data to be Imported (Optional)

You can preview data to be imported using the command td connector:preview.

$ td connector:preview load.yml
+---------+--------------+----------------------------------+------------+---------------------------+
| id:long | name:string  | description:string               | price:long | created_at:timestamp      |
+---------+--------------+----------------------------------+------------+---------------------------+
| 1       | "item name1" | "26e3c3625366591bc2ffc6e262976e" | 2419       | "2014-02-16 13:01:06 UTC" |
| 2       | "item name2" | "3e9dd9474dacb78afd607f9e0a3366" | 1298       | "2014-05-24 13:59:26 UTC" |
| 3       | "item name3" | "9b6c9e4a140284d3951681e9e047f6" | 9084       | "2014-06-21 00:18:21 UTC" |
| 4       | "item name4" | "a11faf5e63c1b02a3d4c2b5cbb7331" | 669        | "2014-05-02 03:44:08 UTC" |
| 6       | "item name6" | "6aa15471c373ddc8a6469e1c918f98" | 3556       | "2014-03-29 08:30:23 UTC" |
+---------+--------------+----------------------------------+------------+---------------------------+

Execute the Load Job

Use td connector:issue to execute the job.

Specify the --time-column option, because Treasure Data’s storage is partitioned by time. If the option is not given, the data connector selects the first long or timestamp column as the partitioning time. The column specified by --time-column must be of type long or timestamp. Use the preview results to check the available column names and types. Generally, most data types have a last_modified_date column.
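The fallback rule is easy to misread, so here is a minimal Python sketch of it. It is not the connector's code: the function name pick_time_column and the (name, type) schema representation are hypothetical, and only the rule stated above (explicit option first, otherwise the first long or timestamp column) is modeled.

```python
from typing import Optional, Sequence, Tuple

# Column types the guide lists as valid for time partitioning.
TIME_CAPABLE_TYPES = {"long", "timestamp"}


def pick_time_column(
    schema: Sequence[Tuple[str, str]],
    time_column: Optional[str] = None,
) -> Optional[str]:
    """Return the column that would serve as the partitioning time.

    schema is a list of (name, type) pairs in preview order.
    time_column stands in for the --time-column option; None means "not given".
    """
    types = dict(schema)
    if time_column is not None:
        # An explicit choice must exist and be of type long or timestamp.
        if types.get(time_column) not in TIME_CAPABLE_TYPES:
            raise ValueError(f"{time_column!r} is not a long or timestamp column")
        return time_column
    # Fallback described in the text: first long or timestamp column wins.
    for name, col_type in schema:
        if col_type in TIME_CAPABLE_TYPES:
            return name
    return None


# Schema copied from the preview output shown earlier in this guide.
preview_schema = [
    ("id", "long"),
    ("name", "string"),
    ("description", "string"),
    ("price", "long"),
    ("created_at", "timestamp"),
]

print(pick_time_column(preview_schema))                # id
print(pick_time_column(preview_schema, "created_at"))  # created_at
```

With the preview schema, the fallback lands on id because it is the first long column, which is why passing --time-column created_at explicitly is the safer choice for this table.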

If your data doesn’t have a time column, you can add the column by using the add_time filter option. See details at add_time filter plugin.

Submit the load job with the td connector:issue command. It may take a couple of hours depending on the data size. You must specify the database and table where the data is stored.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The preceding command assumes you have already created the database (td_sample_db) and table (td_sample_table). If the database or the table does not exist in TD, the command fails. Either create the database and table manually, or add the --auto-create-table option to td connector:issue to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

If you have a field called time, you do not have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

You can load records incrementally by specifying columns in your table utilizing the incremental_columns and last_record options.

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  incremental: true
  incremental_columns: [id, sub_id]
  last_record: [10000, 300]
out:
  mode: append
  exec: {}

The connector automatically rewrites the query internally: it wraps the original query, sorts by the incremental columns, and filters on last_record when one is given.

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id, sub_id

# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000 OR (id = 10000 AND sub_id > 300)
ORDER BY id, sub_id
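The WHERE clause above is a lexicographic "greater than" comparison across the incremental columns. The following Python sketch reproduces the shape of the two queries shown above; it is an illustration only, not the connector's implementation. The names build_incremental_query and sql_literal are hypothetical, and only integer and string values are handled.

```python
from typing import List, Optional, Sequence, Union

Value = Union[int, str]


def sql_literal(value: Value) -> str:
    """Render an integer or string as a SQL literal (sketch only)."""
    if isinstance(value, bool):
        raise TypeError("booleans are not supported")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported type: {type(value).__name__}")


def build_incremental_query(
    original_query: str,
    incremental_columns: Sequence[str],
    last_record: Optional[Sequence[Value]] = None,
) -> str:
    """Wrap original_query the way the queries in this guide are shaped."""
    parts: List[str] = [f"SELECT * FROM(\n    {original_query}\n)"]
    if last_record is not None:
        if len(last_record) != len(incremental_columns):
            raise ValueError("last_record needs one value per incremental column")
        values = [sql_literal(v) for v in last_record]
        clauses = []
        for i, column in enumerate(incremental_columns):
            # Equal on every earlier column, strictly greater on this one.
            terms = [
                f"{c} = {v}"
                for c, v in zip(incremental_columns[:i], values[:i])
            ]
            terms.append(f"{column} > {values[i]}")
            clause = " AND ".join(terms)
            clauses.append(f"({clause})" if len(terms) > 1 else clause)
        parts.append("WHERE " + " OR ".join(clauses))
    parts.append("ORDER BY " + ", ".join(incremental_columns))
    return "\n".join(parts)


print(build_incremental_query("SELECT * FROM example", ["id", "sub_id"], [10000, 300]))
```

Each additional incremental column adds one more OR branch, so a row is selected only if it sorts strictly after the stored last_record.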

If you’re using scheduled execution, the connector automatically generates last_record, holds it internally, and uses it at the next scheduled execution.

Only string, timestamp, and integer columns are supported as incremental_columns.

The query option isn't available when you set incremental: true.

in:
  type: redshift
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000
  - 400
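One way to picture how the last_record values in the Config Diff come about: they are the incremental column values of the row that sorts last among the rows just loaded. The sketch below assumes exactly that; the helper name next_last_record and the sample rows are hypothetical and are not taken from the connector.

```python
from typing import Any, Dict, List, Optional, Sequence


def next_last_record(
    rows: Sequence[Dict[str, Any]],
    incremental_columns: Sequence[str],
    previous: Optional[Sequence[Any]] = None,
) -> Optional[List[Any]]:
    """Return the values to carry into the next run as last_record.

    Assumption: the stored value is the incremental-column tuple of the row
    that sorts last under ORDER BY incremental_columns.
    """
    if not rows:
        # Nothing new was loaded, so keep whatever was stored before.
        return list(previous) if previous is not None else None
    newest = max(rows, key=lambda row: tuple(row[c] for c in incremental_columns))
    return [newest[c] for c in incremental_columns]


# Hypothetical rows fetched after last_record: [10000, 300].
loaded = [
    {"id": 15000, "sub_id": 900},
    {"id": 20000, "sub_id": 120},
    {"id": 20000, "sub_id": 400},
]

print(next_last_record(loaded, ["id", "sub_id"], previous=[10000, 300]))
# [20000, 400]
```

Comparing tuples rather than single columns matters here: sub_id 900 is the largest sub_id in the sample, but it belongs to a smaller id and therefore does not become part of last_record.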

Scheduled Execution

You can schedule periodic data connector execution for recurring Redshift imports. The scheduler is configured for high availability, so you no longer need a cron daemon in your local data center.

For a scheduled import, the Redshift data connector imports all objects that match the specified target.

Scheduled execution supports additional configuration parameters that control the behavior of the data connector during its periodic attempts to fetch data from Redshift:

  • incremental This configuration controls the load mode, which governs how the data connector fetches data from Redshift based on the columns named in incremental_columns.

    • incremental: true (default)
      In this mode, the data connector fetches only the records that have been added or updated since the previous run of the connector. This mode is useful when you want to fetch just the records that have changed since the previous scheduled run. It is typically combined with writing data into the destination table using ‘append’ mode.

      • incremental_columns (required)
        This option is required for incremental mode, so that only the necessary data is loaded from Redshift.
    • incremental: false
      In this mode, the data connector fetches all the records of the specified target, regardless of when they were last updated. This mode is best combined with writing data into a destination table using ‘replace’ mode.

  • columns This configuration defines a custom schema for data to be imported into Treasure Data. You can list only the columns that you are interested in, but make sure they exist in the object that you are fetching. Otherwise, these columns aren’t available in the result.

  • last_record This configuration holds the last record from the previous load job. It requires an object that includes a key for the column name and a value for the column’s value. The key needs to match the Redshift column name.
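The constraints in this list can be checked before a schedule is created. The following Python sketch works on a configuration that has already been parsed into a dict; the function check_incremental_config is hypothetical and not part of the TD Toolbelt. It treats last_record as a list, as the YAML examples in this guide do, and the rule that last_record needs one value per incremental column is an assumption drawn from those examples.

```python
from typing import Any, Dict, List


def check_incremental_config(config: Dict[str, Any]) -> List[str]:
    """Return findings ("error: ..." or "hint: ...") for a connector config."""
    in_cfg = config.get("in", {})
    out_cfg = config.get("out", {})
    findings: List[str] = []

    # This sketch only inspects configs that set incremental explicitly.
    if "incremental" not in in_cfg:
        return findings

    columns = in_cfg.get("incremental_columns") or []
    mode = out_cfg.get("mode")

    if in_cfg["incremental"]:
        if not columns:
            findings.append("error: incremental_columns is required when incremental is true")
        if "query" in in_cfg:
            findings.append("error: query cannot be combined with incremental: true")
        last_record = in_cfg.get("last_record")
        # Assumption: one last_record value per incremental column.
        if last_record is not None and len(last_record) != len(columns):
            findings.append("error: last_record needs one value per incremental column")
        if mode != "append":
            findings.append("hint: incremental: true is typically paired with mode: append")
    elif mode != "replace":
        findings.append("hint: incremental: false is best paired with mode: replace")
    return findings


good = {
    "in": {
        "type": "redshift",
        "table": "example",
        "incremental": True,
        "incremental_columns": ["id"],
        "last_record": [10000],
    },
    "out": {"mode": "append"},
}
bad = {
    "in": {"type": "redshift", "incremental": True, "query": "SELECT 1"},
    "out": {"mode": "replace"},
}

print(check_incremental_config(good))  # []
for finding in check_incremental_config(bad):
    print(finding)
```

Errors correspond to rules the guide states as requirements, while hints only reflect the recommended pairing of load mode and output mode.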

Here’s an example of a seed file using incremental mode combined with ‘append’ mode for output.

in:
  type: redshift
  host: redshift_endpoint
  port: 5439
  user: master_user
  password: master_password
  database: dev
  table: example
  incremental: true
  incremental_columns: [id]
  last_record: [10000]
out:
  mode: append
  exec: {}

To get the best performance from the incremental_columns option, set a SORTKEY on the relevant columns to avoid full table scans. For this example, define the table with a sort key on id:

CREATE TABLE example (...) SORTKEY(id);

The connector automatically builds the query, adding the ORDER BY clause and, when last_record is given, the WHERE condition.

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id

# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000
ORDER BY id

The connector automatically generates last_record and uses it at the next scheduled execution.

in:
  type: redshift
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000

Create the Schedule

A new schedule can be created using the td connector:create command. The name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the data connector configuration file are required.

In addition to standard cron expressions such as 10 0 * * *, the cron parameter accepts these options: @hourly, @daily and @monthly.

By default, the schedule is set up in the UTC timezone. You can set the schedule in another timezone using the -t or --timezone option. The --timezone option only supports extended timezone formats such as 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations such as PST and CST are not supported and may lead to unexpected schedules.
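Because an unsupported abbreviation can produce an unexpected schedule rather than a clear error, it may help to check the timezone string before passing it to --timezone. The Python sketch below is a rough shape check only: it accepts Area/Location names and rejects bare abbreviations, but it does not confirm that the zone actually exists. The function name looks_like_extended_timezone is hypothetical.

```python
import re

# Area/Location shape, for example Asia/Tokyo or America/Los_Angeles.
_AREA_LOCATION = re.compile(r"^[A-Za-z_]+(?:/[A-Za-z0-9_+-]+)+$")


def looks_like_extended_timezone(name: str) -> bool:
    """Return True if name has the extended Area/Location shape."""
    return bool(_AREA_LOCATION.match(name))


for candidate in ["Asia/Tokyo", "America/Los_Angeles", "PST", "CST"]:
    verdict = "ok" if looks_like_extended_timezone(candidate) else "rejected"
    print(f"{candidate}: {verdict}")
```

A stricter check could look the name up in a timezone database, at the cost of depending on the data installed on the machine that runs it.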

List the Schedules

You can see the list of currently scheduled entries by running td connector:list.

$ td connector:list
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| Name                  | Cron        | Timezone | Delay | Database     | Table           | Config                     |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+
| daily_redshift_import | 10 0 * * *  | UTC      | 0     | td_sample_db | td_sample_table | {"type"=>"redshift", ... } |
+-----------------------+-------------+----------+-------+--------------+-----------------+----------------------------+

Show the Setting and History of Schedules

td connector:show shows the execution settings of a schedule entry.

$ td connector:show daily_redshift_import
Name     : daily_redshift_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table

td connector:history shows the execution history of a schedule entry. To investigate the results of an individual execution, use td job:show jobid.

$ td connector:history daily_redshift_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set


$ td job:show xxxxx
JobID       : 24903
Status      : success
Type        : bulkload
Database    : td_sample_db
Use '-v' option to show detailed messages.

Delete the Schedule

td connector:delete removes the schedule.

$ td connector:delete daily_redshift_import

Incremental Loading for Data Extensions

Treasure Data supports incremental loading for Data Extensions that have a date field.

If incremental: true is set, the data connector loads records according to the range specified by the from_date and the fetch_days for the specified date field.

Appendix

Modes for the Out Plugin

Import Modes

You can specify file import mode in the out section of the load.yml file.

The out: section controls how data is imported into a Treasure Data table.
For example, you may choose to append data or replace data in an existing table in Treasure Data.

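+---------------------+------------------------------------------------------+---------------------------+
| Mode                | Description                                          | Example (out: section)    |
+---------------------+------------------------------------------------------+---------------------------+
| Append              | Records are appended to the target table.            | mode: append              |
| Always Replace      | Replaces data in the target table. Any manual schema | mode: replace             |
|                     | changes made to the target table remain intact.      |                           |
| Replace on new data | Replaces data in the target table only when there is | mode: replace_on_new_data |
|                     | new data to import.                                  |                           |
+---------------------+------------------------------------------------------+---------------------------+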

The following list provides details about all available options:

Database name: The name of the database you are transferring data from. (Ex. your_database_name)
Use custom SELECT query?: Use if you need more than a simple SELECT (columns) FROM table WHERE (condition).
Schema: The schema to transfer data from.
SELECT columns: If there are only specific columns you would like to pull data from, list them here. Otherwise all columns are transferred.
Table: The table from which you would like to import the data.
WHERE condition: If you need additional specificity on the data retrieved from the table you can specify it here as part of WHERE clause.
ORDER BY: Specify if you need the records ordered by a particular field.