Skip to content
Last updated

Amazon S3 Parquet Export Integration

Parquetは、カラム型データフォーマットであり、クエリがデータセット内のカラムのサブセットのみにアクセスする必要がある分析ワークロードに特に有益です。このため、多くのデータパイプライン戦略において好まれるオプションとなっています。

Amazon S3 Parquet Export Integrationを使用すると、Treasure Dataのジョブ結果からParquetファイルを生成し、Amazon S3に直接アップロードできます。

前提条件

  • Treasure Dataの基本的な知識

S3 Bucket Policy 設定

TD リージョンと同じリージョンにある AWS S3 バケットを使用している場合、TD がバケットにアクセスする IP アドレスはプライベートで動的に変化します。アクセスを制限したい場合は、静的 IP アドレスではなく VPC の ID を指定してください。例えば、US リージョンの場合は vpc-df7066ba 経由でアクセスを設定し、Tokyo リージョンの場合は vpc-e630c182 経由、EU01 リージョンの場合は vpc-f54e6a9e 経由でアクセスを設定してください。

TD Console にログインする URL から TD Console のリージョンを確認し、URL 内のリージョンのデータコネクターを参照してください。

詳細については、API ドキュメントを参照してください。

Treasure Data Integration の静的 IP アドレス

セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。

リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/

要件と制限事項

  • 最大512 MBの行グループサイズのみをサポートします。
  • ファイルのアップロードにマルチパートアップロードが使用されるため、何らかの理由でS3コンテナにアップロードパーツが残る場合があります。不完全なマルチパートアップロードのクリーンアップポリシーを有効にすることをお勧めします。詳細については、S3 Lifecycle Management Update – Support for Multipart Uploads and Delete Markersを参照してください。

TD Consoleで接続を作成する

クエリを実行する前に、データ接続を作成して設定する必要があります。データ接続の一部として、統合にアクセスするための認証を提供します。

Amazon S3 Parquetでサポートされている認証方法

認証方法Amazon S3 Parquet
basicx
sessionx
assume_rolex

認証を作成する

最初のステップは、認証情報のセットを使用して新しい認証を作成することです。

  1. Integrations Hubを選択します。
  2. Catalogを選択します。
  3. Catalogで統合を検索し、アイコンにマウスカーソルを合わせてCreate Authenticationを選択します。
  4. Credentialsタブが選択されていることを確認し、統合の認証情報を入力します。 認証フィールド
ParameterDescription
EndpointS3サービスエンドポイントのオーバーライド。リージョンとエンドポイント情報は、AWS service endpointsで確認できます(例:s3.ap-northeast-1.amazonaws.com)。指定すると、リージョン設定が上書きされます。
RegionAWSリージョン
Authentication Method
basic
  • access_key_idとsecret_access_keyを使用して認証します。AWS Programmatic accessを参照してください。
  • Access Key ID
  • Secret access key
session (推奨)
  • 一時的に生成されたaccess_key_id、secret_access_key、およびsession_tokenを使用します。
  • Access Key ID
  • Secret access key
  • Session token
assume_role
  • ロールアクセスを使用します。AWS AssumeRoleを参照
  • TDのインスタンスプロファイル
  • アカウントID
  • ロール名
  • 外部ID
  • 有効期間(秒)
匿名サポートされていません
アクセスキーIDAWS S3から発行
シークレットアクセスキーAWS S3から発行
セッショントークン一時的なAWSセッショントークン
TDのインスタンスプロファイルこの値はTDコンソールによって提供されます。この値の数値部分が、IAMロールを作成する際に使用するアカウントIDを構成します。
アカウントIDAWSアカウントID
ロール名AWSロール名
外部IDシークレット外部ID
有効期間一時的な認証情報の有効期間
  1. 続行を選択します。
  2. 認証の名前を入力し、完了を選択します。

assume_role認証方式を使用した認証の作成

  1. assume_role認証方式で新しい認証を作成します。
  2. TDのインスタンスプロファイルフィールドの値の数値部分をメモします。
  3. AWS IAMロールを作成します。

クエリ結果のエクスポートの設定

クエリの定義

  1. Data Workbench > Queriesに移動します。
  2. 新しいクエリを選択します。
  3. クエリを実行して結果セットを検証します。

結果エクスポート先の指定

  1. 結果をエクスポートを選択します。
  2. 認証を指定し、エクスポートの設定を開始します。 既存の認証を選択するか、出力に使用する外部サービス用の新しい認証を作成できます。次のいずれかを選択します:

既存のIntegrationを使用する

新しいIntegrationを作成する

**

設定フィールド

フィールド説明
Server-side encryptionサポートされる値:
  • sse-s3: Server-side Encryption Mode
  • sse-kms: 新しいSSE Mode
Server-side Encryption Algorithmサポートされる値: - AES256
KMS Key ID対称AWS KMS Key ID。KMS Key IDの入力がない場合、デフォルトのKMS Keyを作成/使用します。
BucketS3バケット名を指定してください (例: your_bucket_name)。
CompressionS3バケット名を指定してください (例: your_bucket_name)。
Paths3ファイル名(オブジェクトキー)を指定し、拡張子を含めてください (例: test.parquet)。
Compressionエクスポートされるファイルの圧縮形式。サポートされる値: - None - Gzip - Snappy
Row group sizeParquetファイルのグループサイズを指定してください
Page sizeParquetファイルのページサイズを指定してください
Timestamp UnitParquetファイルのタイムスタンプ形式を指定してください。サポートされる時間単位:
  • Milliseconds
  • Microseconds
Single fileRow Groupのサイズで複数のファイルに分割する場合はチェックを外してください
Enable Bloom filterParquetファイルのBloom filterを有効にする
リトライ制限最大リトライ回数
リトライ待機時間各リトライ時の待機時間(ミリ秒)
同時実行スレッド数S3へのアップロードを行うS3スレッドの同時実行数
スレッドあたりの接続数S3への各スレッドが開くHTTP接続数
パートサイズS3のマルチパートアップロードのパートサイズ

Audience Studioでアクティベーションを作成する

アクティベーションを作成して、セグメントデータやステージをエクスポートできます。

Audience Studio で Segment をアクティベートする

Audience Studio で activation を作成することで、segment データをターゲットプラットフォームに送信することもできます。

  1. Audience Studio に移動します。
  2. parent segment を選択します。
  3. ターゲット segment を開き、右クリックして、Create Activation を選択します。
  4. Details パネルで、Activation 名を入力し、前述の Configuration Parameters のセクションに従って activation を設定します。
  5. Output Mapping パネルで activation 出力をカスタマイズします。

  • Attribute Columns
    • Export All Columns を選択すると、変更を加えずにすべての列をエクスポートできます。
    • + Add Columns を選択して、エクスポート用の特定の列を追加します。Output Column Name には、Source 列名と同じ名前があらかじめ入力されます。Output Column Name を更新できます。+ Add Columns を選択し続けて、activation 出力用の新しい列を追加します。
  • String Builder
    • + Add string を選択して、エクスポート用の文字列を作成します。次の値から選択します:
      • String: 任意の値を選択します。テキストを使用してカスタム値を作成します。
      • Timestamp: エクスポートの日時。
      • Segment Id: segment ID 番号。
      • Segment Name: segment 名。
      • Audience Id: parent segment 番号。
  1. Schedule を設定します。

  • スケジュールを定義する値を選択し、オプションでメール通知を含めます。
  1. Create を選択します。

batch journey の activation を作成する必要がある場合は、Creating a Batch Journey Activation を参照してください。

(オプション) Query Export ジョブをスケジュールする

Scheduled Jobs と Result Export を使用して、指定したターゲット宛先に出力結果を定期的に書き込むことができます。

Treasure Data のスケジューラー機能は、高可用性を実現するために定期的なクエリ実行をサポートしています。

2 つの仕様が競合するスケジュール仕様を提供する場合、より頻繁に実行するよう要求する仕様が優先され、もう一方のスケジュール仕様は無視されます。

例えば、cron スケジュールが '0 0 1 * 1' の場合、「月の日」の仕様と「週の曜日」が矛盾します。前者の仕様は毎月 1 日の午前 0 時 (00:00) に実行することを要求し、後者の仕様は毎週月曜日の午前 0 時 (00:00) に実行することを要求するためです。後者の仕様が優先されます。

TD Console を使用してジョブをスケジュールする

  1. Data Workbench > Queries に移動します

  2. 新しいクエリを作成するか、既存のクエリを選択します。

  3. Schedule の横にある None を選択します。

  4. ドロップダウンで、次のスケジュールオプションのいずれかを選択します:

    ドロップダウン値説明
    Custom cron...Custom cron... の詳細を参照してください。
    @daily (midnight)指定されたタイムゾーンで 1 日 1 回午前 0 時 (00:00 am) に実行します。
    @hourly (:00)毎時 00 分に実行します。
    Noneスケジュールなし。

Custom cron... の詳細

Cron 値説明
0 * * * *1 時間に 1 回実行します。
0 0 * * *1 日 1 回午前 0 時に実行します。
0 0 1 * *毎月 1 日の午前 0 時に 1 回実行します。
""スケジュールされた実行時刻のないジョブを作成します。
 *    *    *    *    *
 -    -    -    -    -
 |    |    |    |    |
 |    |    |    |    +----- day of week (0 - 6) (Sunday=0)
 |    |    |    +---------- month (1 - 12)
 |    |    +--------------- day of month (1 - 31)
 |    +-------------------- hour (0 - 23)
 +------------------------- min (0 - 59)

次の名前付きエントリを使用できます:

  • Day of Week: sun, mon, tue, wed, thu, fri, sat.
  • Month: jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec.

各フィールド間には単一のスペースが必要です。各フィールドの値は、次のもので構成できます:

フィールド値 例の説明
各フィールドに対して上記で表示された制限内の単一の値。
フィールドに基づく制限がないことを示すワイルドカード '*''0 0 1 * *'毎月 1 日の午前 0 時 (00:00) に実行するようにスケジュールを設定します。
範囲 '2-5' フィールドの許可される値の範囲を示します。'0 0 1-10 * *'毎月 1 日から 10 日までの午前 0 時 (00:00) に実行するようにスケジュールを設定します。
カンマ区切りの値のリスト '2,3,4,5' フィールドの許可される値のリストを示します。0 0 1,11,21 * *'毎月 1 日、11 日、21 日の午前 0 時 (00:00) に実行するようにスケジュールを設定します。
周期性インジケータ '*/5' フィールドの有効な値の範囲に基づいて、 スケジュールが実行を許可される頻度を表現します。'30 */2 1 * *'毎月 1 日、00:30 から 2 時間ごとに実行するようにスケジュールを設定します。 '0 0 */5 * *' は、毎月 5 日から 5 日ごとに午前 0 時 (00:00) に実行するようにスケジュールを設定します。
'*' ワイルドカードを除く上記の いずれかのカンマ区切りリストもサポートされています '2,*/5,8-10''0 0 5,*/10,25 * *'毎月 5 日、10 日、20 日、25 日の午前 0 時 (00:00) に実行するようにスケジュールを設定します。
  1. (オプション) Delay execution を有効にすることで、クエリの開始時刻を遅延させることができます。

(オプション) Workflowでエクスポート結果を設定する

Treasure Workflow内で、この連携を使用してデータをエクスポートするように指定できます。

_export:
  td:
  database: td.database

+a_s3_parquet_export_task:
  td>: export_test.sql
  database: ${td.database}
  result_connection: s3_parquet_conn
  result_settings:
  type:

(オプション) CLIを使用したエクスポート連携

単一のクエリ結果をS3バケットに出力するには、td queryコマンドに*--result*オプションを追加します。ジョブが完了すると、結果がS3バケットに書き込まれます。td queryコマンドについては、こちらの記事を参照してください。

以下のように、--resultパラメータを使用してS3へのエクスポートの詳細設定を指定できます:

{
  "type": "s3_parquet",
  "endpoint": "",
  "region": "us-east-1",
  "auth_method": "",
  "session_token": "",
  "account_id": "",
  "role_name": "",
  "duration_in_seconds": 3600,
  "access_key_id": "xxxxx",
  "secret_access_key": "xxxxx",
  "sse_type": "sse-s3",
  "sse_algorithm": "AES256",
  "kms_key_id": "arn:aws:kms:us-east-1:xxx:key/xxx",
  "bucket": "bucket",
  "path": "file.parquet",
  "row_group_size": 256,
  "page_size": 1024,
  "timestamp_unit": "milliseconds",
  "enable_bloom_filter": false,
  "enable_single_file": true,
  "compression": "gzip",
  "part_size": 100,
  "part_size": 100,
  "retry_limit": 7,
  "retry_wait_millis": 500,
  "number_of_concurrent_threads": 4,
  "connections_per_thread": 128,
}

パラメータ

名前説明デフォルト値必須
typeエクスポート先サービスの名前を記述します。s3_parquetN/AYes
endpointS3 サービスエンドポイントのオーバーライド。AWS サービスエンドポイントでリージョンとエンドポイント情報を確認できます。(例: s3.ap-northeast-1.amazonaws.com)S3 サービスエンドポイントのオーバーライドN/ANo
regionAWS リージョンAWS リージョンus-east-1No
auth_methodS3 の認証方式basic session assume_rolebasicYes
access_key_idアクセスキー IDキー IDN/Aauth_method として basic または session を使用する場合
secret_access_keyシークレットアクセスキーシークレットアクセスキーN/Aauth_method として basic または session を使用する場合
session_tokenセッショントークンセッショントークンN/Aauth_method として session を使用する場合
account_idアカウント IDアカウント IDN/Aauth_method として assume_role を使用する場合
role_nameロール名ロール名N/Aauth_method として assume_role を使用する場合
external_id外部 ID外部 IDN/Aauth_method として assume_role を使用する場合
duration_in_seconds接続の持続時間秒単位の持続時間3600No
bucketS3 バケット値S3 バケット値N/AYes
pathファイル形式を含む S3 パス (例: "/path/file.parquet")パスファイルN/AYes
row_group_sizeParquet 行グループサイズParquet 行グループサイズ256No
page_sizeParquet ページサイズParquet ページサイズ1024No
timestamp_unitParquet タイムスタンプ形式- milliseconds - microsecondsmillisecondsNo
enable_bloom_filterParquet ファイルのブルームフィルタを有効化- true - falsefalseNo
enable_single_file単一ファイルへのエクスポート、または各ファイルに1つの row_group_file を含む複数ファイルへのエクスポートを有効化- true - falsetrueY
compressionエクスポートされたファイルの圧縮形式- None - Gzip - SnappyNoneNo
retry_limit最大リトライ回数リトライ制限7No
retry_wait_millis各リトライ時の待機時間(ミリ秒)リトライ待機時間(ミリ秒)500No
number_of_concurrent_threadsS3 へのアップロードのための同時スレッド数同時 S3 スレッド数4No
connection_per_threadS3 への各スレッドごとの HTTP 接続数スレッドごとの HTTP 接続数16No
part_sizeS3 のマルチパートアップロードのパートサイズパートサイズ100No

使用例

$ td query \
--result '{"type":"s3_parquet","auth_method":"basic","access_key_id":"access_key_id","secret_access_key":"secret_access_key","bucket":"bucket","path":"/path/file.parquet"}' \
-d sample_datasets "select * from www_access" -T presto