Skip to content
Last updated

Amazon S3 ファイル名メタデータ拡張インポートインテグレーション

このAmazon S3用インポートデータコネクタは、AWS S3バケットに保存されているCSVファイルからデータとオプションのファイル名をインポートすることができます。

異なるAmazon S3インポートインテグレーション

インテグレーション v1: このインテグレーションと比較して、Amazon S3 Import Integration v1はファイル名の取り込みをサポートしていません。CSVファイル形式のみをサポートしています。

インテグレーション v2: Amazon S3 Import Integration v2のv1との主な違いと利点は、assume_role認証のサポートが追加されたことです。

Amazon S3 Filename Metadata Enhanced Import IntegrationはCSVファイルのみで動作し、その他のファイルタイプはサポートされていません。

前提条件

Treasure Dataの基本的な知識が必要です。

TDリージョンと同じリージョンのAWS S3バケットを使用する場合、TDがバケットにアクセスする際のIPアドレスはプライベートで動的に変更されます。アクセスを制限したい場合は、静的IPアドレスの代わりにVPC IDを指定してください。例えば、USリージョンの場合はvpc-df7066baを通じてアクセスを設定します。Tokyoリージョンの場合はvpc-e630c182を、EU01リージョンの場合はvpc-f54e6a9eを設定します。

TDコンソールのリージョンは、TDへのログインに使用するURLで確認し、URLからお使いのリージョンのデータコネクタを参照してください。

Treasure Data Integration の静的 IP アドレス

セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。

リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/

TDコンソールで新しい認証を作成

データ接続を設定する際、インテグレーションへアクセスするための認証情報を提供します。Treasure Dataでは、認証を設定し、ソース情報を指定します。

  1. Integrations Hub > Catalogに移動し、AWS S3を検索します。
  2. Create Authenticationを選択します。

  1. New Authenticationダイアログが開きます。認証情報を使用して認証するには、Access key IDとSecret access keyが必要です。
  2. 以下のパラメータを設定します。Continueを選択します。新しいAWS S3接続に名前を付けます。Doneを選択します。
パラメータ説明
EndpointS3エンドポイントログインユーザー名。リージョンとエンドポイント情報はAWSドキュメントから確認できます。(例: s3.ap-northeast-1.amazonaws.com
Authentication Method
basicaccess_key_idとsecret_access_keyを使用して認証します。AWS Programmatic accessを参照してください。 - Access Key ID - Secret access key
anonymous匿名アクセスを使用します。この認証方法は公開ファイルのみにアクセスできます。
session (推奨)一時的に生成されたaccess_key_id、secret_access_key、session_tokenを使用します。(この認証方法はデータインポートでのみ使用可能です。現時点ではデータエクスポートでは使用できません。) - Access Key ID - Secret access key - Secret token
Access Key IDAWS S3が発行
Secret Access KeyAWS S3が発行

TDコンソール経由でAWS S3データをTreasure Dataに転送

認証済み接続を作成すると、自動的にAuthenticationsに移動します。

  1. 作成した接続を検索します。
  2. New Sourceを選択します。

New Source

  1. Data TransferフィールドのSourceに名前を入力します。
  2. Nextをクリックします。

ソーステーブルの設定

Sourceダイアログが開きます。以下のパラメータを編集します:

パラメータ説明
BucketS3バケット名を指定します(例: your_bucket_name
Path Prefixターゲットキーのプレフィックスを指定します。(例: logs/data_
Path Regex正規表現を使用してファイルパスをマッチさせます。ファイルパスが指定されたパターンと一致しない場合、ファイルはスキップされます。例えば、パターン*.csv$*を選択した場合、パスがパターンと一致しないファイルはスキップされます。正規表現について詳しく読む。
Include file name有効にすると、ファイル名が新しいカラムに保存されます。
Skip Glacier ObjectsAmazon Glacierストレージクラスに保存されているオブジェクトの処理をスキップする場合に選択します。オブジェクトがGlacierストレージクラスに保存されているにもかかわらず、このオプションがチェックされていない場合、例外がスローされます。
Filter by Modified Time取り込むファイルをフィルタリングする方法を選択します:
チェックが外れている場合(デフォルト):
  • Start after path: last_pathパラメータを挿入し、最初の実行でパス以前のファイルをスキップします。(例: logs/data_20170101.csv)
  • Incremental: 増分読み込みを有効にします。増分読み込みが有効な場合、次回実行の設定差分にlast_pathパラメータが含まれるため、次の実行ではパス以前のファイルがスキップされます。それ以外の場合、last_pathは含まれません。
チェックされている場合:
  • Modified after: last_modified_timeパラメータを挿入し、最初の実行で指定されたタイムスタンプ以前に変更されたファイルをスキップします。(例: 2019-06-03T10:30:19.806Z)
  • Incremental by Modified Time: 変更時刻による増分読み込みを有効にします。増分読み込みが有効な場合、次回実行の設定差分にlast_modified_timeパラメータが含まれるため、次の実行ではその時刻以前に変更されたファイルがスキップされます。それ以外の場合、last_modified_timeは含まれません。

ディレクトリ内のすべてのファイル(トップレベルディレクトリ「/」からなど)をスキャンする必要がある場合があります。そのような場合は、CLIを使用してインポートする必要があります。

Amazon CloudFrontは、静的および動的Webコンテンツの配信を高速化するWebサービスです。CloudFrontを設定して、CloudFrontが受信するすべてのユーザーリクエストに関する詳細情報を含むログファイルを作成できます。ロギングを有効にすると、次のようなCloudFrontログファイルを保存できます:

[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.a103fd5a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-15.b2aede4a.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.594fa8e6.gz]
[your_bucket] - [logging] - [E231A697YXWD39.2017-04-23-16.d12f42f9.gz]

この場合、ソーステーブルの設定は次のようになります:

  • Bucket: your_bucket
  • Path Prefix: logging/
  • Path Regex: .gz$ (必須ではありません)
  • Start after path: logging/E231A697YXWD39.2017-04-23-15.b2aede4a.gz (2017-04-23-16からログファイルをインポートすると仮定)
  • Incremental: true (このジョブをスケジュールする場合)

データ設定

  1. Nextを選択します。Data Settingsページが開きます。
  2. オプションで、データ設定を編集するか、このページをスキップします。

パラメータ説明
Total file count limit読み込むファイルの最大数を指定できます
Minimum task sizeこのサイズまでのファイルが1つのタスクにグループ化されます。デフォルト値は268435456(バイト)です。

Filters

Filters は、S3、FTP、または SFTP コネクターの Create Source または Edit Source インポート設定で使用できます。

Import Integration Filters を使用すると、インポート用のデータ設定の編集を完了した後、インポートされたデータを変更できます。

import integration filters を適用するには:

  1. Data Settings で Next を選択します。Filters ダイアログが開きます。
  2. 追加したいフィルターオプションを選択します。
  3. Add Filter を選択します。そのフィルターのパラメーターダイアログが開きます。
  4. パラメーターを編集します。各フィルタータイプの情報については、次のいずれかを参照してください:
  • Retaining Columns Filter
  • Adding Columns Filter
  • Dropping Columns Filter
  • Expanding JSON Filter
  • Digesting Filter
  1. オプションで、同じタイプの別のフィルターを追加するには、特定の列フィルターダイアログ内で Add を選択します。
  2. オプションで、別のタイプの別のフィルターを追加するには、リストからフィルターオプションを選択して、同じ手順を繰り返します。
  3. 追加したいフィルターを追加した後、Next を選択します。Data Preview ダイアログが開きます。

Data Preview

インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

  1. Next を選択します。Data Preview ページが開きます。
  2. データをプレビューする場合は、Generate Preview を選択します。
  3. データを確認します。

Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

  1. Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。

  2. Database を選択 > Select an existing または Create New Database を選択します。

  3. オプションで、database 名を入力します。

  4. Table を選択 > Select an existing または Create New Table を選択します。

  5. オプションで、table 名を入力します。

  6. データをインポートする方法を選択します。

    • Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
    • Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
    • Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
  7. Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。

  8. データストレージの Timezone を選択します。

  9. Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。

一度だけ実行

  1. Off を選択します。
  2. Scheduling Timezone を選択します。
  3. Create & Run Now を選択します。

定期的に繰り返す

  1. On を選択します。
  2. Schedule を選択します。UI では、@hourly@daily@monthly、またはカスタム cron の 4 つのオプションが提供されます。
  3. Delay Transfer を選択して、実行時間の遅延を追加することもできます。
  4. Scheduling Timezone を選択します。
  5. Create & Run Now を選択します。

転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。

S3ジョブのデータコネクタが長時間実行される場合はどうすればよいですか?

コネクタジョブが取り込んでいるS3ファイルの数を確認してください。10,000ファイルを超える場合、パフォーマンスが低下します。この問題を軽減するには、次の方法があります:

  • path_prefixオプションを絞り込んで、S3ファイルの数を減らします。
  • min_task_sizeオプションに268,435,456 (256MB)を設定します。

Workflow経由でAWS S3からインポート

S3インポートインテグレーション用のサンプルワークフローファイルがあります。ymlファイルを使用してインポート設定を定義し、td\_load>:ワークフローオペレータを使用して実行できます。TDコンソールのSource機能だけでは使用できない変数定義が、ymlファイルベースの実行では可能です。

サンプルコードはhttps://github.com/treasure-data/treasure-boxes/tree/master/td_load/s3から参照できます。

timezone: UTC

schedule:
  daily>: 02:00:00

sla:
  time: 08:00
  +notice:
    mail>: {data: Treasure Workflow Notification}
    subject: This workflow is taking long time to finish
    to: [me@example.com]

_export:
  td:
    dest_db: dest_db_ganesh
    dest_table: dest_table_ganesh

+prepare_table:
  td_ddl>:
  create_databases: ["${td.dest_db}"]
  create_tables: ["${td.dest_table}"]
  database: ${td.dest_db}

+load:
  td_load>: config/daily_load.yml
  database: ${td.dest_db}
  table: ${td.dest_table}

CLI (Toolbelt)経由でAWS S3からインポート

オプションでTD Toolbeltを使用して、接続の設定、ジョブの作成、実行のスケジュール設定を行うことができます。

CLIを使用してコネクタを設定

コネクタをセットアップする前に、'td'コマンドをインストールしてください。最新のTD Toolbeltをインストールします。

Seed設定ファイル(seed.yml)の作成

以下の例に示すように、AWSアクセスキーとシークレットアクセスキーを使用してseed.ymlを準備します。また、バケット名とソースファイル名(または複数ファイルのプレフィックス)を指定する必要があります。

in:
  type: s3_fme
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  include_file_name: true
  # path to the *.csv file on your s3 bucket
  path_prefix: path/to/sample_file
  path_match_pattern: \.csv$ # a file will be skipped if its path doesn't match with this pattern

  ## some examples of regexp:
  #path_match_pattern: /archive/ # match files in .../archive/... directory
  #path_match_pattern: /data1/|/data2/ # match files in .../data1/... or .../data2/... directory
  #path_match_pattern: .csv$|.csv.gz$ # match files whose suffix is .csv or .csv.gz
out:
  mode: append

Amazon S3用データコネクタは、指定されたプレフィックスに一致するすべてのファイルをインポートします。(例: path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz)

先頭に'/'を付けてpath_prefixを使用すると、意図しない結果になる可能性があります。例えば:「path_prefix: /path/to/sample_file」とすると、プラグインはs3://sample_bucket//path/to/sample_fileでファイルを探しますが、これはS3上の意図したパスs3://sample_bucket/path/to/sample_fileとは異なります。

フィールドの推測 (load.ymlの生成)

connector:guessを使用します。このコマンドは自動的にソースファイルを読み取り、ファイル形式とそのフィールド/カラムを評価(推測ロジックを使用)します。

td connector:guess seed.yml -o load.yml

load.ymlを開くと、ファイル形式、エンコーディング、カラム名、型を含む評価されたファイル形式定義が表示されます。

in:
  type: s3_fme
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file
  include_file_name: true
  path_prefix: path/to/sample_file
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    skip_header_lines: 1
    columns:
    - name: id
      type: long
    - name: company
      type: string
    - name: customer
      type: string
    - name: created_at
      type: timestamp
      format: '%Y-%m-%d %H:%M:%S'
out:
  mode: append

その後、td connector:previewコマンドを使用してデータのプレビューを表示できます。

$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id    | company | customer | created_at          |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. |    David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |      Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. |    Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |     Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+

guessコマンドは、ソースデータのサンプル行を使用してカラム定義を評価するため、ソースデータファイルに3行以上、2カラム以上が必要です。

システムがカラム名またはカラムタイプを予期せず検出した場合は、load.ymlを直接変更して再度プレビューしてください。

現在、データコネクタは「boolean」、「long」、「double」、「string」、「timestamp」タイプの解析をサポートしています。

ロードジョブの実行

ロードジョブを送信します。データサイズによっては、数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを指定します。

Treasure Dataのストレージは時間でパーティション化されているため、--time-columnオプションを指定することを推奨します(データパーティショニングを参照)。オプションが指定されていない場合、データコネクタは最初のlongまたはtimestampカラムをパーティショニング時刻として選択します。—- time-columnで指定するカラムは、longまたはtimestampのいずれかである必要があります。

データに時刻カラムがない場合は、add_timeフィルタオプションを使用して追加できます。詳細については、add_timeフィルタプラグインを参照してください。

td connector:issue load.yml --database td_sample_db --table td_sample_table   --time-column created_at

connector:issueコマンドは、すでにdatabase(td_sample_db)table(td_sample_table)を作成していることを前提としています。データベースまたはテーブルがTDに存在しない場合、このコマンドは成功しないため、データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成します:

td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

データコネクタはサーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にファイル内のレコードをソートしてください。

時刻フィールドがある場合は、--time-columnオプションを指定する必要はありません。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

実行のスケジュール設定

増分ファイルインポートのために、定期的なデータコネクタの実行をスケジュール設定できます。詳細については、こちらを参照してください。

IAMパーミッション

YML設定ファイルで指定され、connector:guessおよびconnector:issueコマンドで使用されるIAM認証情報には、AWS S3リソースにアクセスするためのパーミッションが必要です。IAMユーザーがこれらのパーミッションを持っていない場合は、事前定義されたポリシー定義のいずれかでユーザーを設定するか、JSON形式で新しいポリシー定義を作成します。

以下の例は、ポリシー定義リファレンス形式に基づいており、IAMユーザーにyour-bucketへの読み取り専用(GetObjectおよびListBucketアクションを通じて)パーミッションを付与します:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}

your-bucketを実際のバケット名に置き換えてください。

一時認証情報プロバイダとしてのAWS Security Token Service (STS)の使用

場合によっては、access_key_idとsecret_access_keyによるIAM基本認証はリスクが高すぎる可能性があります(ただし、secret_access_keyはジョブの実行時やセッション作成後に明確に表示されることはありません)。

S3データコネクタは、一時セキュリティ認証情報を提供するAWS Secure Token Service (STS)を使用できます。AWS STSを使用すると、任意のIAMユーザーが自分のaccess_key_idとsecret_access_keyを使用して、有効期限付きの一時的なnew_access_key_id、new_secret_access_key、session_tokenキーのセットを作成でき、有効期限後は認証情報が無効になります。 一時セキュリティ認証情報のタイプは次のとおりです:

  • Session Token 最もシンプルなセキュリティ認証情報で、有効期限が関連付けられています。一時認証情報は、それらを生成するために使用された元のIAM認証情報が持っていたすべてのリソースへのアクセスを許可します。これらの認証情報は、有効期限が切れておらず、元のIAM認証情報のパーミッションが変更されていない限り有効です。
  • Federation Token 上記のSession Tokenに追加のパーミッション制御レイヤーを追加します。Federation Tokenを生成する際、IAMユーザーはパーミッションポリシーの定義を指定する必要があります。このスコープを使用して、IAMユーザーがアクセスできるリソースのうち、Federation Tokenの保有者がアクセスできるリソースをさらに絞り込むことができます。任意のパーミッションポリシー定義を使用できますが、パーミッションスコープは、トークンの生成に使用されたIAMユーザーのパーミッションのすべてまたはサブセットに限定されます。Session Tokenと同様に、Federation Token認証情報は、有効期限が切れておらず、元のIAM認証情報に関連付けられたパーミッションが変更されていない限り有効です。

AWS STS一時セキュリティ認証情報は、AWS CLIまたは選択した言語のAWS SDKを使用して生成できます。

Session Token

$ aws sts get-session-token --duration-seconds 900
{
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T05:11:14Z",
        "AccessKeyId": "XXXXXXXXXX"
    }
}

Federation Token

aws sts get-federation-token --name temp_creds --duration-seconds 900   --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'
{
    "FederatedUser": {
        "FederatedUserId": "523683666290:temp_creds",
        "Arn": "arn:aws:sts::523683666290:federated-user/temp_creds"
    },
    "Credentials": {
        "SecretAccessKey": "YYYYYYYYYY",
        "SessionToken": "ZZZZZZZZZZ",
        "Expiration": "2015-12-23T06:06:17Z",
        "AccessKeyId": "XXXXXXXXXX"
    },
    "PackedPolicySize": 16
}

ここで: temp_credはFederatedトークン/ユーザーの名前、bucketnameはアクセスを許可するバケットの名前です。詳細についてはARN仕様を参照してください。s3:GetObjects3:ListBucketは、AWS S3バケットの基本的な読み取り操作です。

AWS STS認証情報は取り消すことができません。有効期限が切れるか、認証情報の生成に使用された元のIAMユーザーのパーミッションを削除または除去するまで有効なままです。

一時セキュリティ認証情報が生成されたら、次のようにseed.ymlファイルにSecretAccessKeyAccessKeyIdSessionTokenをコピーします。

in:
  type: s3_fme
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  include_file_name: true
  session_token: ZZZZZZZZZZ
  bucket: sample_bucket
  path_prefix: path/to/sample_file

そして、通常どおりS3用データコネクタを実行します。

認証情報の有効期限

STS認証情報は指定された時間後に有効期限が切れるため、認証情報を使用するデータコネクタジョブは、認証情報の有効期限が切れると最終的に失敗し始める可能性があります。 STS認証情報の有効期限が切れたと報告された場合、データコネクタジョブは最大回数(5回)まで再試行し、最終的に'error'ステータスで完了します。