Data Connector for Amazon S3

The Data Connector for Amazon S3 enables you to import data from JSON, TSV, and CSV files stored in an S3 bucket.

Prerequisites

  • Basic knowledge of Treasure Data

Step 0: Install ‘td’ command

Install the newest Treasure Data Toolbelt.

Step 1: Create Seed Config File (seed.yml)

First, prepare seed.yml as shown below, with your AWS access key and secret access key. You must also specify the bucket name and the source file name (or a prefix for multiple files).

in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  # path to the *.json or *.csv or *.tsv file on your s3 bucket
  path_prefix: path/to/sample_file
out:
  mode: append

The Data Connector for Amazon S3 imports all files that match the specified prefix. (e.g. path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz)

For more details on available out modes, see Appendix.

Step 2: Guess Fields (Generate load.yml)

Second, use td connector:guess. This command automatically reads the source files and intelligently guesses the file format and its fields/columns.

$ td connector:guess seed.yml -o load.yml

If you open load.yml, you’ll see the guessed definitions, including the file format, encoding, column names, and types.

in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - name: id
      type: long
    - name: company
      type: string
    - name: customer
      type: string
    - name: created_at
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
out:
  mode: append

Then, you can see a preview of the data using the td connector:preview command.

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Inc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+
Note: the guess command needs more than 3 rows and 2 columns in the source data file, because it guesses the column definitions using sample rows from the source data.
Note: if the system guesses a column name or column type incorrectly, modify `load.yml` directly and preview again.
Note: currently, the Data Connector supports parsing of "boolean", "long", "double", "string", and "timestamp" types.
Note: the `preview` command will download one file from the specified bucket and display results from that file only. This may cause differences between the results of the preview and issue commands.
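For reference, the source file behind the preview above might look like the following CSV. This is an assumed sample reconstructed from the preview output, with a header row matching the guessed skip_header_lines: 1 setting and more than 3 data rows and 2 columns, as required by guess.

id,company,customer,created_at
11200,AA Inc.,David,2015-03-31 06:12:37
20313,BB Inc.,Tom,2015-04-01 01:00:07
32132,CC Inc.,Fernando,2015-04-01 10:33:41
40133,DD Inc.,Cesar,2015-04-02 05:12:32
93133,EE Inc.,Jake,2015-04-02 14:11:13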

Step 3: Execute Load Job

Finally, submit the load job. It may take a couple of hours depending on the size of the data. Specify the Treasure Data database and table where the data should be stored.

It’s also recommended to specify the --time-column option, since Treasure Data’s storage is partitioned by time (see also the architecture documentation). If the option is not provided, the Data Connector will choose the first long or timestamp column as the partitioning time. The column specified by --time-column must be of either long or timestamp type.

If your data doesn’t have a time column, you can add one using the add_time filter option. See the add_time filter plugin documentation for more details; a sketch of such a configuration is shown below.
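The following is only a minimal sketch, assuming the time value is derived from the created_at column of the sample data; the exact parameter names should be verified against the add_time filter plugin documentation.

in:
  type: s3
  ...
filters:
# hypothetical add_time block: build the time column from created_at
- type: add_time
  to_column:
    name: time
    type: timestamp
  from_column:
    name: created_at
out:
  mode: append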

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at

The above command assumes you have already created a database (td_sample_db) and a table (td_sample_table). If the database or the table does not exist in TD, this command will not succeed, so either create the database and table manually or use the --auto-create-table option with the td connector:issue command to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table
Note: at present, the Data Connector does not sort records on the server side. To use time-based partitioning effectively, please sort records in files beforehand. This restriction will be removed in the near future.

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Scheduled Execution

You can schedule periodic Data Connector execution for incremental S3 file import. We take great care in distributing and operating our scheduler in order to achieve high availability. By using this feature, you no longer need a cron daemon in your local data center.

For scheduled imports, the Data Connector for Amazon S3 first imports all files that match the specified prefix (e.g. path_prefix: path/to/sample_ -> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) and remembers the last path (path/to/sample_201505.csv.gz) for the next execution.

On the second and subsequent runs, it imports only the files that come after the last path in alphabetical (lexicographic) order (path/to/sample_201506.csv.gz, …), as sketched below.
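Conceptually, the remembered state behaves like a last_path entry carried into the next run's in: section. The following is an illustrative sketch only; the actual stored configuration diff is managed by Treasure Data and may differ in detail.

in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_
  # remembered after the first run; files up to and including this path are skipped
  last_path: path/to/sample_201505.csv.gz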

Create Schedule

A new schedule can be created using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file.

$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml

It’s also recommended to specify the --time-column option, since Treasure Data’s storage is partitioned by time (see also the architecture documentation).

$ td connector:create daily_import "10 0 * * *" \
    td_sample_db td_sample_table load.yml \
    --time-column created_at
Note: the `cron` parameter also accepts three special options: `@hourly`, `@daily`, and `@monthly`.
Note: by default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone using the -t or --timezone option. Note that the `--timezone` option only supports extended timezone formats like 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.
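For example, combining the @daily shortcut with the --timezone option described in the notes above, a schedule could be created as follows (a sketch reusing the sample database and table names from earlier):

$ td connector:create daily_import "@daily" \
    td_sample_db td_sample_table load.yml \
    --time-column created_at -t Asia/Tokyo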

List All Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                   |
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"s3", "access_key_id"... |
+--------------+--------------+----------+-------+--------------+-----------------+------------------------------------------+

Show Schedule Settings And History

td connector:show shows the execution settings of a schedule entry.

% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: s3
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  endpoint: s3.amazonaws.com
  bucket: sample_bucket
  path_prefix: path/to/sample_
  parser:
    charset: UTF-8
    ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, please use td job <jobid>.

% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set

Delete Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_import

Appendix

A) Modes for out plugin

You can specify the file import mode in the out section of seed.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace

This mode replaces data in the target table. Please note that any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace

B) IAM Permissions

The IAM credentials specified in the YML configuration file and used by the connector:guess and connector:issue commands must be granted permissions for the AWS S3 resources they need to access. If the IAM user does not have these permissions, configure the user with one of the predefined Policy Definitions or create a new Policy Definition in JSON format.

This is an example, based on the Policy Definition reference format, that gives the IAM user read-only permission (through the GetObject and ListBucket actions) for the your-bucket bucket:

{
  "Version": "2015-11-11",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}

Please replace your-bucket with the actual name of your bucket.

C) Use AWS Security Token Service (STS) as temporary credentials provider

In certain cases, IAM basic authentication through access_key_id and secret_access_key may be too risky (although the secret_access_key is never shown in clear text when a job is executed or after a session is created).

The S3 data connector can use Temporary Security Credentials provided by the AWS Security Token Service (STS). Using AWS STS, any IAM user can use their own access_key_id and secret_access_key to create a temporary set of access_key_id, secret_access_key, and session_token credentials with an associated expiration time, after which the credentials become invalid.
There are essentially two types of Temporary Security Credentials:

  1. Session Token
    The simplest Security Credentials with an associated expiration time. The temporary credentials give access to all resources the original IAM credentials used to generate them had. These credentials are valid as long as they are not expired and the permissions of the original IAM credentials don’t change.
  2. Federation Token
    Adds an extra layer of permission control over the Session Token above. When generating a Federation Token, the IAM user is required to specify a Permission Policy definition. This scope can be used to further narrow down which of the resources accessible to the IAM user the bearer of the Federation Token should get access to. Any Permission Policy definition can be used, but the resulting permissions are limited to all or a subset of the permissions the IAM user who generated the token had. As with the Session Token, the Federation Token credentials are valid as long as they are not expired and the permissions associated with the original IAM credentials don’t change.

AWS STS Temporary Security Credentials can be generated using the AWS CLI or the AWS SDK in the language of your choice.

Session Token

$ aws sts get-session-token --duration-seconds 900
{
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T05:11:14Z",
        "AccessKeyId": "XXXXXXXXXX"
    }
}

Federation Token

$ aws sts get-federation-token --name temp_creds --duration-seconds 900 \
  --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'
{
    "FederatedUser": {
        "FederatedUserId": "523683666290:temp_creds",
        "Arn": "arn:aws:sts::523683666290:federated-user/temp_creds"
    },
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T06:06:17Z",
        "AccessKeyId": "XXXXXXXXXX"
    },
    "PackedPolicySize": 16
}

where:

  • temp_creds is the name of the federated token/user
  • bucketname is the name of the bucket to give access to. Refer to the ARN specification for more details
  • s3:GetObject and s3:ListBucket are the basic read operations for an AWS S3 bucket

Note: AWS STS credentials cannot be revoked. They remain effective until they expire, unless you delete the original IAM user or remove the permissions that were used to generate them.

Once your Temporary Security Credentials are generated, copy the SecretAccessKey, AccessKeyId, and SessionToken into your seed.yml file as follows.

in:
  type: s3
  auth_method: session
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  session_token: ZZZZZZZZZZ
  bucket: sample_bucket
  path_prefix: path/to/sample_file

and execute the Data Connector for S3 as usual.
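In other words, the same commands from Steps 2 and 3 apply unchanged with the STS-based seed.yml, for example:

$ td connector:guess seed.yml -o load.yml
$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
  --time-column created_at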

Note: since STS credentials expire after the specified amount of time, a Data Connector job using them may start failing once that happens. Currently, if the STS credentials are reported as expired, the Data Connector job will retry up to the maximum number of times (5) and eventually complete with 'error' status.

Use Connector UI

You can submit a DataConnector for AWS S3 job from the Connector UI.

1. Create a new AWS S3 connection

First, you need to register your credentials. Set the following parameters.

  • Endpoint: the S3 endpoint to connect to. You can find region and endpoint information in the AWS documentation. (Ex. s3-ap-northeast-1.amazonaws.com)
  • Authentication Method:
    • basic: uses access_key_id and secret_access_key to authenticate. See https://docs.aws.amazon.com/general/latest/gr/managing-aws-access-keys.html
      • Access Key ID
      • Secret access key
    • anonymous: uses anonymous access. This auth method can access only public files.
    • session: uses a temporarily generated access_key_id, secret_access_key, and session_token.
      • Access Key ID
      • Secret access key
      • Session token

2. Transfer data from AWS S3

Next, create a “New Transfer” on the My Connections page. You can prepare an ad hoc DataConnector job or a scheduled DataConnector job. In this section, the following steps are required.

2.1. Fetch from

Register the information about the data you would like to ingest; a rough CLI-equivalent of these settings is sketched after the note below.

  • Bucket: S3 bucket name (Ex. your_bucket_name)
  • Path Prefix: prefix of target keys. (Ex. logs/data_)
  • Path Regex: a regexp that file paths must match. If a file path doesn’t match this pattern, the file will be skipped. (Ex. .csv$)
  • Start after path: inserts the last_path parameter so that the first execution skips files before this path. (Ex. logs/data_20170101.csv)
  • Incremental: enables incremental loading. If incremental loading is enabled, the config diff for the next execution will include the last_path parameter so that the next execution skips files before that path. Otherwise, last_path will not be included.
Note: you can limit access to your S3 bucket/IAM user by using a list of static IPs. Please contact support@treasuredata.com if you need this.
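For reference, these UI fields roughly correspond to options in the in: section of a CLI configuration. The following is only an illustrative sketch built from the example values above; the option names (in particular path_match_pattern) are assumptions and should be checked against the Data Connector for S3 reference.

in:
  type: s3
  bucket: your_bucket_name
  path_prefix: logs/data_
  path_match_pattern: '.csv$'         # Path Regex (assumed option name)
  last_path: logs/data_20170101.csv   # Start after path
  incremental: true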

Example: CloudFront

Amazon CloudFront is a web service that speeds up distribution of your static and dynamic web content. You can configure CloudFront to create log files that contain detailed information about every user request that CloudFront receives. If you enable logging, CloudFront saves log files in your bucket like the following.

[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.a103fd5a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.b2aede4a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.594fa8e6.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.d12f42f9.gz]

In this case, the “Fetch from” settings should be as follows.

  • Bucket: your_bucket
  • Path Prefix: logging/
  • Path Regex: .gz$ (Not Required)
  • Start after path: logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz (Assuming that you want to import the logfiles from 2017-04-23-16.)
  • Incremental: true (if you want to schedule this job.)

https://treasure-data.gyazo.com/c7194997e1bd72d27b304774cdedc912

2.2. Preview

In this section, you can see a preview of the data you configured. If you cannot reach this page, an error may be shown on the Console; in that case, please contact support@treasuredata.com.

https://treasure-data.gyazo.com/723171ddf8b5951000775fe98ccf8fe4

If you would like to set a specific column name, click the “Advanced Settings” button.

2.2.1. Advanced Settings

Advanced Settings allow you to edit the guessed properties. Edit the following sections as needed.

  • Default timezone: changes the time zone of timestamp columns if the value itself doesn’t include a time zone.
  • Columns:
    • Name: changes the name of the column. Column names may consist of lowercase letters, numbers, and “_” only.
    • Type: parses the value as the specified type and then stores it after converting to the Treasure Data schema.
      • boolean
      • long
      • timestamp: will be imported as a string type in Treasure Data (Ex. 2017-04-01 00:00:00.000)
      • double
      • string
      • json
  • Total file count limit: maximum number of files to read. (optional)
Note: currently, editing on the Connector UI is more limited than on the CLI.

2.3. Transfer to

In this phase, select the target database and table you want to import to.

  • Mode: Append/Replace
  • Partition key Seed: choose the long or timestamp column to use as the partitioning time. By default, upload_time is used as the time column via the add_time filter.

https://treasure-data.gyazo.com/b4d93a06a2bf822700e64fa70db5ba14

2.4. When

In this phase, you can set an ad hoc or scheduled configuration for your job.

  • When
    • Once now: runs the job one time.
    • Repeat…
      • Schedule: accepts @hourly, @daily, @monthly, or a custom cron expression.
      • Delay Transfer: adds a delay to the execution time.
    • TimeZone: supports extended timezone formats like ‘Asia/Tokyo’.

https://treasure-data.gyazo.com/67e23e505bd4be773c0053ef9cd9046f

2.5. My Input Transfers

Finally, your DataConnector jobs are listed on the My Input Transfers page. On this page, you can edit your existing jobs. You can also see the details of a previously run DataConnector job by clicking the Last Transfer section.

https://treasure-data.gyazo.com/f6ab55c382b7eea32396e51718f3512e

