Optionally, you can use the TD Toolbelt to configure the connection, create the job, and schedule executions.

Use the CLI to Configure the Connector

Before setting up the connector, install the ‘td’ command. Install the most current TD Toolbelt.

Create Seed Config File (seed.yml)

Prepare the seed.yml as shown in the following example, with your AWS access key and secret access key. You must also specify the bucket name, and source file name (or prefix for multiple files).

in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  # path to the *.json or *.csv or *.tsv file on your s3 bucket
  path_prefix: path/to/sample_file
  path_match_pattern: \.csv$ # a file will be skipped if its path doesn't match with this pattern

  ## some examples of regexp:
  #path_match_pattern: /archive/ # match files in .../archive/... directory
  #path_match_pattern: /data1/|/data2/ # match files in .../data1/... or .../data2/... directory
  #path_match_pattern: .csv$|.csv.gz$ # match files whose suffix is .csv or .csv.gz
out:
  mode: append

The Data Connector for Amazon S3 imports all files that match the specified prefix. (e.g. path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz).

Using path_prefix with leading '/', can lead to unintended results. For example: "path_prefix: /path/to/sample_file" would result in plugin looking for file in s3://sample_bucket//path/to/sample_file which is different on S3 than the intended path of s3://sample_bucket/path/to/sample_file.

Guess Fields (Generate load.yml)

Use connector:guess. This command automatically reads the source files and assesses (uses logic to guess) the file format and its field/columns.

$ td connector:guess seed.yml -o load.yml

If you open up load.yml, you’ll see the assessed file format definitions including file formats, encodings, column names, and types.

in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - name: id
      type: long
    - name: company
      type: string
    - name: customer
      type: string
    - name: created_at
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
out:
  mode: append

Then, you can see a preview of the data using the td connector:preview command.

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

The guess command needs more than 3 rows and 2 columns in the source data file because the command assesses the column definition using sample rows from source data.

If the system detects your column name or column type unexpectedly, modify load.yml directly and preview again.

Currently, the Data Connector supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.

Execute Load Job

Submit the load job. It may take a couple of hours depending on the size of the data. Specify the Treasure Data database and table where the data should be stored.

It’s also recommended to specify --time-column option because Treasure Data’s storage is partitioned by time (see data partitioning) If the option is not provided, the data connector chooses the first long or timestamp column as the partitioning time. The type of the column specified by --time-column must be either of long and timestamp type.

If your data doesn’t have a time column you can add a time column by using add_time filter option. For more details see add_time filter plugin.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at

The connector:issue command assumes that you have already created a database(td_sample_db)and a table(td_sample_table). If the database or the table do not exist in TD, this command will not succeed, so create the database and table manually or use --auto-create-table option with td connector:issue command to auto-create the database and table:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

The data connector does not sort records on the server-side. To use time-based partitioning effectively, sort records in files beforehand.

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Import Modes

You can specify file import mode in the out section of the load.yml file.

The out: section controls how data is imported into a Treasure Data table.
For example, you may choose to append data or replace data in an existing table in Treasure Data.

Mode

Description

Examples

Append

Records are appended to the target table.

in:
  ...
out:
  mode: append

Always Replace

Replaces data in the target table. Any manual schema changes made to the target table remain intact.

in:
  ...
out:
  mode: replace

Replace on new data

Replaces data in the target table only when there is new data to import.

in:
  ...
out:
  mode: replace_on_new_data

Scheduling Executions

You can schedule periodic data connector execution for incremental file import. We configure our scheduler carefully to ensure high availability.

For the scheduled import, you can import all files that match the specified prefix and one of these fields by condition:

  • If use_modified_time is disabled, the last path is saved for the next execution. On the second and subsequent runs, the connector only imports files that come after the last path in alphabetical order.

  • Otherwise, the time that the job is executed is saved for the next execution. On the second and subsequent runs, the connector only imports files that were modified after that execution time in alphabetical order.

Create a Schedule Using the TD Toolbelt

A new schedule can be created using the td connector:create command.

$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml

It’s also recommended to specify the --time-column option, because Treasure Data’s storage is partitioned by time (see also data partitioning).

$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml \
    --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily`, and `@monthly`.

By default, the schedule is setup in the UTC timezone. You can set the schedule in a timezone using -t or --timezone option. `--timezone` option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles' etc. Timezone abbreviations like PST, CST are *not* supported and may lead to unexpected schedules.

List All Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                   |
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"s3", "access_key_id"... |
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+

Show Schedule Settings and History

td connector:show shows the execution setting of a schedule entry.

Where:



<access_key_id>

Allows you to access the TD AWS Services

<secret_access_key>

Allows you to access the TD AWS Services

<endpoint>

A computer that communicates back and forth with a network

Example value: s3.amazonaws.com

<bucket>

Container object within a database

Example value: https://my-bucket.s3.us-west-2.amazonaws.com.

<path_prefix>

Specify a prefix for target keys

Example values:

logging/

path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz


% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: s3
  access_key_id: <access_key_id>
  secret_access_key: <secret_access_key>
  endpoint: <endpoint>
  bucket: <bucket>
  path_prefix: <path_prefix>
  parser:
    charset: UTF-8
    ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, use td job <jobid>.

% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set

Delete Schedule

td connector:delete removes the schedule.

$ td connector:delete daily_import

IAM Permissions

The IAM credentials specified in the YML configuration file and used for the connector:guess and connector:issue commands need to be allowed permissions for the AWS S3 resources that they need to access. If the IAM user does not possess these permissions, configure the user with one of the predefined Policy Definitions or create a new Policy Definition in JSON format.

The following example is based on the Policy Definition reference format, giving the IAM user read only (through GetObject and ListBucket actions) permission for the your-bucket:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}

Replace your-bucket with the actual name of your bucket.

Use AWS Security Token Service (STS) as Temporary Credentials Provider

In certain cases, IAM basic authentication through access_key_id and secret_access_key might be too risky (although the secret_access_key is never clearly shown when a job is executed or after a session is created).

The S3 data connector can use AWS Secure Token Service (STS) provided Temporary Security Credentials. Using AWS STS, any IAM user can use his own access_key_id and secret_access_key to create a set of temporary new_access_key_id, new_secret_access_key, and session_token keys with an associated expiration time, after which the credentials become invalid.
The following are types of Temporary Security Credentials:

  • Session Token
    The simplest Security Credentials with an associated expiration time. The temporary credentials give access to all resources the original IAM credentials used to generate them had. These credentials are valid as long as they are not expired and the permissions of the original IAM credentials don’t change.

  • Federation Token
    Adds an extra layer of permission control over the Session Token above. When generating a Federation Token, the IAM user is required to specify a Permission Policy definition. The scope can be used to further narrow down which of the resources, accessible to the IAM user, the bearer of the Federation Token should get access to. Any Permission Policy definition can be used but the scope of the permission is limited to only all or a subset of the permissions the IAM user used to generate the token had. As for the Session Token, the Federation Token credentials are valid as long as they are not expired and the permissions associated to the original IAM credentials don’t change.

AWS STS Temporary Security Credentials can be generated using the AWS CLI or the AWS SDK in the language of your choice.

Session Token

$ aws sts get-session-token --duration-seconds 900
{
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T05:11:14Z",
        "AccessKeyId": "XXXXXXXXXX"
    }
}

Federation Token

$ aws sts get-federation-token --name temp_creds --duration-seconds 900 \
  --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'
{
    "FederatedUser": {
        "FederatedUserId": "523683666290:temp_creds",
        "Arn": "arn:aws:sts::523683666290:federated-user/temp_creds"
    },
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T06:06:17Z",
        "AccessKeyId": "XXXXXXXXXX"
    },
    "PackedPolicySize": 16
}

where: * temp_cred is the name of the Federated token/user * bucketname is the name of the bucket to give access to. Refer to the ARN specification for more details * s3:GetObject and s3:ListBucket are the basic read operation for a AWS S3 bucket.

AWS STS credentials cannot be revoked. They will remain effective until expired, or until you delete or remove the permissions of the original IAM user used to generate the credentials.

When your Temporary Security Credentials are generated, copy the SecretAccessKey, AccessKeyId, and SessionToken in your seed.yml file as follows.

in:
  type: s3
  auth_method: session
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  session_token: ZZZZZZZZZZ
  bucket: sample_bucket
  path_prefix: path/to/sample_file

and execute the Data Connector for S3 as usual.

Credential Expiration

Because STS credentials expire after the specified amount of time, the data connector job that uses the credential might eventually start failing when credential expiration occurs.
Currently, if the STS credentials are reported expired, the data connector job retries up to the maximum number of times (5) and eventually complete with 'error' status.


  • No labels