# PostgreSQLインポート連携 PostgreSQL用データコネクタを使用すると、PostgreSQLデータベースからTreasure Dataにデータを直接インポートできます。PostgreSQLデータのエクスポートについては、[PostgreSQLエクスポート連携](/ja/int/postgresql-export-integration)を参照してください。 ## 前提条件 - Treasure Dataの基本知識 - PostgreSQLの基本知識 - **リモート**で実行されているPostgreSQLインスタンス(例:RDS上) ## Treasure Data Integration の静的 IP アドレス セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。 リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります: [https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/](https://api-docs.treasuredata.com/en/overview/ip-addresses-integrations-result-workers/) ## TD Consoleを使用したPostgreSQLからのインポート ### Authenticationの作成 データ接続を設定する際、連携にアクセスするためのAuthenticationを提供します。Treasure Dataでは、Authenticationを設定してからソース情報を指定します。 1. **TD Console**を開きます。 2. **Integrations Hub** > **Catalog**に移動します。 3. PostgreSQLを検索して選択します。**Create**を選択します。 ![](/assets/image-20200807-223217.e12f01c92947408583fc23ce8b3aa685bc13b3aa4dd17f5289ba97dc2aa5ff53.0a0f313e.png) 4. 次のダイアログが開きます。 ![](/assets/image-20200807-223310.42cc63fd8fcebfbb09119a91a2366037d0bb3c8ac869ffac5f6d77e2aa52d6e8.0a0f313e.png)![](/assets/image-20200807-223337.b2fb9306e1c756327fc3e5a97ae3ada3e132e3006642f4937eec0fd6cd5a9034.0a0f313e.png) 5. 必要なAuthentication情報を入力し、パラメータを設定します。**Continue**を選択します。 | パラメータ | 説明 | | --- | --- | | **Host** | IPアドレスなど、ソースデータベースのホスト情報。 | | **Port** | ソースインスタンスの接続ポート。PostgreSQLのデフォルトは5432です。 | | **User** | ソースデータベースに接続するためのユーザー名。 | | **Password** | ソースデータベースに接続するためのパスワード。 | | **Use SSL** | SSLを使用して接続する場合は、このボックスをチェックします。 | | **Specify SSL version** | 接続に使用するSSLバージョンを選択します。 | | **Socket connection timeout** | ソケット接続のタイムアウト(秒単位)(デフォルトは300)。 | | **Network timeout** | ネットワークソケット操作のタイムアウト(秒単位)。0はタイムアウトなしを意味します。 | 1. 接続の名前を入力します。 2. **Done**を選択します。 ### ソースの作成 Authentication済み接続を作成すると、自動的にAuthenticationsに移動します。 1. 作成した接続を検索します。 2. **New Source**を選択します。Create Sourceダイアログが開きます。 ### **Connection** 1. Data Transferフィールドに**Source**の名前を入力します。 ![](/assets/image-20200807-224143.be148c35b05d7c1f6d5e5e3b2dc090ca71204e9ea769eb48724acae4e5e22edf.0a0f313e.png) 1. **Next**をクリックします。 ### **Source Table** 1. 次のパラメータを編集します ![](/assets/postgresql-import-integration-2024-03-22-3.b0dc52811d5498dffd3736b63c338b7865379687dc59aecd393c1c7286c225fa.0a0f313e.png) | **パラメータ** | **説明** | | --- | --- | | **Driver version** | PostgreSQL JDBCドライバーを選択します | | **Database name** | データを転送するデータベースの名前。例:your_database_name。 | | **Use custom SELECT query?** | 単純なSELECT (columns) FROM table WHERE (condition)以上のものが必要な場合に使用します。 | | **SELECT columns** | データを取得したい特定のカラムがある場合は、ここにリストします。それ以外の場合は、すべてのカラムが転送されます。 | | **Table** | データをインポートするテーブル。 | | **WHERE condition** | テーブルから取得するデータに追加の条件が必要な場合は、WHERE句の一部として指定します。 | | **ORDER BY** | 特定のフィールドでレコードを順序付けする必要がある場合は指定します。 | ### **Data Settings** 1. **Next**を選択します。Data Settingsページが開きます。 2. 必要に応じてデータ設定を編集するか、このページをスキップします。 ![](/assets/image-20200807-230323.c09b390ba74f7c7597deb4080e0aa3d5d43b12238e79df926f38ed27d0af0085.0a0f313e.png) | **パラメータ** | **説明** | | --- | --- | | **Incremental**: | この転送を繰り返し実行する場合は、このチェックボックスを選択して、前回インポートが実行されてからのデータのみをインポートします。 | | **Rows per batch** | 非常に大きなデータセットはメモリの問題を引き起こし、結果としてジョブが失敗する可能性があります。このフラグを使用して、行数でインポートジョブをバッチに分割し、メモリの問題やジョブの失敗の可能性を減らします。 | | **Default timezone** | インポート時に使用するタイムゾーン。 | | **After SELECT** | このSQLは、同じトランザクション内のSELECTクエリの後に実行されます。 | | **Column Options** | このオプションを選択して、インポート前にカラムのタイプを変更します。入力したデータ設定を保存するには、**Save**を選択します。 | | **Default Column Options** | このオプションを選択して、インポート前にデフォルトのSQLタイプに従ってデータタイプを定義します。入力したデータ設定を保存するには、**Save**を選択します。 このオプションはTD Consoleでは使用できません。TD CLIまたはTD Workflowを使用してこのオプションを設定します。 | ### Data Preview インポートを実行する前に、Generate Preview を選択してデータの[プレビュー](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data)を表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。 1. **Next** を選択します。Data Preview ページが開きます。 2. データをプレビューする場合は、**Generate Preview** を選択します。 3. データを確認します。 ### Data Placement データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。 1. **Next** を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。 2. **Database** を選択 > **Select an existing** または **Create New Database** を選択します。 3. オプションで、database 名を入力します。 4. **Table** を選択 > **Select an existing** または **Create New Table** を選択します。 5. オプションで、table 名を入力します。 6. データをインポートする方法を選択します。 - **Append** (デフォルト) - データインポートの結果は table に追加されます。 table が存在しない場合は作成されます。 - **Always Replace** - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。 - **Replace on New Data** - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。 7. **Timestamp-based Partition Key** 列を選択します。 デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。 8. データストレージの **Timezone** を選択します。 9. **Schedule** の下で、このクエリを実行するタイミングと頻度を選択できます。 #### 一度だけ実行 1. **Off** を選択します。 2. **Scheduling Timezone** を選択します。 3. **Create & Run Now** を選択します。 #### 定期的に繰り返す 1. **On** を選択します。 2. **Schedule** を選択します。UI では、*@hourly*、*@daily*、*@monthly*、またはカスタム *cron* の 4 つのオプションが提供されます。 3. **Delay Transfer** を選択して、実行時間の遅延を追加することもできます。 4. **Scheduling Timezone** を選択します。 5. **Create & Run Now** を選択します。 転送が実行された後、**Data Workbench** > **Databases** で転送の結果を確認できます。 ## 詳細情報 - [PostgreSQLデータコネクタのオプション一覧](https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql) ## CLI(Toolbelt)を使用したPostgreSQLからのインポート ### 'td'コマンドのインストール 最新の[Treasure Data Toolbelt](https://docs.treasuredata.com/smart/project-product-documentation/td-toolbelt)をインストールします。 ### Seed設定ファイル(seed.yml)の作成 PostgreSQLアクセス情報で*seed.yml*を設定します: ```bash in: type: postgresql host: postgresql_host_name port: 5432 ssl: true ssl_version: TLS user: test_user password: test_password driver_version: 42.7.x database: test_database table: test_table select: "*" default_column_options: TIMESTAMP: {type: string, timestamp_format: '%Y-%m-%d', timezone: '+0900'} BIGINT: {type: string} mode: replace ``` この例では、テーブル内のすべてのレコードをインポートします。[追加パラメータ](https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql)を使用して、より詳細な制御を行うことができます。 利用可能なoutモードまたは利用可能なssl_versionの詳細については、付録を参照してください。 ### フィールドの推測(load.ymlの生成) *td connector:guess*を使用します。このコマンドは、ターゲットデータを自動的に読み取り、データ形式をインテリジェントに推測します。 ``` $ td connector:guess seed.yml -o load.yml ``` *load.yml*を開くと、場合によってはファイル形式、エンコーディング、カラム名、タイプを含む、推測されたファイル形式定義が表示されます。 ### オプション: インポートするデータのプレビュー *td connector:preview*コマンドを使用して、インポートするデータをプレビューできます。 ``` td connector:preview load.yml ``` ## ロードジョブの実行 ロードジョブを送信します。データサイズに応じて、ジョブの実行に数時間かかる場合があります。データが保存されているデータベースとテーブルを指定する必要があります。 Treasure Dataのストレージは時間でパーティション分割されているため、--time-columnオプションを指定することをお勧めします(データパーティショニングも参照)。オプションが指定されていない場合、データコネクタは最初のlongまたはtimestampカラムをパーティショニング時間として選択します。--time-columnで指定されたカラムのタイプは、longまたはtimestampタイプである必要があります。 データに時間カラムがない場合は、*add_time*フィルターオプションを使用して追加できます。詳細については、[add_timeフィルタープラグイン](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。 ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table ``` *connector:issue*コマンドは、*database(td_sample_db)*と*table(td_sample_table)*をすでに作成していることを前提としています。データベースまたはテーブルがTDに存在しない場合、*connector:issue*コマンドは失敗します。この場合、データベースとテーブルを[手動で](https://docs.treasuredata.com/smart/project-product-documentation/data-management)作成するか、*td connector:issue*コマンドで*--auto-create-table*オプションを使用して、データベースとテーブルを自動作成します: ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table ``` "--time-column"オプションで、Time FormatカラムをPartitioning Keyに割り当てることができます。 ## Workflowを使用したPostgreSQLからのインポート PostgreSQLからデータをインポートする方法を示すサンプルワークフローについては、[Treasure Boxes](https://github.com/treasure-data/treasure-boxes/tree/master/td_load/postgresql)を参照してください。 ### インクリメンタルロード テーブルからカラムを*incremental_column*sパラメータに指定することで、レコードをインクリメンタルにロードできます。オプションで、*last_record*パラメータにいくつかの初期値を指定できます。 ```yaml in: type: postgresql host: postgresql_host_name port: 5432 user: test_user password: test_password database: test_database table: test_table incremental: true incremental_columns: [id, created_at] last_record: [10000, '2014-02-16T13:01:06.000000Z'] out: mode: append exec: {} ``` `incremental\_columns:`オプションを最適に使用するには、関連するカラムにインデックスを作成して、フルテーブルスキャンを回避します。この例では、次のインデックスを作成する必要があります: ```sql CREATE INDEX embulk_incremental_loading_index ON test_table (id, created_at); ``` ```sql -- last_recordが指定されていない場合 SELECT * FROM( ...original query is here ) ORDER BY id, created_at ``` ```sql -- last_recordが指定された場合 SELECT * FROM( ...original query is here ) WHERE id > 10000 OR (id = 10000 AND created_at > '2014-02-16T13:01:06.000000Z') ORDER BY id, created_at ``` コネクタは自動的に*last_record*を生成し、次のスケジュール実行時にそれを使用します。 ```yaml in: type: postgresql ... out: ... Config Diff --- in: last_record: - 20000 - '2015-06-16T16:32:14.000000Z' ``` `incremental: true`を設定すると、`query`オプションは使用できません。 incremental_columnsとしてサポートされているのは、文字列、整数、timestamp、およびtimestamptz(タイムゾーン付きタイムスタンプ)のみです。 ### Array Columnの読み込み PostgreSQLの`array`型は、文字列型として取得されます。 ### hstore Columnの読み込み PostgreSQLのhstore型は、データコネクタが最初に読み取るときに文字列型として取得されます。したがって、hstore型をjson型として使用する場合は、config_optionsを指定して、明示的に型をjson型に変換する必要があります。 例えば、v_hstoreがPostgreSQLでhstore型の場合: ```yaml in: type: postgresql host: xxx ... table: my_tbl select: "*" column_options: v_hstore: {type: json} # 明示的な型変換: string型からjson型へ out: ... ``` ### SSLバージョン `ssl_version`オプションを使用して、PostgreSQLサーバーが使用している特定のSSLバージョンを使用できます。 ```yaml in: type: postgresql ... ssl: true ssl_version: TLSv1.1 ``` サポートされている値は次のとおりです。 - TLS - TLSv1.1 - TLSv1.2 - TLSv1.3 ### Default Column Options SQLデータ型の特定のデフォルト形式を使用できます。 以下の例では、TIMESTAMPは`%Y-%m-%d`形式でタイムゾーン`+0900`の文字列に変換されます。 SQLタイプは、TIMESTAMPやBIGINTなどの大文字である必要があります。 ```yaml in: type: postgresql ... default_column_options: TIMESTAMP: {type: string, timestamp_format: '%Y-%m-%d', timezone: '+0900'} BIGINT: {type: string} ```