This import data connector for Amazon S3 enables you to import data and an optional filename from your CSV files stored in an AWS S3 bucket.
Amazon S3 Import Integration v1: Compared to this integration, the Amazon S3 Import Integration v1 doesn't support ingesting the filename. It supports the CSV file format only.
Amazon S3 Import Integration v2: The key difference and benefit of the Amazon S3 Import Integration v2 over v1 is the added support for assume_role authentication.
The Amazon S3 Filename Metadata Enhanced Import Integration works with CSV files only, and no other file type is supported.
You must have basic knowledge of Treasure Data.
If you use an AWS S3 bucket in the same region as your TD region, the IP address from which TD is accessing the bucket will be private and dynamically changing. If you want to restrict access, please specify the VPC ID instead of static IP Addresses. For example, if you are in the US region, configure access through vpc-df7066ba. If you are in the Tokyo region, configure access through vpc-e630c182 and, for the EU01 region, vpc-f54e6a9e.
Look up the region of TD Console by the URL you use to log in to TD, then refer to the data connector of your region in the URL.
| Region of TD Console | URL |
|---|---|
| US | https://console.treasuredata.com |
| Tokyo | https://console.treasuredata.co.jp |
| EU01 | https://console.eu01.treasuredata.com |
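As a quick illustration of the lookup described above, the following minimal sketch maps a TD Console login URL to its region and to the VPC ID quoted earlier in this section. The function name `region_and_vpc` is hypothetical and only for illustration. The VPC IDs and URLs are copied from the text above.

```python
from urllib.parse import urlparse

# Console host -> (region, VPC ID), copied from the text and table above.
REGIONS = {
    "console.treasuredata.com": ("US", "vpc-df7066ba"),
    "console.treasuredata.co.jp": ("Tokyo", "vpc-e630c182"),
    "console.eu01.treasuredata.com": ("EU01", "vpc-f54e6a9e"),
}


def region_and_vpc(console_url):
    """Return (region, vpc_id) for a TD Console URL (hypothetical helper)."""
    host = urlparse(console_url).hostname
    if host not in REGIONS:
        raise ValueError(f"Unknown TD Console host: {host}")
    return REGIONS[host]


region, vpc_id = region_and_vpc("https://console.treasuredata.co.jp")
print(region, vpc_id)
```

The returned VPC ID is the value you would reference when restricting bucket access for a same-region bucket.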
If your security policy requires IP whitelisting, you must add Treasure Data's IP addresses to your allowlist to ensure a successful connection.
Please find the complete list of static IP addresses, organized by region, at the following link:
https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/
When you configure a data connection, you provide authentication to access the integration. In Treasure Data, you configure the authentication and specify the source information.
- Navigate to Integrations Hub > Catalog and search for AWS S3.
- Select Create Authentication

- The New Authentication dialog opens. To authenticate using credentials, you need an Access key ID and a Secret access key.
- Set the following parameters. Select Continue. Name your new AWS S3 connection. Select Done.
| Parameter | Description |
|---|---|
| Endpoint | S3 service endpoint. You can find region and endpoint information in the AWS documentation. (Ex. s3.ap-northeast-1.amazonaws.com) |
| Authentication Method | One of the following: |
| basic | Uses access_key_id and secret_access_key to authenticate. See AWS Programmatic access. Requires: Access Key ID, Secret access key. |
| anonymous | Uses anonymous access. This auth method can access only public files. |
| session (Recommended) | Uses temporarily generated access_key_id, secret_access_key, and session_token. This authentication method is available only with data import; it can't currently be used with data export. Requires: Access Key ID, Secret access key, Secret token (the session_token). |
| Access Key ID | Issued by AWS. |
| Secret Access Key | Issued by AWS. |
After creating the authenticated connection, you are automatically taken to Authentications.
- Search for the connection you created.
- Select New Source.

- Type a name for your Source in the Data Transfer field.
- Click Next.

The Source dialog opens. Edit the following parameters:

| Parameters | Description |
|---|---|
| Bucket | Provide the S3 bucket name (Ex. your_bucket_name) |
| Path Prefix | Specify a prefix for target keys. (Ex. logs/data_) |
| Path Regex | Use a regular expression to match file paths. A file is skipped if its path doesn't match the specified pattern. For example, with the pattern \.csv$, any file whose path doesn't end in .csv is skipped. Read more about regular expressions. |
| Include file name | If enabled, the filename will be saved to a new column. |
| Skip Glacier Objects | Select to skip processing objects stored in the Amazon Glacier storage class. If objects are stored in the Glacier storage class, but this option is not checked, an exception is thrown. |
| Filter by Modified Time | Choose how to filter files for ingestion: |
| | If it is unchecked (default): |
| | If it is checked: |
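To make the interaction between Path Prefix and Path Regex more concrete, here is a minimal sketch of the selection logic. It assumes the pattern is applied as an unanchored search against the full key, which is consistent with the pattern examples in this document. Python's `re` module stands in for the connector's own regex engine, and `select_keys` is a hypothetical helper, not part of the connector.

```python
import re


def select_keys(keys, path_prefix, path_regex=None):
    """Keep keys that start with path_prefix and, if given, match path_regex."""
    pattern = re.compile(path_regex) if path_regex else None
    selected = []
    for key in keys:
        if not key.startswith(path_prefix):
            continue  # outside the prefix, never considered
        if pattern and not pattern.search(key):
            continue  # skipped: path doesn't match the pattern
        selected.append(key)
    return selected


keys = [
    "logs/data_2015.csv",
    "logs/data_2015.csv.gz",
    "logs/data_notes.txt",
    "archive/data_2014.csv",
]
matched = select_keys(keys, "logs/data_", r"\.csv$")
print(matched)  # ['logs/data_2015.csv']
```

Note that the dot is escaped in the pattern. An unescaped `.csv$` would also match a path such as `logs/data_xcsv`.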
Example
You might need to scan all the files in a directory (such as from the top-level directory "/"). In such instances, you must use the CLI to import them.
Amazon CloudFront is a web service that speeds up static and dynamic web content distribution. You can configure CloudFront to create log files that contain detailed information about every user request that CloudFront receives. If you enable logging, you can save CloudFront log files, shown as follows:
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.a103fd5a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.b2aede4a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.594fa8e6.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.d12f42f9.gz]
In this case, the Source Table settings are as shown:
- Bucket: your_bucket
- Path Prefix: logging/
- Path Regex: .gz$ (Not Required)
- Start after path: logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz (Assuming that you want to import the log files from 2017-04-23-16.)
- Incremental: true (if you want to schedule this job.)
- Select Next. The Data Settings page opens.
- Optionally, edit the data settings or skip this page of the dialog.
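Returning to the CloudFront example above, the Start after path value works because object keys are compared in lexicographic order. The sketch below assumes the connector imports only keys that sort strictly after the given path, similar to how S3 listing returns keys in sorted order. `keys_after` is a hypothetical helper for illustration.

```python
def keys_after(keys, start_after_path):
    """Return keys that sort strictly after start_after_path (lexicographic)."""
    return [k for k in sorted(keys) if k > start_after_path]


log_keys = [
    "logging/E231A697YXWD39.2017-04-23-15.a103fd5a.gz",
    "logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz",
    "logging/E231A697YXWD39.2017-04-23-16.594fa8e6.gz",
    "logging/E231A697YXWD39.2017-04-23-16.d12f42f9.gz",
]
start_after = "logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz"

# Only the two 2017-04-23-16 files sort after the start path.
to_import = keys_after(log_keys, start_after)
for key in to_import:
    print(key)
```

Choosing the last file of hour 15 as the start path therefore leaves only the hour 16 files to import.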

| Parameters | Description |
|---|---|
| Total file count limit | You can specify the maximum number of files to read |
| Minimum task size | Files up to this size will be grouped into one task. The default value is 268435456 bytes (256 MB). |
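The effect of Minimum task size can be pictured with the following sketch. The connector's actual grouping algorithm is not described in this document, so the greedy strategy here (keep adding files to a task until the accumulated size reaches the minimum) is an assumption for illustration only, and `group_into_tasks` is a hypothetical name.

```python
MB = 1024 * 1024
DEFAULT_MIN_TASK_SIZE = 268435456  # 256 MB, the documented default


def group_into_tasks(files, min_task_size=DEFAULT_MIN_TASK_SIZE):
    """Greedily group (name, size) pairs until each task reaches min_task_size."""
    tasks, current, current_size = [], [], 0
    for name, size in files:
        current.append(name)
        current_size += size
        if current_size >= min_task_size:
            tasks.append(current)
            current, current_size = [], 0
    if current:
        tasks.append(current)  # leftover small files form the last task
    return tasks


files = [
    ("a.csv", 100 * MB),
    ("b.csv", 100 * MB),
    ("c.csv", 100 * MB),
    ("d.csv", 300 * MB),
    ("e.csv", 10 * MB),
]
tasks = group_into_tasks(files)
print(tasks)  # [['a.csv', 'b.csv', 'c.csv'], ['d.csv'], ['e.csv']]
```

Under this assumption, many small files collapse into far fewer tasks, which is why this setting matters when a bucket holds a large number of small objects.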
Filters are available in the Create Source or Edit Source import settings for your S3, FTP, or SFTP connectors.
Import Integration Filters enable you to modify your imported data after you have completed Editing Data Settings for your import.
To apply import integration filters:
- Select Next in Data Settings. The Filters dialog opens.
- Select the filter option you want to add.

- Select Add Filter. The parameter dialog for that filter opens.
- Edit the parameters. For information on each filter type, see one of the following:
- Retaining Columns Filter
- Adding Columns Filter
- Dropping Columns Filter
- Expanding JSON Filter
- Digesting Filter
- Optionally, to add another filter of the same type, select Add within the specific column filter dialog.
- Optionally, to add another filter of a different type, select the filter option from the list and repeat the same steps.
- After you have added the filters you want, select **Next**. The Data Preview dialog opens.
You can see a preview of your data before running the import by selecting Generate Preview. Data preview is optional and you can safely skip to the next page of the dialog if you choose to.
- Select Next. The Data Preview page opens.
- If you want to preview your data, select Generate Preview.
- Verify the data.
For data placement, select the target database and table where you want your data placed and indicate how often the import should run.
Select Next. Under Storage, you will create a new or select an existing database and create a new or select an existing table for where you want to place the imported data.
Select a Database > Select an existing or Create New Database.
Optionally, type a database name.
Select a Table > Select an existing or Create New Table.
Optionally, type a table name.
Choose the method for importing the data.
- Append (default): Data import results are appended to the table. If the table does not exist, it will be created.
- Always Replace: Replaces the entire content of an existing table with the result output of the query. If the table does not exist, a new table is created.
- Replace on New Data: Replaces the entire content of an existing table with the result output only when there is new data.
Select the Timestamp-based Partition Key column. If you want to set a different partition key seed than the default key, you can specify the long or timestamp column as the partitioning time. As a default time column, it uses upload_time with the add_time filter.
Select the Timezone for your data storage.
Under Schedule, you can choose when and how often you want to run this transfer.
To run the transfer once:
- Select Off.
- Select Scheduling Timezone.
- Select Create & Run Now.
To repeat the transfer on a schedule:
- Select On.
- Select the Schedule. The UI provides these four options: @hourly, @daily, @monthly, or custom cron.
- You can also select Delay Transfer and add a delay of execution time.
- Select Scheduling Timezone.
- Select Create & Run Now.
After your transfer has run, you can see the results of your transfer in Data Workbench > Databases.
Check the count of S3 files that your connector job is ingesting. If there are over 10,000 files, the performance degrades. To mitigate this issue, you can:
- Narrow the path_prefix option and reduce the number of S3 files.
- Set the min_task_size option to 268,435,456 (256 MB).
A sample workflow file is available for the S3 import integration. You can define the import settings in a yml file and run it using the td_load>: workflow operator. With yml file-based execution, you can use variable definitions that are not possible with the Source function of the TD console alone.
You can refer to the sample code from https://github.com/treasure-data/treasure-boxes/tree/master/td_load/s3.
    timezone: UTC

    schedule:
      daily>: 02:00:00

    sla:
      time: 08:00
      +notice:
        mail>: {data: Treasure Workflow Notification}
        subject: This workflow is taking long time to finish
        to: [me@example.com]

    _export:
      td:
        dest_db: dest_db_ganesh
        dest_table: dest_table_ganesh

    +prepare_table:
      td_ddl>:
      create_databases: ["${td.dest_db}"]
      create_tables: ["${td.dest_table}"]
      database: ${td.dest_db}

    +load:
      td_load>: config/daily_load.yml
      database: ${td.dest_db}
      table: ${td.dest_table}

You can optionally use the TD Toolbelt to configure the connection, create the job, and schedule executions.
Before setting up the connector, install the ‘td’ command. Install the most current TD Toolbelt.
Prepare the seed.yml, as shown in the following example, with your AWS access key ID and secret access key. You must also specify the bucket and source file names (or prefixes for multiple files).
    in:
      type: s3_fme
      access_key_id: XXXXXXXXXX
      secret_access_key: YYYYYYYYYY
      bucket: sample_bucket
      include_file_name: true
      # path to the *.csv file on your s3 bucket
      path_prefix: path/to/sample_file
      path_match_pattern: \.csv$ # a file will be skipped if its path doesn't match with this pattern
      ## some examples of regexp:
      #path_match_pattern: /archive/ # match files in .../archive/... directory
      #path_match_pattern: /data1/|/data2/ # match files in .../data1/... or .../data2/... directory
      #path_match_pattern: \.csv$|\.csv\.gz$ # match files whose suffix is .csv or .csv.gz
    out:
      mode: append

The Data Connector for Amazon S3 imports all files that match the specified prefix (e.g., path_prefix: path/to/sample_ -> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz).
Using a path_prefix with a leading '/' can lead to unintended results. For example, "path_prefix: /path/to/sample_file" causes the plugin to look for the file in s3://sample_bucket//path/to/sample_file, which on S3 is a different location from the intended path s3://sample_bucket/path/to/sample_file.
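The leading-slash pitfall can be reproduced with a few lines of string handling. This is only a sketch of how the bucket and prefix combine into a location. `s3_uri` and `normalize_prefix` are hypothetical helpers, not connector functions.

```python
def s3_uri(bucket, key):
    """Join a bucket and key the way an S3 location is written."""
    return f"s3://{bucket}/{key}"


def normalize_prefix(path_prefix):
    """Strip leading slashes so the prefix is relative to the bucket root."""
    return path_prefix.lstrip("/")


intended = s3_uri("sample_bucket", "path/to/sample_file")
unintended = s3_uri("sample_bucket", "/path/to/sample_file")

print(intended)    # s3://sample_bucket/path/to/sample_file
print(unintended)  # s3://sample_bucket//path/to/sample_file
print(s3_uri("sample_bucket", normalize_prefix("/path/to/sample_file")))
```

Because S3 keys are plain strings, the empty segment between the two slashes is part of the key, so the two locations never refer to the same object.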
Use connector:guess. This command automatically reads the source files and assesses (uses logic to guess) the file format and its field/columns.
    td connector:guess seed.yml -o load.yml

If you open load.yml, you’ll see the assessed file format definitions, including file formats, encodings, column names, and types.
    in:
      type: s3_fme
      access_key_id: XXXXXXXXXX
      secret_access_key: YYYYYYYYYY
      bucket: sample_bucket
      path_prefix: path/to/sample_file
      include_file_name: true
      parser:
        charset: UTF-8
        newline: CRLF
        type: csv
        delimiter: ','
        quote: '"'
        escape: ''
        skip_header_lines: 1
        columns:
        - name: id
          type: long
        - name: company
          type: string
        - name: customer
          type: string
        - name: created_at
          type: timestamp
          format: '%Y-%m-%d %H:%M:%S'
    out:
      mode: append

Then, you can see a preview of the data using the td connector:preview command.
    $ td connector:preview load.yml
    +-------+---------+----------+---------------------+
    | id    | company | customer | created_at          |
    +-------+---------+----------+---------------------+
    | 11200 | AA Inc. | David    | 2015-03-31 06:12:37 |
    | 20313 | BB Inc. | Tom      | 2015-04-01 01:00:07 |
    | 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
    | 40133 | DD Inc. | Cesar    | 2015-04-02 05:12:32 |
    | 93133 | EE Inc. | Jake     | 2015-04-02 14:11:13 |
    +-------+---------+----------+---------------------+

The guess command needs more than three rows and two columns in the source data file because it assesses the column definition using sample rows from the source data.
If the system detects your column name or column type unexpectedly, modify load.yml directly and preview again.
Currently, the Data Connector supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.
Submit the load job. Depending on the data size, it may take a couple of hours. Specify the Treasure Data database and table where the data should be stored.
Specifying the --time-column option is also recommended because Treasure Data’s storage is partitioned by time (see data partitioning). If the option is not provided, the data connector chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be either long or timestamp.
If your data doesn’t have a time column, you can add one using the add_time filter option. For more details, see the add_time filter plugin.
    td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The connector:issue command assumes that you have already created a database (td_sample_db) and a table (td_sample_table). If the database or the table does not exist in TD, the command fails, so either create the database and table manually or use the --auto-create-table option with the td connector:issue command to create them automatically:
    td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

The data connector does not sort records on the server side. To use time-based partitioning effectively, sort records in files beforehand.
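The default time-column rule stated earlier (the first long or timestamp column is used when --time-column is omitted) can be sketched as follows, using the columns from the guessed load.yml. `default_time_column` is a hypothetical helper that only mirrors the rule as written. It is not the connector's code.

```python
def default_time_column(columns):
    """Return the name of the first long or timestamp column, or None."""
    for column in columns:
        if column["type"] in ("long", "timestamp"):
            return column["name"]
    return None


# Column definitions as they appear in the guessed load.yml.
columns = [
    {"name": "id", "type": "long"},
    {"name": "company", "type": "string"},
    {"name": "customer", "type": "string"},
    {"name": "created_at", "type": "timestamp"},
]

chosen = default_time_column(columns)
print(chosen)  # id
```

For this sample file the rule would pick `id` rather than `created_at`, which is one reason passing --time-column created_at explicitly is the safer choice.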
If your data has a field named time, you don’t have to specify the --time-column option.

    $ td connector:issue load.yml --database td_sample_db --table td_sample_table

You can schedule periodic data connector execution for incremental file import.
The IAM credentials specified in the YML configuration file and used for the connector:guess and connector:issue commands need permissions to access the AWS S3 resources. If the IAM user does not possess these permissions, configure the user with one of the predefined Policy Definitions or create a new Policy Definition in JSON format.
The following example is based on the Policy Definition reference format, giving the IAM user read-only (through GetObject and ListBucket actions) permission for the your-bucket:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject",
            "s3:ListBucket"
          ],
          "Resource": [
            "arn:aws:s3:::your-bucket",
            "arn:aws:s3:::your-bucket/*"
          ]
        }
      ]
    }

Replace your-bucket with the actual name of your bucket.
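If you maintain several buckets, substituting the bucket name by hand is error-prone. The following sketch generates the same read-only policy document for a given bucket name. `read_only_policy` is a hypothetical helper, and the output is plain JSON that you would still attach through your usual IAM tooling.

```python
import json


def read_only_policy(bucket):
    """Build the read-only policy shown above for the given bucket name."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket}",     # the bucket itself (ListBucket)
                    f"arn:aws:s3:::{bucket}/*",   # objects in the bucket (GetObject)
                ],
            }
        ],
    }


policy = read_only_policy("sample_bucket")
print(json.dumps(policy, indent=2))
```

Both resource entries are needed: s3:ListBucket applies to the bucket ARN, while s3:GetObject applies to the object ARNs under it.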
Sometimes, IAM basic authentication through access_key_id and secret_access_key might be too risky (although the secret_access_key is never shown in clear text when a job is executed or after a session is created).
The S3 data connector can use AWS Security Token Service (STS), which provides temporary security credentials. Using AWS STS, any IAM user can use their own access_key_id and secret_access_key to create a set of temporary new_access_key_id, new_secret_access_key, and session_token keys with an associated expiration time, after which the credentials become invalid. The following are types of Temporary Security Credentials:
- Session Token: The simplest security credentials, with an associated expiration time. The temporary credentials give access to all resources that the original IAM credentials used to generate them had access to. These credentials are valid as long as they are not expired and the permissions of the original IAM credentials don’t change.
- Federation Token: Adds an extra layer of permission control over the Session Token above. When generating a Federation Token, the IAM user must specify a Permission Policy definition. The policy can further narrow down which of the resources accessible to the IAM user the bearer of the Federation Token can access. Any Permission Policy definition can be used, but the resulting permissions are limited to all or a subset of the permissions of the IAM user who generated the token. As with the Session Token, the Federation Token credentials are valid as long as they are not expired and the permissions associated with the original IAM credentials don’t change.
AWS STS Temporary Security Credentials can be generated using the AWS CLI or the AWS SDK in the language of your choice.
    $ aws sts get-session-token --duration-seconds 900
    {
      "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T05:11:14Z",
        "AccessKeyId": "XXXXXXXXXX"
      }
    }

    $ aws sts get-federation-token --name temp_creds --duration-seconds 900 --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'
    {
      "FederatedUser": {
        "FederatedUserId": "523683666290:temp_creds",
        "Arn": "arn:aws:sts::523683666290:federated-user/temp_creds"
      },
      "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T06:06:17Z",
        "AccessKeyId": "XXXXXXXXXX"
      },
      "PackedPolicySize": 16
    }

where:
- temp_creds is the name of the Federated token/user.
- bucketname is the name of the bucket to which to give access. Refer to the ARN specification for more details.
- s3:GetObject and s3:ListBucket are the essential read operations for an AWS S3 bucket.
AWS STS credentials cannot be revoked. They will remain effective until they expire or until you delete or remove the permissions of the original IAM user used to generate the credentials.
When your Temporary Security Credentials are generated, copy the SecretAccessKey, AccessKeyId, and SessionToken in your seed.yml file as follows.
    in:
      type: s3_fme
      access_key_id: XXXXXXXXXX
      secret_access_key: YYYYYYYYYY
      include_file_name: true
      session_token: ZZZZZZZZZZ
      bucket: sample_bucket
      path_prefix: path/to/sample_file

Then execute the Data Connector for S3 as usual.
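Copying three values by hand is easy to get wrong, so the step above can be scripted. The sketch below parses the JSON printed by `aws sts get-session-token` and renders the `in:` section of seed.yml. The helper names are hypothetical, and the values are written as JSON-quoted strings, which YAML also accepts. It uses only the Python standard library and does not call AWS.

```python
import json

# Sample output of `aws sts get-session-token`, as shown earlier.
sts_output = """
{
  "Credentials": {
    "SecretAccessKey": "YYYYYYYYYY",
    "SessionToken": "ZZZZZZZZZZ",
    "Expiration": "2015-12-23T05:11:14Z",
    "AccessKeyId": "XXXXXXXXXX"
  }
}
"""


def seed_in_section(sts_json, bucket, path_prefix):
    """Map STS credential fields to the connector's `in:` options."""
    creds = json.loads(sts_json)["Credentials"]
    return {
        "type": "s3_fme",
        "access_key_id": creds["AccessKeyId"],
        "secret_access_key": creds["SecretAccessKey"],
        "session_token": creds["SessionToken"],
        "include_file_name": True,
        "bucket": bucket,
        "path_prefix": path_prefix,
    }


def to_seed_yaml(section):
    """Render the section as YAML text; json.dumps quotes strings safely."""
    lines = ["in:"]
    for key, value in section.items():
        lines.append(f"  {key}: {json.dumps(value)}")
    return "\n".join(lines)


section = seed_in_section(sts_output, "sample_bucket", "path/to/sample_file")
seed_yaml = to_seed_yaml(section)
print(seed_yaml)
```

Quoting the values matters in practice because real session tokens often contain characters such as `+`, `/`, and `=`.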
Because STS credentials expire after a specified amount of time, the data connector job that uses the credential might eventually start failing when credential expiration occurs. If the STS credentials are reported expired, the data connector job retries up to the maximum number of times (5) and eventually completes with 'error' status.
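One way to reduce expiration failures is to check how much validity remains before submitting a job. The sketch below parses the Expiration value from the STS output and compares the remaining time with an expected job duration. The helper names and the duration estimates are assumptions for illustration. How long your job actually runs depends on your data size.

```python
from datetime import datetime, timedelta, timezone


def parse_expiration(value):
    """Parse the STS Expiration string (UTC, e.g. 2015-12-23T05:11:14Z)."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def is_safe_to_start(expiration, expected_job_duration, now=None):
    """True if the credentials outlive the expected job duration."""
    now = now or datetime.now(timezone.utc)
    remaining = parse_expiration(expiration) - now
    return remaining > expected_job_duration


# Fixed "now" so the example is reproducible.
now = datetime(2015, 12, 23, 5, 0, 0, tzinfo=timezone.utc)
expiration = "2015-12-23T05:11:14Z"

short_job_ok = is_safe_to_start(expiration, timedelta(minutes=5), now)
long_job_ok = is_safe_to_start(expiration, timedelta(hours=2), now)
print(short_job_ok, long_job_ok)  # True False
```

If the check fails, generating fresh credentials with a longer --duration-seconds value before issuing the job avoids the retry-then-error outcome described above.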