Data Connector for PostgreSQL

This article describes how to use the data connector for PostgreSQL, which allows you to import data directly from your PostgreSQL database into Treasure Data.

Prerequisites

  • Basic knowledge of Treasure Data
  • Basic knowledge of PostgreSQL
  • A PostgreSQL instance running remotely, for example on RDS.

Step 0: Install the ‘td’ Command

Install the newest Treasure Data Toolbelt.

Step 1: Create Seed Config File (seed.yml)

First, please prepare seed.yml with your PostgreSQL access information:

in:
  type: postgresql
  host: postgresql_host_name
  port: 5432
  ssl: true
  ssl_version: TLS
  user: test_user
  password: test_password
  database: test_database
  table: test_table
  select: "*"
out:
  mode: replace

This example will dump all records inside the table. You can have more detailed control with additional parameters.

For more details on the available out modes, see Appendix A. For more details on the available ssl_version values, see Appendix C.

Step 2: Guess Fields (Generate load.yml)

Second, run connector:guess. This command reads the target data and guesses the data format automatically.

$ td connector:guess seed.yml -o load.yml

If you open load.yml, you’ll see the guessed definitions, which in some cases include file formats, encodings, column names, and types.

Step 3 (Optional): Preview data to be imported

You can preview data to be imported using the command td connector:preview.

$ td connector:preview load.yml
+---------+--------------+----------------------------------+------------+---------------------------+
| id:long | name:string  | description:string               | price:long | created_at:timestamp      |
+---------+--------------+----------------------------------+------------+---------------------------+
| 1       | "item name1" | "26e3c3625366591bc2ffc6e262976e" | 2419       | "2014-02-16 13:01:06 UTC" |
| 2       | "item name2" | "3e9dd9474dacb78afd607f9e0a3366" | 1298       | "2014-05-24 13:59:26 UTC" |
| 3       | "item name3" | "9b6c9e4a140284d3951681e9e047f6" | 9084       | "2014-06-21 00:18:21 UTC" |
| 4       | "item name4" | "a11faf5e63c1b02a3d4c2b5cbb7331" | 669        | "2014-05-02 03:44:08 UTC" |
| 6       | "item name6" | "6aa15471c373ddc8a6469e1c918f98" | 3556       | "2014-03-29 08:30:23 UTC" |
+---------+--------------+----------------------------------+------------+---------------------------+

Step 4: Execute Load Job

Finally, submit the load job. The job may take a couple of hours to run, depending on the data size. You must specify the database and table where the data will be stored.

We recommend specifying the --time-column option, since Treasure Data’s storage is partitioned by time (see also architecture). If the option is not given, the Data Connector chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be of either long or timestamp type.

If your data doesn’t have a time column, you can add one using the add_time filter option. For more details, see add_time filter plugin.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

The above command assumes that you have already created the database (td_sample_db) and the table (td_sample_table). If the database or the table does not exist in TD, this command will not succeed. Either create the database and table manually, or use the --auto-create-table option with the td connector:issue command to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

You can assign a time-format column as the partitioning key with the --time-column option.
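
The default selection rule described in this step can be sketched in Python. This is only an illustration of the rule as stated in this article, not the connector's actual code. The function name pick_time_column and the (name, type) column list are hypothetical; the sample columns are taken from the preview output in Step 3.

```python
from typing import Optional, Sequence, Tuple

# Column types this article names as valid for --time-column.
TIME_COLUMN_TYPES = ("long", "timestamp")


def pick_time_column(
    columns: Sequence[Tuple[str, str]],
    requested: Optional[str] = None,
) -> Optional[str]:
    """Return the column that would serve as the partitioning time.

    `columns` is an ordered list of (name, type) pairs, as in the header
    of `td connector:preview`. `requested` stands in for --time-column.
    """
    types = dict(columns)
    if requested is not None:
        if requested not in types:
            raise ValueError(f"unknown column: {requested}")
        if types[requested] not in TIME_COLUMN_TYPES:
            raise ValueError(
                f"{requested} is {types[requested]}, expected long or timestamp"
            )
        return requested
    # No explicit choice: fall back to the first long or timestamp column.
    for name, col_type in columns:
        if col_type in TIME_COLUMN_TYPES:
            return name
    return None  # no candidate; the add_time filter would be needed


preview_columns = [
    ("id", "long"),
    ("name", "string"),
    ("description", "string"),
    ("price", "long"),
    ("created_at", "timestamp"),
]
print(pick_time_column(preview_columns))                # id
print(pick_time_column(preview_columns, "created_at"))  # created_at
```

With the preview columns from Step 3, the rule as stated would pick id rather than created_at, which is one reason to pass --time-column explicitly.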

Scheduled execution

You can schedule periodic Data Connector execution for periodic PostgreSQL import. We take great care in distributing and operating our scheduler in order to achieve high availability. By using this feature, you no longer need a cron daemon in your local data center.

Incremental load

You can load records incrementally by listing columns from your table in the incremental_columns parameter. Optionally, you may specify initial values in the last_record parameter.

in:
  type: postgresql
  host: postgresql_host_name
  port: 5432
  user: test_user
  password: test_password
  database: test_database
  table: test_table
  incremental: true
  incremental_columns: [id, created_at]
  last_record: [10000, '2014-02-16T13:01:06.000000Z']
out:
  mode: append
  exec: {}

To get the most out of the `incremental_columns:` option, create an index on the relevant columns to avoid full table scans. For this example, the following index should be created:
CREATE INDEX embulk_incremental_loading_index ON test_table (id, created_at);

The connector automatically builds the query and the sort order.

# when last_record wasn't given
SELECT * FROM(
    ...original query is here
)
ORDER BY id, created_at

# when last_record was given
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000 OR (id = 10000 AND created_at > '2014-02-16T13:01:06.000000Z')
ORDER BY id, created_at
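
The WHERE condition in the second query is a row-wise "strictly greater than last_record" comparison, and it extends to any number of incremental columns. The following Python sketch shows one way to build such a condition. It is an illustration under assumptions, not the connector's implementation: the function names are hypothetical and the literal quoting is deliberately simplified.

```python
from typing import Sequence


def sql_literal(value: object) -> str:
    """Render an int or a string as a SQL literal (simplified quoting)."""
    if isinstance(value, int):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def incremental_where(columns: Sequence[str], last_record: Sequence[object]) -> str:
    """Build the condition selecting rows that sort after last_record."""
    if len(columns) != len(last_record):
        raise ValueError("columns and last_record must have the same length")
    clauses = []
    for i, column in enumerate(columns):
        # All earlier columns must tie; the i-th column must be greater.
        parts = [
            f"{c} = {sql_literal(v)}"
            for c, v in zip(columns[:i], last_record[:i])
        ]
        parts.append(f"{column} > {sql_literal(last_record[i])}")
        clause = " AND ".join(parts)
        clauses.append(f"({clause})" if len(parts) > 1 else clause)
    return " OR ".join(clauses)


def order_by(columns: Sequence[str]) -> str:
    """Sort by the same columns so the last row is the greatest one."""
    return "ORDER BY " + ", ".join(columns)


columns = ["id", "created_at"]
last_record = [10000, "2014-02-16T13:01:06.000000Z"]
print("WHERE " + incremental_where(columns, last_record))
print(order_by(columns))
```

For the two columns and the last_record values used in this section, the sketch prints the same WHERE and ORDER BY lines as the second query above.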

The connector automatically generates last_record and uses it at the next scheduled execution.

in:
  type: postgresql
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000
  - '2015-06-16T16:32:14.000000Z'
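
One way to picture how a new last_record arises is sketched below in Python. It assumes that last_record holds the incremental-column values of the last row in sort order, and that the previous value is kept when a run loads no rows; both are assumptions made for illustration, and the function name next_last_record and the sample rows are hypothetical.

```python
from typing import Dict, List, Optional, Sequence


def next_last_record(
    rows: Sequence[Dict[str, object]],
    incremental_columns: Sequence[str],
    previous: Optional[List[object]] = None,
) -> Optional[List[object]]:
    """Return the incremental-column values of the greatest loaded row.

    Assumption: last_record is the last row in ORDER BY order. When a run
    loads no rows, the previous value is returned unchanged.
    """
    if not rows:
        return previous
    greatest = max(
        rows, key=lambda row: tuple(row[c] for c in incremental_columns)
    )
    return [greatest[c] for c in incremental_columns]


# Made-up rows standing in for one scheduled run.
loaded = [
    {"id": 19999, "created_at": "2015-06-16T16:30:00.000000Z"},
    {"id": 20000, "created_at": "2015-06-16T16:32:14.000000Z"},
]
print(next_last_record(loaded, ["id", "created_at"]))
```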

The `query` option is not available when `incremental: true` is set.

Currently, only string, integer, timestamp, and timestamptz (timestamp with time zone) columns are supported as incremental_columns.

Create the schedule

A new schedule can be created using the td connector:create command. The name of the schedule, a cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file are required.

$ td connector:create \
    daily_postgresql_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

The `cron` parameter also accepts three options: `@hourly`, `@daily` and `@monthly`.

By default, the schedule is set up in the UTC timezone. You can set the schedule in another timezone using the -t or --timezone option. Note that the `--timezone` option only supports extended timezone formats such as 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations such as PST and CST are *not* supported and may lead to unexpected schedules.
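
If you generate schedules from a script, it can help to check the arguments before calling td. The Python sketch below assembles the td connector:create command line and rejects timezone abbreviations. The helper name connector_create_args is hypothetical, and two checks are assumptions rather than documented behavior: a cron expression is taken to have five fields, and an extended timezone name is taken to contain a slash.

```python
import shlex
from typing import List, Optional

# Shorthands this article lists for the cron parameter.
CRON_SHORTHANDS = {"@hourly", "@daily", "@monthly"}


def connector_create_args(
    name: str,
    cron: str,
    database: str,
    table: str,
    config: str,
    timezone: Optional[str] = None,
) -> List[str]:
    """Build the argument list for `td connector:create`."""
    # Assumption: a cron expression has five whitespace-separated fields.
    if cron not in CRON_SHORTHANDS and len(cron.split()) != 5:
        raise ValueError(f"unexpected cron expression: {cron!r}")
    args = ["td", "connector:create", name, cron, database, table, config]
    if timezone is not None:
        # Assumption: extended names look like Region/City, so PST or CST
        # (no slash) are rejected. UTC is allowed as the default zone.
        if timezone != "UTC" and "/" not in timezone:
            raise ValueError(f"use an extended timezone name, not {timezone!r}")
        args += ["--timezone", timezone]
    return args


args = connector_create_args(
    "daily_postgresql_import",
    "10 0 * * *",
    "td_sample_db",
    "td_sample_table",
    "load.yml",
    timezone="Asia/Tokyo",
)
# shlex.join quotes the cron expression so it survives the shell.
print(shlex.join(args))
```

The resulting list can be passed to subprocess.run as-is, which avoids shell quoting problems with the asterisks in the cron expression.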

List the Schedules

You can see the list of currently scheduled entries with td connector:list.

$ td connector:list
+-------------------------+--------------+----------+-------+--------------+-----------------+------------------------------+
| Name                    | Cron         | Timezone | Delay | Database     | Table           | Config                       |
+-------------------------+--------------+----------+-------+--------------+-----------------+------------------------------+
| daily_postgresql_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"type"=>"postgresql", ... } |
+-------------------------+--------------+----------+-------+--------------+-----------------+------------------------------+

Show the Setting and History of Schedules

td connector:show shows the execution setting of a schedule entry.

$ td connector:show daily_postgresql_import
Name     : daily_postgresql_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual execution, please use td job <jobid>.

$ td connector:history daily_postgresql_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set
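
To check the history from a script, the table printed above can be parsed into records. The following Python sketch assumes the output keeps the layout shown here, with cells separated by | characters; parse_td_table is a hypothetical helper, not part of the td command.

```python
from typing import Dict, List


def parse_td_table(text: str) -> List[Dict[str, str]]:
    """Parse an ASCII table as printed by td into a list of dicts."""
    rows = [
        [cell.strip() for cell in line.strip().strip("|").split("|")]
        for line in text.splitlines()
        if line.strip().startswith("|")  # skip +----+ borders and footers
    ]
    if not rows:
        return []
    header, *body = rows
    return [dict(zip(header, row)) for row in body]


sample = """
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
2 rows in set
"""

history = parse_td_table(sample)
not_successful = [job["JobID"] for job in history if job["Status"] != "success"]
print(len(history), not_successful)
```

Any job ID collected in not_successful can then be inspected with td job <jobid>.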

Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_postgresql_import

Appendix

A) Modes for out plugin

You can specify the import mode in the out section of load.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Please note that any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace

B) Load hstore column

PostgreSQL’s hstore type is retrieved as a string type when the Data Connector first reads it. Therefore, if you want to use an hstore column as a json type, you need to specify column_options and explicitly convert the type to json.

For example, if v_hstore is an hstore column in PostgreSQL:

in:
  type: postgresql
  host: xxx
  ...
  table: my_tbl
  select: "*"
  column_options:
    v_hstore: {type: json} # explicit type conversion: string type to json type
out:
  ...
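
To see what the string form of an hstore value looks like next to its JSON equivalent, consider the Python sketch below. It parses PostgreSQL's hstore text representation ("key"=>"value" pairs, with NULL for missing values) into a dict. It is only an illustration for well-formed input and does not claim to reproduce the connector's own conversion; hstore_to_dict is a hypothetical name.

```python
import json
import re
from typing import Dict, Optional

# One "key"=>"value" or "key"=>NULL pair; backslash escapes are allowed
# inside the double-quoted parts.
_PAIR = re.compile(
    r'"((?:[^"\\]|\\.)*)"\s*=>\s*(?:NULL|"((?:[^"\\]|\\.)*)")'
)


def _unescape(text: str) -> str:
    """Drop the backslash in front of an escaped character."""
    return re.sub(r"\\(.)", r"\1", text)


def hstore_to_dict(text: str) -> Dict[str, Optional[str]]:
    """Convert hstore text output into a dict of strings (or None)."""
    result: Dict[str, Optional[str]] = {}
    for match in _PAIR.finditer(text):
        key = _unescape(match.group(1))
        value = match.group(2)  # None when the pair was "key"=>NULL
        result[key] = None if value is None else _unescape(value)
    return result


as_string = '"color"=>"red", "size"=>NULL'
print(json.dumps(hstore_to_dict(as_string), sort_keys=True))
```

Without the explicit type conversion, the column arrives as the raw string on the left-hand side of this mapping rather than as structured JSON.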

C) SSL version

You can specify the SSL version that your PostgreSQL server uses with the `ssl_version` option.

in:
  type: postgresql
  ...
  ssl: true
  ssl_version: TLSv1.1

The supported values are as follows.

  • TLS
  • TLSv1.1
  • TLSv1.2
  • TLSv1.3
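
Several constraints stated in this article can be checked before a configuration is submitted. The Python sketch below takes a configuration that has already been loaded into a dict (for example with a YAML parser) and reports violations of those constraints. The function name check_config and the message texts are hypothetical, and the check covers only rules mentioned in this article.

```python
from typing import Dict, List

# Values listed in this article.
SUPPORTED_SSL_VERSIONS = {"TLS", "TLSv1.1", "TLSv1.2", "TLSv1.3"}
OUT_MODES = {"append", "replace"}


def check_config(config: Dict[str, dict]) -> List[str]:
    """Return a list of problems found in a connector configuration."""
    problems = []
    source = config.get("in", {})
    out = config.get("out", {})

    if source.get("type") != "postgresql":
        problems.append("in.type should be postgresql")

    ssl_version = source.get("ssl_version")
    if ssl_version is not None and ssl_version not in SUPPORTED_SSL_VERSIONS:
        problems.append(f"unsupported ssl_version: {ssl_version}")

    # append is the default mode when none is given.
    mode = out.get("mode", "append")
    if mode not in OUT_MODES:
        problems.append(f"unknown out mode: {mode}")

    # The query option is not available together with incremental: true.
    if source.get("incremental") and "query" in source:
        problems.append("query cannot be combined with incremental: true")

    return problems


seed = {
    "in": {
        "type": "postgresql",
        "ssl": True,
        "ssl_version": "TLS",
        "table": "test_table",
        "select": "*",
    },
    "out": {"mode": "replace"},
}
print(check_config(seed))  # []
```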


Last modified: Feb 22 2017 23:40:42 UTC
