このAmazon S3用インポートデータコネクタは、AWS S3バケットに保存されているCSVファイルからデータとオプションのファイル名をインポートすることができます。
インテグレーション v1: このインテグレーションと比較して、Amazon S3 Import Integration v1はファイル名の取り込みをサポートしていません。CSVファイル形式のみをサポートしています。
インテグレーション v2: Amazon S3 Import Integration v2のv1との主な違いと利点は、assume_role認証のサポートが追加されたことです。
Amazon S3 Filename Metadata Enhanced Import IntegrationはCSVファイルのみで動作し、その他のファイルタイプはサポートされていません。
Treasure Dataの基本的な知識が必要です。
TDリージョンと同じリージョンのAWS S3バケットを使用する場合、TDがバケットにアクセスする際のIPアドレスはプライベートで動的に変更されます。アクセスを制限したい場合は、静的IPアドレスの代わりにVPC IDを指定してください。例えば、USリージョンの場合はvpc-df7066baを通じてアクセスを設定します。Tokyoリージョンの場合はvpc-e630c182を、EU01リージョンの場合はvpc-f54e6a9eを設定します。
TDコンソールのリージョンは、TDへのログインに使用するURLで確認し、URLからお使いのリージョンのデータコネクタを参照してください。
| TDコンソールのリージョン | URL |
|---|---|
| US | https://console.treasuredata.com |
| Tokyo | https://console.treasuredata.co.jp |
| EU01 | https://console.eu01.treasuredata.com |
セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。
リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/
データ接続を設定する際、インテグレーションへアクセスするための認証情報を提供します。Treasure Dataでは、認証を設定し、ソース情報を指定します。
- Integrations Hub > Catalogに移動し、AWS S3を検索します。
- Create Authenticationを選択します。

- New Authenticationダイアログが開きます。認証情報を使用して認証するには、Access key IDとSecret access keyが必要です。
- 以下のパラメータを設定します。Continueを選択します。新しいAWS S3接続に名前を付けます。Doneを選択します。
| パラメータ | 説明 |
|---|---|
| Endpoint | S3エンドポイントログインユーザー名。リージョンとエンドポイント情報はAWSドキュメントから確認できます。(例: s3.ap-northeast-1.amazonaws.com) |
| Authentication Method | |
| basic | access_key_idとsecret_access_keyを使用して認証します。AWS Programmatic accessを参照してください。 - Access Key ID - Secret access key |
| anonymous | 匿名アクセスを使用します。この認証方法は公開ファイルのみにアクセスできます。 |
| session (推奨) | 一時的に生成されたaccess_key_id、secret_access_key、session_tokenを使用します。(この認証方法はデータインポートでのみ使用可能です。現時点ではデータエクスポートでは使用できません。) - Access Key ID - Secret access key - Secret token |
| Access Key ID | AWS S3が発行 |
| Secret Access Key | AWS S3が発行 |
認証済み接続を作成すると、自動的にAuthenticationsに移動します。
- 作成した接続を検索します。
- New Sourceを選択します。

- Data TransferフィールドのSourceに名前を入力します。
- Nextをクリックします。

Sourceダイアログが開きます。以下のパラメータを編集します:

| パラメータ | 説明 |
|---|---|
| Bucket | S3バケット名を指定します(例: your_bucket_name) |
| Path Prefix | ターゲットキーのプレフィックスを指定します。(例: logs/data_) |
| Path Regex | 正規表現を使用してファイルパスをマッチさせます。ファイルパスが指定されたパターンと一致しない場合、ファイルはスキップされます。例えば、パターン*.csv$*を選択した場合、パスがパターンと一致しないファイルはスキップされます。正規表現について詳しく読む。 |
| Include file name | 有効にすると、ファイル名が新しいカラムに保存されます。 |
| Skip Glacier Objects | Amazon Glacierストレージクラスに保存されているオブジェクトの処理をスキップする場合に選択します。オブジェクトがGlacierストレージクラスに保存されているにもかかわらず、このオプションがチェックされていない場合、例外がスローされます。 |
| Filter by Modified Time | 取り込むファイルをフィルタリングする方法を選択します: |
| チェックが外れている場合(デフォルト): |
|
| チェックされている場合: |
|
例
ディレクトリ内のすべてのファイル(トップレベルディレクトリ「/」からなど)をスキャンする必要がある場合があります。そのような場合は、CLIを使用してインポートする必要があります。
Amazon CloudFrontは、静的および動的Webコンテンツの配信を高速化するWebサービスです。CloudFrontを設定して、CloudFrontが受信するすべてのユーザーリクエストに関する詳細情報を含むログファイルを作成できます。ロギングを有効にすると、次のようなCloudFrontログファイルを保存できます:
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.a103fd5a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.b2aede4a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.594fa8e6.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.d12f42f9.gz]この場合、ソーステーブルの設定は次のようになります:
- Bucket: your_bucket
- Path Prefix: logging/
- Path Regex: .gz$ (必須ではありません)
- Start after path: logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz (2017-04-23-16からログファイルをインポートすると仮定)
- Incremental: true (このジョブをスケジュールする場合)
- Nextを選択します。Data Settingsページが開きます。
- オプションで、データ設定を編集するか、このページをスキップします。

| パラメータ | 説明 |
|---|---|
| Total file count limit | 読み込むファイルの最大数を指定できます |
| Minimum task size | このサイズまでのファイルが1つのタスクにグループ化されます。デフォルト値は268435456(バイト)です。 |
Filters は、S3、FTP、または SFTP コネクターの Create Source または Edit Source インポート設定で使用できます。
Import Integration Filters を使用すると、インポート用のデータ設定の編集を完了した後、インポートされたデータを変更できます。
import integration filters を適用するには:
- Data Settings で Next を選択します。Filters ダイアログが開きます。
- 追加したいフィルターオプションを選択します。

- Add Filter を選択します。そのフィルターのパラメーターダイアログが開きます。
- パラメーターを編集します。各フィルタータイプの情報については、次のいずれかを参照してください:
- Retaining Columns Filter
- Adding Columns Filter
- Dropping Columns Filter
- Expanding JSON Filter
- Digesting Filter
- オプションで、同じタイプの別のフィルターを追加するには、特定の列フィルターダイアログ内で Add を選択します。
- オプションで、別のタイプの別のフィルターを追加するには、リストからフィルターオプションを選択して、同じ手順を繰り返します。
- 追加したいフィルターを追加した後、Next を選択します。Data Preview ダイアログが開きます。
インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。
- Next を選択します。Data Preview ページが開きます。
- データをプレビューする場合は、Generate Preview を選択します。
- データを確認します。
データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。
Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。
Database を選択 > Select an existing または Create New Database を選択します。
オプションで、database 名を入力します。
Table を選択 > Select an existing または Create New Table を選択します。
オプションで、table 名を入力します。
データをインポートする方法を選択します。
- Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
- Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
- Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。
データストレージの Timezone を選択します。
Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。
- Off を選択します。
- Scheduling Timezone を選択します。
- Create & Run Now を選択します。
- On を選択します。
- Schedule を選択します。UI では、@hourly、@daily、@monthly、またはカスタム cron の 4 つのオプションが提供されます。
- Delay Transfer を選択して、実行時間の遅延を追加することもできます。
- Scheduling Timezone を選択します。
- Create & Run Now を選択します。
転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。
コネクタジョブが取り込んでいるS3ファイルの数を確認してください。10,000ファイルを超える場合、パフォーマンスが低下します。この問題を軽減するには、次の方法があります:
- path_prefixオプションを絞り込んで、S3ファイルの数を減らします。
- min_task_sizeオプションに268,435,456 (256MB)を設定します。
S3インポートインテグレーション用のサンプルワークフローファイルがあります。ymlファイルを使用してインポート設定を定義し、td\_load>:ワークフローオペレータを使用して実行できます。TDコンソールのSource機能だけでは使用できない変数定義が、ymlファイルベースの実行では可能です。
サンプルコードはhttps://github.com/treasure-data/treasure-boxes/tree/master/td_load/s3から参照できます。
timezone: UTC
schedule:
daily>: 02:00:00
sla:
time: 08:00
+notice:
mail>: {data: Treasure Workflow Notification}
subject: This workflow is taking long time to finish
to: [me@example.com]
_export:
td:
dest_db: dest_db_ganesh
dest_table: dest_table_ganesh
+prepare_table:
td_ddl>:
create_databases: ["${td.dest_db}"]
create_tables: ["${td.dest_table}"]
database: ${td.dest_db}
+load:
td_load>: config/daily_load.yml
database: ${td.dest_db}
table: ${td.dest_table}オプションでTD Toolbeltを使用して、接続の設定、ジョブの作成、実行のスケジュール設定を行うことができます。
コネクタをセットアップする前に、'td'コマンドをインストールしてください。最新のTD Toolbeltをインストールします。
以下の例に示すように、AWSアクセスキーとシークレットアクセスキーを使用してseed.ymlを準備します。また、バケット名とソースファイル名(または複数ファイルのプレフィックス)を指定する必要があります。
in:
type: s3_fme
access_key_id: XXXXXXXXXX
secret_access_key: YYYYYYYYYY
bucket: sample_bucket
include_file_name: true
# path to the *.csv file on your s3 bucket
path_prefix: path/to/sample_file
path_match_pattern: \.csv$ # a file will be skipped if its path doesn't match with this pattern
## some examples of regexp:
#path_match_pattern: /archive/ # match files in .../archive/... directory
#path_match_pattern: /data1/|/data2/ # match files in .../data1/... or .../data2/... directory
#path_match_pattern: .csv$|.csv.gz$ # match files whose suffix is .csv or .csv.gz
out:
mode: appendAmazon S3用データコネクタは、指定されたプレフィックスに一致するすべてのファイルをインポートします。(例: path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz)
先頭に'/'を付けてpath_prefixを使用すると、意図しない結果になる可能性があります。例えば:「path_prefix: /path/to/sample_file」とすると、プラグインはs3://sample_bucket//path/to/sample_fileでファイルを探しますが、これはS3上の意図したパスs3://sample_bucket/path/to/sample_fileとは異なります。
connector:guessを使用します。このコマンドは自動的にソースファイルを読み取り、ファイル形式とそのフィールド/カラムを評価(推測ロジックを使用)します。
td connector:guess seed.yml -o load.ymlload.ymlを開くと、ファイル形式、エンコーディング、カラム名、型を含む評価されたファイル形式定義が表示されます。
in:
type: s3_fme
access_key_id: XXXXXXXXXX
secret_access_key: YYYYYYYYYY
bucket: sample_bucket
path_prefix: path/to/sample_file
include_file_name: true
path_prefix: path/to/sample_file
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
skip_header_lines: 1
columns:
- name: id
type: long
- name: company
type: string
- name: customer
type: string
- name: created_at
type: timestamp
format: '%Y-%m-%d %H:%M:%S'
out:
mode: appendその後、td connector:previewコマンドを使用してデータのプレビューを表示できます。
$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id | company | customer | created_at |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. | David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. | Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. | Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. | Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+guessコマンドは、ソースデータのサンプル行を使用してカラム定義を評価するため、ソースデータファイルに3行以上、2カラム以上が必要です。
システムがカラム名またはカラムタイプを予期せず検出した場合は、load.ymlを直接変更して再度プレビューしてください。
現在、データコネクタは「boolean」、「long」、「double」、「string」、「timestamp」タイプの解析をサポートしています。
ロードジョブを送信します。データサイズによっては、数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを指定します。
Treasure Dataのストレージは時間でパーティション化されているため、--time-columnオプションを指定することを推奨します(データパーティショニングを参照)。オプションが指定されていない場合、データコネクタは最初のlongまたはtimestampカラムをパーティショニング時刻として選択します。—- time-columnで指定するカラムは、longまたはtimestampのいずれかである必要があります。
データに時刻カラムがない場合は、add_timeフィルタオプションを使用して追加できます。詳細については、add_timeフィルタプラグインを参照してください。
td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_atconnector:issueコマンドは、すでにdatabase(td_sample_db)とtable(td_sample_table)を作成していることを前提としています。データベースまたはテーブルがTDに存在しない場合、このコマンドは成功しないため、データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成します:
td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-tableデータコネクタはサーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にファイル内のレコードをソートしてください。
時刻フィールドがある場合は、--time-columnオプションを指定する必要はありません。
$ td connector:issue load.yml --database td_sample_db --table td_sample_table増分ファイルインポートのために、定期的なデータコネクタの実行をスケジュール設定できます。詳細については、こちらを参照してください。
YML設定ファイルで指定され、connector:guessおよびconnector:issueコマンドで使用されるIAM認証情報には、AWS S3リソースにアクセスするためのパーミッションが必要です。IAMユーザーがこれらのパーミッションを持っていない場合は、事前定義されたポリシー定義のいずれかでユーザーを設定するか、JSON形式で新しいポリシー定義を作成します。
以下の例は、ポリシー定義リファレンス形式に基づいており、IAMユーザーにyour-bucketへの読み取り専用(GetObjectおよびListBucketアクションを通じて)パーミッションを付与します:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::your-bucket",
"arn:aws:s3:::your-bucket/*"
]
}
]
}your-bucketを実際のバケット名に置き換えてください。
場合によっては、access_key_idとsecret_access_keyによるIAM基本認証はリスクが高すぎる可能性があります(ただし、secret_access_keyはジョブの実行時やセッション作成後に明確に表示されることはありません)。
S3データコネクタは、一時セキュリティ認証情報を提供するAWS Secure Token Service (STS)を使用できます。AWS STSを使用すると、任意のIAMユーザーが自分のaccess_key_idとsecret_access_keyを使用して、有効期限付きの一時的なnew_access_key_id、new_secret_access_key、session_tokenキーのセットを作成でき、有効期限後は認証情報が無効になります。 一時セキュリティ認証情報のタイプは次のとおりです:
- Session Token 最もシンプルなセキュリティ認証情報で、有効期限が関連付けられています。一時認証情報は、それらを生成するために使用された元のIAM認証情報が持っていたすべてのリソースへのアクセスを許可します。これらの認証情報は、有効期限が切れておらず、元のIAM認証情報のパーミッションが変更されていない限り有効です。
- Federation Token 上記のSession Tokenに追加のパーミッション制御レイヤーを追加します。Federation Tokenを生成する際、IAMユーザーはパーミッションポリシーの定義を指定する必要があります。このスコープを使用して、IAMユーザーがアクセスできるリソースのうち、Federation Tokenの保有者がアクセスできるリソースをさらに絞り込むことができます。任意のパーミッションポリシー定義を使用できますが、パーミッションスコープは、トークンの生成に使用されたIAMユーザーのパーミッションのすべてまたはサブセットに限定されます。Session Tokenと同様に、Federation Token認証情報は、有効期限が切れておらず、元のIAM認証情報に関連付けられたパーミッションが変更されていない限り有効です。
AWS STS一時セキュリティ認証情報は、AWS CLIまたは選択した言語のAWS SDKを使用して生成できます。
$ aws sts get-session-token --duration-seconds 900{
"Credentials": {
"SecretAccessKey": "YYYYYYYYYY",
"SessionToken": "ZZZZZZZZZZ",
"Expiration": "2015-12-23T05:11:14Z",
"AccessKeyId": "XXXXXXXXXX"
}
}aws sts get-federation-token --name temp_creds --duration-seconds 900 --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'{
"FederatedUser": {
"FederatedUserId": "523683666290:temp_creds",
"Arn": "arn:aws:sts::523683666290:federated-user/temp_creds"
},
"Credentials": {
"SecretAccessKey": "YYYYYYYYYY",
"SessionToken": "ZZZZZZZZZZ",
"Expiration": "2015-12-23T06:06:17Z",
"AccessKeyId": "XXXXXXXXXX"
},
"PackedPolicySize": 16
}ここで: temp_credはFederatedトークン/ユーザーの名前、bucketnameはアクセスを許可するバケットの名前です。詳細についてはARN仕様を参照してください。s3:GetObjectとs3:ListBucketは、AWS S3バケットの基本的な読み取り操作です。
AWS STS認証情報は取り消すことができません。有効期限が切れるか、認証情報の生成に使用された元のIAMユーザーのパーミッションを削除または除去するまで有効なままです。
一時セキュリティ認証情報が生成されたら、次のようにseed.ymlファイルにSecretAccessKey、AccessKeyId、SessionTokenをコピーします。
in:
type: s3_fme
access_key_id: XXXXXXXXXX
secret_access_key: YYYYYYYYYY
include_file_name: true
session_token: ZZZZZZZZZZ
bucket: sample_bucket
path_prefix: path/to/sample_fileそして、通常どおりS3用データコネクタを実行します。
STS認証情報は指定された時間後に有効期限が切れるため、認証情報を使用するデータコネクタジョブは、認証情報の有効期限が切れると最終的に失敗し始める可能性があります。 STS認証情報の有効期限が切れたと報告された場合、データコネクタジョブは最大回数(5回)まで再試行し、最終的に'error'ステータスで完了します。