# Amazon S3 Parquet Import Integration Amazon S3のインポートデータコネクタにより、S3バケットに保存されているParquetファイルからデータをインポートできます。 ## Amazon S3 Parquetの認証方法について | 認証方法 | Amazon S3 parquet | | --- | --- | | **basic** | **x** | | **session** | **x** | | **assume_role** | **x** | **前提条件** - Treasure Dataに関する基本的な知識。 ## S3 Bucket Policy 設定 TD リージョンと同じリージョンにある AWS S3 バケットを使用している場合、TD がバケットにアクセスする IP アドレスはプライベートで動的に変化します。アクセスを制限したい場合は、静的 IP アドレスではなく VPC の ID を指定してください。例えば、US リージョンの場合は vpc-df7066ba 経由でアクセスを設定し、Tokyo リージョンの場合は vpc-e630c182 経由、EU01 リージョンの場合は vpc-f54e6a9e 経由でアクセスを設定してください。 TD Console にログインする URL から TD Console のリージョンを確認し、URL 内のリージョンのデータコネクターを参照してください。 詳細については、[API ドキュメント](https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/#s3-bucket-policy-configuration-for-export-and-import-integrations)を参照してください。 ## Treasure Data Integration の静的 IP アドレス セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。 リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: [https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/](https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/) ## 制限事項 Treasure Dataでは、最適なパフォーマンスを得るために、個々のparquetファイルのサイズを100 MB以下に制限することを推奨しています。ただし、これは厳密な要件ではなく、ニーズに合わせてパーティションサイズを調整できます。 ## TD Consoleでの新しい接続の作成 TD Consoleを使用してデータコネクタを作成できます。 1. **TD Console**を開きます。 2. **Integrations Hub** > **Catalog**に移動します。 3. **S3 parquet**を検索し、Amazon S3 parquetを選択します。 4. **Create Authentication**を選択します。 New Authenticationダイアログが開きます。選択した認証方法によって、ダイアログは次のような画面になります: **basic** ![](/assets/screen-shot-2023-03-28-at-14.24.47.f5f2bda6b9db5d192cb911d7d4ec0aca1299fe948d93cf2369056288b991aeab.e7823387.png) **session** ![](/assets/screen-shot-2023-03-28-at-14.26.21.c0ba3d29ae2f0819045e766b368619f4ada83cfe886b516e6681b4cd40fe6557.e7823387.png) **assume_role** ![](/assets/screen-shot-2023-03-28-at-14.26.29.f3cd34a591e47b6970307ef270d5c4e0b0dc881229742d5851e7dc8e3e6c1637.e7823387.png) 1. 認証フィールドを設定し、**Continue**を選択します。 | Parameter | Description | | --- | --- | | **Endpoint** | S3サービスエンドポイントのオーバーライド。リージョンとエンドポイント情報は[AWS service endpoints](http://docs.aws.amazon.com/general/latest/gr/rande.md#s3_region)で確認できます。(例: [*s3.ap-northeast-1.amazonaws.com*](https://s3.ap-northeast-1.amazonaws.com/)) 指定すると、リージョン設定を上書きします。 | | **リージョン** | AWSリージョン | | **認証方式** | | | **basic** | - access_key_idとsecret_access_keyを使用して認証します。[AWSプログラマティックアクセス](https://docs.aws.amazon.com/general/latest/gr/managing-aws-access-keys.md)を参照してください。 - アクセスキーID - シークレットアクセスキー | | **session (推奨)** | - 一時的に生成されたaccess_key_id、secret_access_key、およびsession_tokenを使用します。 - アクセスキーID - シークレットアクセスキー - セッショントークン | | **assume_role** | - ロールアクセスを使用します。[AWS AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.md)を参照してください。 - TDのインスタンスプロファイル - アカウントID - お客様のロール名 - 外部ID - 有効期間(秒) | | **anonymous** | サポートされていません | | **アクセスキーID** | AWS S3が発行 | | **シークレットアクセスキー** | AWS S3が発行 | | **セッショントークン** | お客様の一時的なAWSセッショントークン | | **TDのインスタンスプロファイル** | この値はTDコンソールによって提供されます。値の数値部分は、IAMロールを作成する際に使用するアカウントIDを構成します。 | | **Account ID** | AWS Account ID | | **Your Role Name** | AWS Role 名 | | **External ID** | シークレット External ID | | **Duration** | 一時的な認証情報の有効期間 | 1. 新しい AWS S3 接続に名前を付け、**Done** を選択します。 ## assume_role 認証方式による認証の作成 1. assume_role 認証方式で新しい認証を作成します。 2. TD's Instance Profile フィールドの値の数値部分をメモしておきます。 ![](/assets/4f8559a3-4f70-4c83-b8b9-adaaec64662f_1_201_a.1331328bd097ab94743fc3d5f53d63f8cbc98ad689475b3adc5b21335a2e22e3.e7823387.jpeg) 1. AWS IAM role を作成します。 ![](/assets/ea9e1f37-be45-4ac6-97fa-4b46f65b0388_1_201_a.99af6c7efd4bcd9e50a04d5457e1febf3da2f104b7a090d9bcc307dc3dc8d8c3.e7823387.jpeg) ![](/assets/45c6048c-cc9e-40c1-b0f4-529e356d6e16_1_201_a.bb869c9c9ad66cdecbebc9b2b1231acd5f24c98517c6bc2cfb6ec67efcd884ef.e7823387.jpeg) ## AWS S3 データを Treasure Data に転送する 認証済み接続を作成すると、認証画面が表示されます。 1. 作成した接続を検索します。 2. **New Source** を選択します。 ![](/assets/s3parquetsources.e500d5384ff42a004cc3018561ba7bdd846b87a65042739db5368b3734d70970.e7823387.png) ### Connection 1. Data Transfer フィールドで **Source** の名前を入力します。 2. **Next** をクリックします。 ![](/assets/s3parquetcreatesource.05fa56ff12f8143248d1016d95dd2d9f0f82dd78d66ae53973b53829ee3abff0.e7823387.png) ### Source Table Source ダイアログが開きます。 1. 以下のパラメータを編集します。 ![](/assets/s3parquetcreatesource2.841da89442e5f2253681b4dc42966d496e591379a291cbbd3013108cba6a51c5.e7823387.png) | **Parameters** | **Description** | | --- | --- | | **Bucket** | S3バケット名(例:*your_bucket_name*) | | **Path Prefix** | ターゲットキーのプレフィックス(例:*logs/data_*) | | **Path Regex** | ファイルパスに一致させる正規表現。ファイルパスが指定されたパターンに一致しない場合、そのファイルはスキップされます。例えば、.csvファイルのみに一致させたい場合、パターン *.csv$* を入力すると、.csvで終わらないファイル名はすべてスキップされます。[正規表現](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions)を参照してください。 | | **Skip Glacier Objects** | Amazon Glacierストレージクラスに保存されているオブジェクトの処理をスキップします。オブジェクトがGlacierストレージクラスに保存されているにもかかわらず、このオプションがチェックされていない場合、例外がスローされます。 | | **Start after path** | 辞書順でこれより長いパスのみがインポートされます。 | | **Sub Folders Are Partitions** | Sparkパーティションからパーティション列を追加します。サブフォルダー名は次の形式である必要があります:partiton_column_name=value | ディレクトリ内のすべてのファイルをスキャンする必要がある場合(トップレベルディレクトリ「/」からなど)があります。このような場合は、CLIを使用してインポートを行う必要があります。 **例** EMRを設定して、次のようにファイル内のデータを含むparquetファイルを作成できます: ``` [your_bucket] - [YM=202010] - [E231A697YXWD39.2020-10-29-15.a103fd5a.parquet] [your_bucket] - [YM=202010] - [E231A697YXWD39.2020-10-30-15.b2aede4a.parquet] [your_bucket] - [YM=202010] - [E231A697YXWD39.2020-10-31-01.594fa8e6.parquet] ``` この場合、ソーステーブルの設定は次のようになります: - **Bucket**: your_bucket - **Path Prefix**: YM=202010a/ - **Path Regex**: * (必須ではありません) - **Start after path**: YM=202010/E231A697YXWD39.2020-10-29-15.a103fd5a.parquet ### Data Settings 1. **Next**を選択します。 Data Settingsページが開きます。 2. オプションで、データ設定を編集するか、このダイアログページをスキップします。 ![](/assets/screen_shot_2023-03-28_at_15_15_08.fe769b33ed2ca687f6bb9b1fbaf3eecebb307a0ac94812682b23463f1842dc3f.e7823387.png) | **Parameters** | **Description** | | --- | --- | | **Retry Limit** | 最大リトライ回数 | | **Initial Retry Interval in Millis** | 初期リトライ間隔(ミリ秒単位) | | **Max Retry Wait in Millis** | 最大リトライ間隔。初回リトライ後、待機間隔はこの最大値に達するまで2倍になります。 | | **Number of connector threads** | 並列処理可能なファイルハンドルの数 | | **Number of threads for S3 file downloads** | ファイルのブロックをダウンロードするために使用できる接続数 | | **Number of prefetch block** | 使用するプリフェッチブロックの数 | | **Prefetch block size in MB** | プリフェッチブロックサイズ(MB単位で指定) | ### Filters Filters は、S3、FTP、または SFTP コネクターの Create Source または Edit Source インポート設定で使用できます。 Import Integration Filters を使用すると、インポート用の[データ設定の編集](https://docs.treasuredata.com/smart/project-product-documentation/editing-data-settings)を完了した後、インポートされたデータを変更できます。 import integration filters を適用するには: 1. Data Settings で **Next** を選択します。Filters ダイアログが開きます。 2. 追加したいフィルターオプションを選択します。![](/assets/image-20200609-201955.eed6c6da800ba40d1d98b92e767d9a8f7500cad8a9d4079121190b7d34c23294.c7246827.png) 3. **Add Filter** を選択します。そのフィルターのパラメーターダイアログが開きます。 4. パラメーターを編集します。各フィルタータイプの情報については、次のいずれかを参照してください: - Retaining Columns Filter - Adding Columns Filter - Dropping Columns Filter - Expanding JSON Filter - Digesting Filter 1. オプションで、同じタイプの別のフィルターを追加するには、特定の列フィルターダイアログ内で **Add** を選択します。 2. オプションで、別のタイプの別のフィルターを追加するには、リストからフィルターオプションを選択して、同じ手順を繰り返します。 3. 追加したいフィルターを追加した後、**Next** を選択します。Data Preview ダイアログが開きます。 ### Data Preview インポートを実行する前に、Generate Preview を選択してデータの[プレビュー](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data)を表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。 1. **Next** を選択します。Data Preview ページが開きます。 2. データをプレビューする場合は、**Generate Preview** を選択します。 3. データを確認します。 ### Data Placement データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。 1. **Next** を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。 2. **Database** を選択 > **Select an existing** または **Create New Database** を選択します。 3. オプションで、database 名を入力します。 4. **Table** を選択 > **Select an existing** または **Create New Table** を選択します。 5. オプションで、table 名を入力します。 6. データをインポートする方法を選択します。 - **Append** (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。 - **Always Replace** - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。 - **Replace on New Data** - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。 7. **Timestamp-based Partition Key** 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。 8. データストレージの **Timezone** を選択します。 9. **Schedule** の下で、このクエリを実行するタイミングと頻度を選択できます。 #### 一度だけ実行 1. **Off** を選択します。 2. **Scheduling Timezone** を選択します。 3. **Create & Run Now** を選択します。 #### 定期的に繰り返す 1. **On** を選択します。 2. **Schedule** を選択します。UI では、*@hourly*、*@daily*、*@monthly*、またはカスタム *cron* の 4 つのオプションが提供されます。 3. **Delay Transfer** を選択して、実行時間の遅延を追加することもできます。 4. **Scheduling Timezone** を選択します。 5. **Create & Run Now** を選択します。 転送が実行された後、**Data Workbench** > **Databases** で転送の結果を確認できます。 ## Data type mapping Parquetファイル形式は、ディスクストレージの効率を最適化するために設計された、プリミティブ型と呼ばれる最小限のデータ型を使用します。一方、論理型は、プリミティブ型がどのように解釈されるべきかを指定することで、型を拡張するために使用されます。インポートされると、これらの型はTDでサポートされているデータ型に変換されます。 | **Parquet data type** | **Type mapping when import to TD** | | --- | --- | | Primitive types | | | boolean | boolean | | boolean | boolean | | int32 | long | | int64 | double | | int96 | string | | float, double | double | | byte_array, fixed_len_byte_array | string | | Logical types | | | byte type, short type, integer type, long type | long | | float type, double type | double | | decimal | string | | array type | string | | map, struct | json | | string type, binary type | string | ## CLI(Toolbelt)を使用したAWS S3 Parquetからのインポート オプションとして、TD Toolbeltを使用して接続の設定、ジョブの作成、およびジョブ実行のスケジュール設定を行うことができます。統合をセットアップする前に、最新バージョンのTD Toolbeltがインストールされていることを確認してください。 ### Seedコンフィグファイル(seed.yml)の作成 以下の例に示すように、AWSアクセスキーを使用して*seed.yml*ファイルを設定します。また、バケット名とソースファイル名を指定する必要があります。オプションで、複数のファイルにマッチさせるためにpath_prefixを指定できます。以下の例では、path_prefix: `path/to/sample_fileは次のファイルにマッチします` - `path/to/sample_201501.parquet` - `path/to/sample_201502.parquet` - `path/to/sample_201505.parquet` - `etc.` 先頭に'/'を付けたpath_prefixを使用すると、意図しない結果になる可能性があります。例:「path_prefix: /path/to/sample_file」とすると、プラグインはs3://sample_bucket//path/to/sample_fileのファイルを探すことになり、これはS3上でs3://sample_bucket/path/to/sample_fileという意図したパスとは異なります。 ```yaml in: type: s3_parquet access_key_id: XXXXXXXXXX secret_access_key: YYYYYYYYYY bucket: sample_bucket # s3バケット上の*.parquetファイルへのパス path_prefix: path/to/sample_file path_match_pattern: \.parquet$ # このパターンにマッチしないファイルはスキップされます ## 正規表現の例: #path_match_pattern: /archive/ # .../archive/...ディレクトリ内のファイルにマッチ #path_match_pattern: /data1/|/data2/ # .../data1/...または.../data2/...ディレクトリ内のファイルにマッチ out: mode: append ``` 既存の認証を再利用する場合は、**td_authentication_id** コンフィグキーの値にAuthentication IDを設定します。これはassume-role認証方式に必要です。[既存の認証の再利用](/ja/int/amazon-s3-parquet-import-integration#h1__756878989)を参照してください。 ``` in: type: s3_qarquet td_authentication: xxxx bucket: sample_bucket path_prefix: path/to/sample_file out: mode: append ``` ### フィールドの推測(load.ymlの生成) *connector:guess* は自動的にソースファイルを読み取り、ファイル形式、フィールド、カラムを評価します。 ```bash td connector:guess seed.yml -o load.yml ``` load.yml ファイルを確認すると、ファイル形式、エンコーディング、カラム名、型を含む「推測された」ファイル形式定義を確認できます。 ```yaml in: type: s3_parquet access_key_id: XXXXXXXXXX secret_access_key: YYYYYYYYYY bucket: sample_bucket path_prefix: path/to/sample_file out: mode: append ``` ### ロードジョブの実行 ロードジョブを送信します。データのサイズによっては数時間かかる場合があります。データを保存する Treasure Data のデータベースとテーブルを指定します。 Treasure Data のストレージは時間によってパーティション分割されているため、*--time-column* オプションを指定することを推奨します([データパーティショニング](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data) を参照)。このオプションが指定されていない場合、データコネクタは最初の *long* または *timestamp* カラムをパーティショニング時間として選択します。*--time-column* で指定するカラムの型は、*long* または *timestamp* のいずれかである必要があります。 データに時間カラムがない場合は、*add_time* フィルタオプションを使用して時間カラムを追加できます。詳細については、[add_time フィルタ機能](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function) を参照してください。 ```bash td connector:issue load.yml --database td_sample_db --table td_sample_table \ --time-column created_at ``` 以下の例では、connector:issue コマンドは、*database(td_sample_db)* と *table(td_sample_table)* がすでに作成されていることを前提としています。Treasure Data にデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、*td connector:issue* コマンドで *--auto-create-table* オプションを使用してデータベースとテーブルを自動作成してください: ```bash td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table ``` データコネクタはサーバー側でレコードをソートしません。時間ベースのパーティショニングを効果的に使用するには、事前にファイル内のレコードをソートしてください。 *time* というフィールドがある場合は、*--time-column* オプションを指定する必要はありません。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table ``` ### IAM 権限の設定 YML 設定ファイルで指定された IAM 認証情報は、*connector:guess* および *connector:issue* コマンドで使用され、アクセスする必要のある AWS S3 リソースに対する権限が必要です。IAM ユーザーがこれらの権限を持っていない場合は、定義済みのポリシー定義のいずれかでユーザーを設定するか、JSON 形式で新しいポリシー定義を作成してください。 以下の例は、[ポリシー定義リファレンス形式](http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.md) に基づいています。これにより、IAM ユーザーに「your-bucket」に対する *読み取り専用* 権限(GetObject および ListBucket アクションを通じて)が付与されます。 ```json { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::your-bucket", "arn:aws:s3:::your-bucket/*" ] } ] } ``` "`your-bucket`" を実際のS3バケット名に置き換えてください。 ### 一時的な認証情報プロバイダーとしてAWS Security Token Service (STS) を使用する 特定のケースでは、access_key_idとsecret_access_keyを通じたIAM基本認証はリスクが高すぎる可能性があります(ジョブの実行時やセッション作成後にsecret_access_keyが明確に表示されることはありませんが)。 S3データコネクタは、AWS Secure Token Service (STS) を使用して[一時的なセキュリティ認証情報](http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.md)を提供できます。AWS STSを使用すると、任意のIAMユーザーが自身のaccess_key_idとsecret_access_keyを使用して、特定の有効期限を持つこれらの一時キーを作成できます: - new_access_key_id - new_secret_access_key - session_tokenキー 以下は一時的なセキュリティ認証情報のタイプです: - [**Session Token**](http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_control-access_getsessiontoken.md) 指定された有効期限を持つ最もシンプルなセキュリティ認証情報です。一時的な認証情報は、それらを生成したIAMユーザーと同じアクセス権を持ちます。これらの認証情報は、有効期限が切れておらず、元のIAMユーザーの権限が変更されていない限り有効です。 - [**Federation Token**](http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_control-access_getfederationtoken.md) これは上記のSession Tokenに対して追加の権限制御レイヤーを追加します。Federation Tokenを生成する際、IAMユーザーはPermission Policy定義を指定する必要があります。スコープは、Federation Tokenの保有者がアクセスできるリソースを制限するために使用できます(権限を付与するIAMユーザーのアクセス権より少ない場合があります)。任意の[Permission Policy](http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.md)定義を使用できますが、権限のスコープは、トークンを生成したIAMの権限と同じか、そのサブセットに制限されます。Session Tokenと同様に、Federation Token認証情報は、有効期限が切れておらず、元のIAM認証情報に関連する権限が変更されていない限り有効です。 AWS STS一時的なセキュリティ認証情報は、[AWS CLI](https://aws.amazon.com/cli/)またはお好みの言語の[AWS SDK](https://aws.amazon.com/tools/)を使用して生成できます。 #### Session Token ```bash $ aws sts get-session-token --duration-seconds 900 ``` #### Federation Token この例では、 - `temp_creds` はFederated tokenまたはユーザーの一時認証情報の名前です。 - `bucketname` はアクセスが許可されているS3バケットの名前です(詳細については[ARN仕様](http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.md#arn-syntax-s3)を参照してください)。 - `s3:GetObject` と `s3:ListBucket` はAWS S3バケットの基本的な読み取り操作です。 ```bash $ aws sts get-federation-token --name temp_creds --duration-seconds 900 \ --policy '{"Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": "arn:aws:s3:::bucketname"}]}' ``` AWS STS認証情報は取り消すことができません。有効期限が切れるまで、または認証情報の生成に使用された元のIAMユーザーの権限を削除または取り消すまで、有効なままです。 一時的なセキュリティ認証情報が生成されたら、*seed.yml* ファイルに `SecretAccessKey`、`AccessKeyId`、および `SessionToken` を含めて、通常どおりData Connector for S3を実行します。 ```yaml in: type: s3_parquet auth_method: session access_key_id: XXXXXXXXXX secret_access_key: YYYYYYYYYY session_token: ZZZZZZZZZZ bucket: sample_bucket path_prefix: path/to/sample_file ``` #### 認証情報の有効期限 STS認証情報は指定された時間が経過すると有効期限が切れるため、その認証情報を使用するデータコネクタジョブは最終的に失敗し始める可能性があります。現在、STS認証情報の有効期限が切れたと報告された場合、データコネクタジョブは最大回数(5回)まで再試行し、最終的に「error」のステータスで完了します。 インポートを確認するには、[データコネクタジョブの検証](/ja/int/amazon-s3-parquet-import-integration#h1__1293994107)の手順を参照してください。 ## 関連トピック - 定期的な実行については、[TD Toolbeltを使用したスケジューリング](/ja/int/scheduling-using-td-toolbelt)を参照してください - 作成したSourceをワークフローからトリガーするには、[IntegrationでTD Workflowを使用する](/ja/int/using-td-workflow-with-td-integrations)を参照してください ### S3ジョブのデータコネクタが長時間実行されている場合はどうすればよいですか? コネクタジョブが取り込んでいるS3ファイルの数を確認してください。10,000ファイルを超える場合、パフォーマンスが低下します。この問題を軽減するには、次の方法があります: - path_prefixオプションを絞り込んで、S3ファイルの数を減らす - min_task_sizeオプションを268,435,456(256MB)に設定する ### 従うべきベストプラクティス - 1ファイルあたり1つのrow_groupではなく、多くのrow_groupsを持つファイルを提供する - 1か所に多数のファイルがある場合は、パラメータPath PrefixとPath Regexを使用して複数のジョブに分割してください。 各ジョブは20〜40ファイルにする必要があります。 例: ``` path_prefix: folder/sub_folderpath_match_pattern: folder/sub_folder/regrex ```