Skip to content
Last updated

Amazon S3 Parquet Import Integration

Amazon S3のインポートデータコネクタにより、S3バケットに保存されているParquetファイルからデータをインポートできます。

Amazon S3 Parquetの認証方法について

認証方法Amazon S3 parquet
basicx
sessionx
assume_rolex

前提条件

  • Treasure Dataに関する基本的な知識。

S3 Bucket Policy 設定

TD リージョンと同じリージョンにある AWS S3 バケットを使用している場合、TD がバケットにアクセスする IP アドレスはプライベートで動的に変化します。アクセスを制限したい場合は、静的 IP アドレスではなく VPC の ID を指定してください。例えば、US リージョンの場合は vpc-df7066ba 経由でアクセスを設定し、Tokyo リージョンの場合は vpc-e630c182 経由、EU01 リージョンの場合は vpc-f54e6a9e 経由でアクセスを設定してください。

TD Console にログインする URL から TD Console のリージョンを確認し、URL 内のリージョンのデータコネクターを参照してください。

詳細については、API ドキュメントを参照してください。

Treasure Data Integration の静的 IP アドレス

セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。

リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/

制限事項

Treasure Dataでは、最適なパフォーマンスを得るために、個々のparquetファイルのサイズを100 MB以下に制限することを推奨しています。ただし、これは厳密な要件ではなく、ニーズに合わせてパーティションサイズを調整できます。

TD Consoleでの新しい接続の作成

TD Consoleを使用してデータコネクタを作成できます。

  1. TD Consoleを開きます。
  2. Integrations Hub > Catalogに移動します。
  3. S3 parquetを検索し、Amazon S3 parquetを選択します。
  4. Create Authenticationを選択します。

New Authenticationダイアログが開きます。選択した認証方法によって、ダイアログは次のような画面になります:

basic

session

assume_role

  1. 認証フィールドを設定し、Continueを選択します。
ParameterDescription
EndpointS3サービスエンドポイントのオーバーライド。リージョンとエンドポイント情報はAWS service endpointsで確認できます。(例: s3.ap-northeast-1.amazonaws.com) 指定すると、リージョン設定を上書きします。
リージョンAWSリージョン
認証方式
basic
session (推奨)
  • 一時的に生成されたaccess_key_id、secret_access_key、およびsession_tokenを使用します。
  • アクセスキーID
  • シークレットアクセスキー
  • セッショントークン
assume_role
  • ロールアクセスを使用します。AWS AssumeRoleを参照してください。
  • TDのインスタンスプロファイル
  • アカウントID
  • お客様のロール名
  • 外部ID
  • 有効期間(秒)
anonymousサポートされていません
アクセスキーIDAWS S3が発行
シークレットアクセスキーAWS S3が発行
セッショントークンお客様の一時的なAWSセッショントークン
TDのインスタンスプロファイルこの値はTDコンソールによって提供されます。値の数値部分は、IAMロールを作成する際に使用するアカウントIDを構成します。
Account IDAWS Account ID
Your Role NameAWS Role 名
External IDシークレット External ID
Duration一時的な認証情報の有効期間
  1. 新しい AWS S3 接続に名前を付け、Done を選択します。

assume_role 認証方式による認証の作成

  1. assume_role 認証方式で新しい認証を作成します。
  2. TD's Instance Profile フィールドの値の数値部分をメモしておきます。

  1. AWS IAM role を作成します。

AWS S3 データを Treasure Data に転送する

認証済み接続を作成すると、認証画面が表示されます。

  1. 作成した接続を検索します。
  2. New Source を選択します。

Connection

  1. Data Transfer フィールドで Source の名前を入力します。
  2. Next をクリックします。

Source Table

Source ダイアログが開きます。

  1. 以下のパラメータを編集します。

ParametersDescription
BucketS3バケット名(例:your_bucket_name
Path Prefixターゲットキーのプレフィックス(例:logs/data_
Path Regexファイルパスに一致させる正規表現。ファイルパスが指定されたパターンに一致しない場合、そのファイルはスキップされます。例えば、.csvファイルのみに一致させたい場合、パターン .csv$ を入力すると、.csvで終わらないファイル名はすべてスキップされます。正規表現を参照してください。
Skip Glacier ObjectsAmazon Glacierストレージクラスに保存されているオブジェクトの処理をスキップします。オブジェクトがGlacierストレージクラスに保存されているにもかかわらず、このオプションがチェックされていない場合、例外がスローされます。
Start after path辞書順でこれより長いパスのみがインポートされます。
Sub Folders Are PartitionsSparkパーティションからパーティション列を追加します。サブフォルダー名は次の形式である必要があります:partiton_column_name=value

ディレクトリ内のすべてのファイルをスキャンする必要がある場合(トップレベルディレクトリ「/」からなど)があります。このような場合は、CLIを使用してインポートを行う必要があります。

EMRを設定して、次のようにファイル内のデータを含むparquetファイルを作成できます:

[your_bucket] - [YM=202010] - [E231A697YXWD39.2020-10-29-15.a103fd5a.parquet]
[your_bucket] - [YM=202010] - [E231A697YXWD39.2020-10-30-15.b2aede4a.parquet]
[your_bucket] - [YM=202010] - [E231A697YXWD39.2020-10-31-01.594fa8e6.parquet]

この場合、ソーステーブルの設定は次のようになります:

  • Bucket: your_bucket
  • Path Prefix: YM=202010a/
  • Path Regex: * (必須ではありません)
  • Start after path: YM=202010/E231A697YXWD39.2020-10-29-15.a103fd5a.parquet

Data Settings

  1. Nextを選択します。

    Data Settingsページが開きます。

  2. オプションで、データ設定を編集するか、このダイアログページをスキップします。

ParametersDescription
Retry Limit最大リトライ回数
Initial Retry Interval in Millis初期リトライ間隔(ミリ秒単位)
Max Retry Wait in Millis最大リトライ間隔。初回リトライ後、待機間隔はこの最大値に達するまで2倍になります。
Number of connector threads並列処理可能なファイルハンドルの数
Number of threads for S3 file downloadsファイルのブロックをダウンロードするために使用できる接続数
Number of prefetch block使用するプリフェッチブロックの数
Prefetch block size in MBプリフェッチブロックサイズ(MB単位で指定)

Filters

Filters は、S3、FTP、または SFTP コネクターの Create Source または Edit Source インポート設定で使用できます。

Import Integration Filters を使用すると、インポート用のデータ設定の編集を完了した後、インポートされたデータを変更できます。

import integration filters を適用するには:

  1. Data Settings で Next を選択します。Filters ダイアログが開きます。
  2. 追加したいフィルターオプションを選択します。
  3. Add Filter を選択します。そのフィルターのパラメーターダイアログが開きます。
  4. パラメーターを編集します。各フィルタータイプの情報については、次のいずれかを参照してください:
  • Retaining Columns Filter
  • Adding Columns Filter
  • Dropping Columns Filter
  • Expanding JSON Filter
  • Digesting Filter
  1. オプションで、同じタイプの別のフィルターを追加するには、特定の列フィルターダイアログ内で Add を選択します。
  2. オプションで、別のタイプの別のフィルターを追加するには、リストからフィルターオプションを選択して、同じ手順を繰り返します。
  3. 追加したいフィルターを追加した後、Next を選択します。Data Preview ダイアログが開きます。

Data Preview

インポートを実行する前に、Generate Preview を選択してデータのプレビューを表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

  1. Next を選択します。Data Preview ページが開きます。
  2. データをプレビューする場合は、Generate Preview を選択します。
  3. データを確認します。

Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

  1. Next を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。

  2. Database を選択 > Select an existing または Create New Database を選択します。

  3. オプションで、database 名を入力します。

  4. Table を選択 > Select an existing または Create New Table を選択します。

  5. オプションで、table 名を入力します。

  6. データをインポートする方法を選択します。

    • Append (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。
    • Always Replace - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
    • Replace on New Data - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
  7. Timestamp-based Partition Key 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。

  8. データストレージの Timezone を選択します。

  9. Schedule の下で、このクエリを実行するタイミングと頻度を選択できます。

一度だけ実行

  1. Off を選択します。
  2. Scheduling Timezone を選択します。
  3. Create & Run Now を選択します。

定期的に繰り返す

  1. On を選択します。
  2. Schedule を選択します。UI では、@hourly@daily@monthly、またはカスタム cron の 4 つのオプションが提供されます。
  3. Delay Transfer を選択して、実行時間の遅延を追加することもできます。
  4. Scheduling Timezone を選択します。
  5. Create & Run Now を選択します。

転送が実行された後、Data Workbench > Databases で転送の結果を確認できます。

Data type mapping

Parquetファイル形式は、ディスクストレージの効率を最適化するために設計された、プリミティブ型と呼ばれる最小限のデータ型を使用します。一方、論理型は、プリミティブ型がどのように解釈されるべきかを指定することで、型を拡張するために使用されます。インポートされると、これらの型はTDでサポートされているデータ型に変換されます。

Parquet data typeType mapping when import to TD
Primitive types
booleanboolean
booleanboolean
int32long
int64double
int96string
float, doubledouble
byte_array, fixed_len_byte_arraystring
Logical types
byte type, short type, integer type, long typelong
float type, double typedouble
decimalstring
array typestring
map, structjson
string type, binary typestring

CLI(Toolbelt)を使用したAWS S3 Parquetからのインポート

オプションとして、TD Toolbeltを使用して接続の設定、ジョブの作成、およびジョブ実行のスケジュール設定を行うことができます。統合をセットアップする前に、最新バージョンのTD Toolbeltがインストールされていることを確認してください。

Seedコンフィグファイル(seed.yml)の作成

以下の例に示すように、AWSアクセスキーを使用してseed.ymlファイルを設定します。また、バケット名とソースファイル名を指定する必要があります。オプションで、複数のファイルにマッチさせるためにpath_prefixを指定できます。以下の例では、path_prefix: path/to/sample_fileは次のファイルにマッチします

  • path/to/sample_201501.parquet
  • path/to/sample_201502.parquet
  • path/to/sample_201505.parquet
  • etc.

先頭に'/'を付けたpath_prefixを使用すると、意図しない結果になる可能性があります。例:「path_prefix: /path/to/sample_file」とすると、プラグインはs3://sample_bucket//path/to/sample_fileのファイルを探すことになり、これはS3上でs3://sample_bucket/path/to/sample_fileという意図したパスとは異なります。

in:
  type: s3_parquet
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  # s3バケット上の*.parquetファイルへのパス
  path_prefix: path/to/sample_file
  path_match_pattern: \.parquet$ # このパターンにマッチしないファイルはスキップされます

  ## 正規表現の例:
  #path_match_pattern: /archive/ # .../archive/...ディレクトリ内のファイルにマッチ
  #path_match_pattern: /data1/|/data2/ # .../data1/...または.../data2/...ディレクトリ内のファイルにマッチ
out:
  mode: append

既存の認証を再利用する場合は、td_authentication_id コンフィグキーの値にAuthentication IDを設定します。これはassume-role認証方式に必要です。既存の認証の再利用を参照してください。

in:
  type: s3_qarquet
  td_authentication: xxxx
  bucket: sample_bucket
  path_prefix: path/to/sample_file

out:
  mode: append

フィールドの推測(load.ymlの生成)

connector:guess は自動的にソースファイルを読み取り、ファイル形式、フィールド、カラムを評価します。

td connector:guess seed.yml -o load.yml

load.yml ファイルを確認すると、ファイル形式、エンコーディング、カラム名、型を含む「推測された」ファイル形式定義を確認できます。

in:
  type: s3_parquet
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  bucket: sample_bucket
  path_prefix: path/to/sample_file

out:
  mode: append

ロードジョブの実行

ロードジョブを送信します。データのサイズによっては数時間かかる場合があります。データを保存する Treasure Data のデータベースとテーブルを指定します。

Treasure Data のストレージは時間によってパーティション分割されているため、--time-column オプションを指定することを推奨します(データパーティショニング を参照)。このオプションが指定されていない場合、データコネクタは最初の long または timestamp カラムをパーティショニング時間として選択します。--time-column で指定するカラムの型は、long または timestamp のいずれかである必要があります。

データに時間カラムがない場合は、add_time フィルタオプションを使用して時間カラムを追加できます。詳細については、add_time フィルタ機能 を参照してください。

td connector:issue load.yml --database td_sample_db --table td_sample_table \
--time-column created_at

以下の例では、connector:issue コマンドは、database(td_sample_db)table(td_sample_table) がすでに作成されていることを前提としています。Treasure Data にデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、td connector:issue コマンドで --auto-create-table オプションを使用してデータベースとテーブルを自動作成してください:

td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

データコネクタはサーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にファイル内のレコードをソートしてください。

time というフィールドがある場合は、--time-column オプションを指定する必要はありません。

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

IAM 権限の設定

YML 設定ファイルで指定された IAM 認証情報は、connector:guess および connector:issue コマンドで使用され、アクセスする必要のある AWS S3 リソースに対する権限が必要です。IAM ユーザーがこれらの権限を持っていない場合は、定義済みのポリシー定義のいずれかでユーザーを設定するか、JSON 形式で新しいポリシー定義を作成してください。

以下の例は、ポリシー定義リファレンス形式 に基づいています。これにより、IAM ユーザーに「your-bucket」に対する 読み取り専用 権限(GetObject および ListBucket アクションを通じて)が付与されます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}

"your-bucket" を実際のS3バケット名に置き換えてください。

一時的な認証情報プロバイダーとしてAWS Security Token Service (STS) を使用する

特定のケースでは、access_key_idとsecret_access_keyを通じたIAM基本認証はリスクが高すぎる可能性があります(ジョブの実行時やセッション作成後にsecret_access_keyが明確に表示されることはありませんが)。

S3データコネクタは、AWS Secure Token Service (STS) を使用して一時的なセキュリティ認証情報を提供できます。AWS STSを使用すると、任意のIAMユーザーが自身のaccess_key_idとsecret_access_keyを使用して、特定の有効期限を持つこれらの一時キーを作成できます:

  • new_access_key_id
  • new_secret_access_key
  • session_tokenキー

以下は一時的なセキュリティ認証情報のタイプです:

  • Session Token

    指定された有効期限を持つ最もシンプルなセキュリティ認証情報です。一時的な認証情報は、それらを生成したIAMユーザーと同じアクセス権を持ちます。これらの認証情報は、有効期限が切れておらず、元のIAMユーザーの権限が変更されていない限り有効です。

  • Federation Token

    これは上記のSession Tokenに対して追加の権限制御レイヤーを追加します。Federation Tokenを生成する際、IAMユーザーはPermission Policy定義を指定する必要があります。スコープは、Federation Tokenの保有者がアクセスできるリソースを制限するために使用できます(権限を付与するIAMユーザーのアクセス権より少ない場合があります)。任意のPermission Policy定義を使用できますが、権限のスコープは、トークンを生成したIAMの権限と同じか、そのサブセットに制限されます。Session Tokenと同様に、Federation Token認証情報は、有効期限が切れておらず、元のIAM認証情報に関連する権限が変更されていない限り有効です。

AWS STS一時的なセキュリティ認証情報は、AWS CLIまたはお好みの言語のAWS SDKを使用して生成できます。

Session Token

$ aws sts get-session-token --duration-seconds 900

Federation Token

この例では、

  • temp_creds はFederated tokenまたはユーザーの一時認証情報の名前です。
  • bucketname はアクセスが許可されているS3バケットの名前です(詳細についてはARN仕様を参照してください)。
  • s3:GetObjects3:ListBucket はAWS S3バケットの基本的な読み取り操作です。
$ aws sts get-federation-token --name temp_creds --duration-seconds 900 \
  --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}'

AWS STS認証情報は取り消すことができません。有効期限が切れるまで、または認証情報の生成に使用された元のIAMユーザーの権限を削除または取り消すまで、有効なままです。

一時的なセキュリティ認証情報が生成されたら、seed.yml ファイルに SecretAccessKeyAccessKeyId、および SessionToken を含めて、通常どおりData Connector for S3を実行します。

in:
  type: s3_parquet
  auth_method: session
  access_key_id: XXXXXXXXXX
  secret_access_key: YYYYYYYYYY
  session_token: ZZZZZZZZZZ
  bucket: sample_bucket
  path_prefix: path/to/sample_file

認証情報の有効期限

STS認証情報は指定された時間が経過すると有効期限が切れるため、その認証情報を使用するデータコネクタジョブは最終的に失敗し始める可能性があります。現在、STS認証情報の有効期限が切れたと報告された場合、データコネクタジョブは最大回数(5回)まで再試行し、最終的に「error」のステータスで完了します。

インポートを確認するには、データコネクタジョブの検証の手順を参照してください。

関連トピック

S3ジョブのデータコネクタが長時間実行されている場合はどうすればよいですか?

コネクタジョブが取り込んでいるS3ファイルの数を確認してください。10,000ファイルを超える場合、パフォーマンスが低下します。この問題を軽減するには、次の方法があります:

  • path_prefixオプションを絞り込んで、S3ファイルの数を減らす
  • min_task_sizeオプションを268,435,456(256MB)に設定する

従うべきベストプラクティス

  • 1ファイルあたり1つのrow_groupではなく、多くのrow_groupsを持つファイルを提供する

  • 1か所に多数のファイルがある場合は、パラメータPath PrefixとPath Regexを使用して複数のジョブに分割してください。 各ジョブは20〜40ファイルにする必要があります。

    例:

path_prefix: folder/sub_folderpath_match_pattern: folder/sub_folder/regrex