# MongoDB Import Integration MongoDB用Data Connectorは、MongoDBサーバーに保存されているドキュメント(レコード)をTreasure Dataにインポートできます。 ## 前提条件 - Treasure Dataの基本知識 ## 接続の設定 TDコンソールからMongoDBデータコネクタのインスタンスを作成できます。MongoDBコネクタタイルで**作成**を選択します。 ![](/assets/image-20191004-223623.7ef9ab1bd405dc4a16ef8afd171bf080c78d6b5246be9666b94f183ff8c3a3d9.a49c45cd.png) ## 新しいMongoDBコネクタの作成 MongoDBインスタンスに必要な認証情報を入力します。以下のパラメータを設定します。 - **Auth method**: 認証方法を指定します。 - "Auto"を選択した場合、コネクタは認証先のサーバーのバージョンに基づいて最適なメカニズムをネゴシエートします。 サーバーのバージョンが3.0以上の場合、ドライバーはSCRAM-SHA-1メカニズムを使用して認証します。 それ以外の場合、ドライバーはMONGODB_CRメカニズムを使用して認証します。 - **Auth source**: ユーザーが定義されているデータベース名。 - **Username**: リモートデータベースに接続するためのユーザー名。 - **Password**: リモートデータベースに接続するためのパスワード。 - **Use TLS?:** TLS(SSL)を使用して接続する場合は、このボックスをチェックします。 - **Bypass certificate validations?:** すべての証明書検証をバイパスする場合は、このボックスをチェックします。 - **Use Service records?:** MongoDB cloudでクラスタを作成する場合は有効にします。有効にすると、hostsの最初のホストのみが使用され、値はクラスタ名(例: [cluster0.be2g8go.mongodb.net](http://cluster0.be2g8go.mongodb.net))である必要があります。 - **Hostname**: リモートサーバーのホスト名またはIPアドレス。(MongoDBのセットアップによっては、複数のIPアドレスを追加できます。) - **Port**: リモートサーバーのポート番号(デフォルトは27017)。 - **Options**: ドライバーに渡されるJDBCキーバリューペア(これらのオプションの詳細については、[Connection Options](https://www.mongodb.com/docs/drivers/java/sync/v4.3/fundamentals/connection/connection-options/)を参照してください) ![](/assets/screenshot-2024-12-23-at-15.02.47.9d4c0e70d91135ccba7d1dbe448d4ee2a516706d6a52e55daec42635f6e06c9c.a49c45cd.png) 必要な接続の詳細を入力した後、**Continue**を選択します。後で接続の詳細を変更する必要がある場合に見つけられるように、接続に名前を付けます。この接続を組織内の他のユーザーと共有したい場合は、**Share with others**を選択します。このボックスがチェックされていない場合、この接続はあなたにのみ表示されます。 **Create Connection**を選択して接続を完了します。接続が成功すると、作成した接続が、指定した名前で接続リストに表示されます。 ## Treasure Dataへのデータ転送 リモートデータベースへの接続を作成した後、データベースからTreasure Dataにデータをインポートできます。アドホックな1回限りの転送または定期的な間隔での繰り返し転送を設定できます。 ![](/assets/image-20191004-224315.64f25506fcc9f2bc14966d409b8cabc45acbdf7836643901ab86b64d4faafc3d.a49c45cd.png) ### データベースの詳細を入力(Fetch From) データを取り込むデータベースとテーブルの詳細を入力します。 - **Database name**: データを転送するデータベースの名前。(例: `your_database_name`) - **Collection Name:** データを転送するコレクションの名前。 - **JSON Query:** 返すレコードを指定します - **JSON Projection:** 返すフィールドを指定します ![](/assets/image-20191004-224340.405d69795491ebde55888a8fbe11660317b61b6b27f689c6e1d4028e9cf88b59.a49c45cd.png) 次のステップでデータをプレビューするには、**Next**を選択します。 ### プレビュー 接続にエラーがない場合は、インポートされるデータのプレビューが表示されます。プレビューを表示できない場合、またはプレビューの表示に問題がある場合は、[サポート](mailto:support@treasuredata.com)にお問い合わせください。 ![](/assets/image-20191004-224456.10d710372f66ddf8eba2e55a4d879c02bc8aaf368f4312ea55491bbec9f51aac.a49c45cd.png) プレビュー中およびデータインポートの実行時、レコードは1つの列にインポートされます。インポートに非標準のオプションを使用する必要がある場合は、**Advanced Settings**を選択します。 Advanced Settingsでは、特別な要件に対応するために転送の側面を変更できます。Preview > **Advanced Settings**では、以下のフィールドが利用可能です。 - **Object ID field name**: インポートするObject IDフィールド名の名前。 - **Load only new records each run**: チェック/有効にすると、ソートするフィールドを指定する必要があります - **Sort by**: レコードのソートに使用するフィールド。`Load only new records each run`がチェックされている場合は必須です。 - **Aggregation query** : 集約のクエリ文字列。[Aggregation — MongoDB Manual](https://docs.mongodb.com/manual/aggregation/)と[Aggregation Pipeline Stages — MongoDB Manual](https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/)も参照してください - **Output column name:** レコードを出力する列の名前。 - **Stop on invalid record**: チェックすると、無効なレコードに遭遇した場合、転送は停止し、完了しません。 ![](/assets/image-20191004-224507.a8b833e1d659a689964854dbb5930a92e67a48b186d8d72639dfd405d6aa227e.a49c45cd.png) ### Transfer To このフェーズでは、データをインポートするTreasure Dataのターゲットデータベースとテーブルを選択します。**Create new database**または**Create new table**を使用して、新しいデータベースまたはテーブルを作成できます。 - **Database:** データをインポートするデータベース。 - **Table**: データをインポートするデータベース内のテーブル。 - **Mode**: Append – 既存のテーブルにレコードを追加できます。 - **Mode**: Replace – テーブル内の既存のデータを、インポートされるデータに置き換えます。 - **Partition Key Seed**: パーティション化時間列として使用するlongまたはtimestamp列を選択します。時間列を指定しない場合、転送のアップロード時間が`time`列の追加と組み合わせて使用されます。 - **Data Storage Timezone**: データが保存されるタイムゾーン。データもこのタイムゾーンで表示されます。 ![](/assets/image-20191004-224524.1f7b4d78f5153602b37523ccb2e73e913df9dea0b1bbddf7c72a357bf64c289a.a49c45cd.png) ### データ転送の頻度(When) このフェーズでは、転送を1回だけ実行するか、指定した頻度で実行するようにスケジュールするかを選択できます。 - When - **Once now**: 転送を1回だけ実行します。 - **Repeat…** - **Schedule**: `@hourly`、`@daily`、`@monthly`、およびカスタム`cron`の3つのオプションを受け入れます。 - **Delay Transfer**: 実行時間の遅延を追加します。 - **Time Zone**: 'Asia/Tokyo'のような拡張タイムゾーン形式をサポートします。 ![](/assets/image-20191004-224537.28887603e0c4e2ea002679cd2bb054f427c237ea91a0bbd708e86992a40ff852.a49c45cd.png) 頻度を選択した後、**Start Transfer**を選択して転送を開始します。エラーがなければ、Treasure Dataへの転送が完了し、データが利用可能になります。 ### My Input Transfers 完了したばかりの転送や他のデータコネクタジョブを確認する必要がある場合は、`My Input Transfers`セクションで転送のリストを表示できます。 ![](/assets/image-20191004-224554.3b670d4e1d06bcc989dfa5d4800a075c82728ec644d8e8b176aa4fec90c89480.a49c45cd.png) ## CLIを使用したコネクタの設定 コマンドラインインターフェイスからMongoDBデータコネクタを使用することもできます。以下の手順は、CLIを使用してデータをインポートする方法を示しています。 ### 'td'コマンドv0.11.9以降をインストール 最新の[TD Toolbelt](https://toolbelt.treasuredata.com/)をインストールします。 ``` $ td --version 0.11.10 ``` ### シード設定ファイルの作成(seed.yml) まず、MongoDBの詳細を含む`seed.yml`を準備します。以下の内容で`seed.yml`を作成します。 ```yaml in: type: mongodb hosts: - {host: HOST, port: PORT} auth_method: auto #auth_source: AUTH_SOURCE_DB user: USER password: PASSWORD database: DATABASE collection: COLLECTION options: connectTimeoutMS: 10000 socketTimeoutMS: 10000 projection: '{"_id": 0}' query: '{}' sort: '{}' stop_on_invalid_record: true out: mode: append exec: {} ``` MongoDB用Data Connectorは、指定されたコレクションに保存されているすべてのドキュメントをインポートします。以下のオプションを使用して、フィールドをフィルタリング、クエリを指定、またはソートできます。 ### 3.2.1. Projectionオプション クエリ結果の[projection](https://docs.mongodb.com/manual/reference/operator/projection/positional/)に使用されるJSONドキュメント。ドキュメント内のフィールドは、この条件に一致する場合にのみ使用されます。 ``` projection: '{ "_id": 1, "user_id": 1, "company": 1 }' ``` ### Queryオプション ソースコレクションの[querying](https://docs.mongodb.com/manual/tutorial/query-documents/)に使用されるJSONドキュメント。この条件に一致する場合、ドキュメントはコレクションから読み込まれます。 ``` query: '{ user_id: { $gte: 20000 } }' ``` ### Sortオプション 結果の順序 ``` sort: '{ "field1": 1, "field2": -1}' # field1 昇順、field2 降順 ``` このオプションはaggregationオプションと一緒に使用できません。 利用可能な`out`モードの詳細については、[付録](/ja/int/mongodb-import-integration#h1__1835053169)を参照してください。 ### Aggregationオプション 集約クエリ ``` aggregation: '{ $match: { field1: { $gt: 1}} }' # field1が1より大きい場合 ``` このオプションはsortオプションと一緒に使用できません。 利用可能な`out`モードの詳細については、[付録](/ja/int/mongodb-import-integration#h1__1835053169)を参照してください。 ## フィールドの推測(load.ymlの生成) Data Connector MongoDBは、MongoDBのドキュメントを単一の列として読み込むため、`connector:guess`をサポートしていません。`load.yml`のすべての設定を編集してください。 `preview`コマンドを使用して、システムがドキュメントをどのように解析するかをプレビューできます。 ``` $ td connector:preview load.yml +---------------------------------------------------------------------------------------------------------------------+ | record:json | +---------------------------------------------------------------------------------------------------------------------+ | "{\"user_id\":11200,\"company\":\"AA Inc.\",\"customer\":\"David\",\"created_at\":\"2015-03-31T06:12:37.000Z\"}" | | "{\"user_id\":20313,\"company\":\"BB Imc.\",\"customer\":\"Tom\",\"created_at\":\"2015-04-01T01:00:07.000Z\"}" | | "{\"user_id\":32132,\"company\":\"CC Inc.\",\"customer\":\"Fernando\",\"created_at\":\"2015-04-01T10:33:41.000Z\"}" | | "{\"user_id\":40133,\"company\":\"DD Inc.\",\"customer\":\"Cesar\",\"created_at\":\"2015-04-02T05:12:32.000Z\"}" | | "{\"user_id\":93133,\"company\":\"EE Inc.\",\"customer\":\"Jake\",\"created_at\":\"2015-04-02T14:11:13.000Z\"}" | +---------------------------------------------------------------------------------------------------------------------+ ``` データコネクタは、"boolean"、"long"、"double"、"string"、"timestamp"タイプの解析をサポートしています。 また、データロードジョブを実行する前に、ローカルデータベースとテーブルを作成する必要があります。 ``` $ td database:create td_sample_db $ td table:create td_sample_db td_sample_table ``` ## ロードジョブの実行 最後に、ロードジョブを送信します。データのサイズによっては、数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを指定します。 Treasure Dataのストレージは時間でパーティション化されているため、`--time-column`オプションを指定してください([Data Partitioning in Treasure Data](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)を参照)。オプションが指定されていない場合、Data Connectorは最初の`long`または`timestamp`列をパーティション化時間として選択します。`--time-column`で指定する列のタイプは、`long`または`timestamp`タイプである必要があります。 データに時間列がない場合は、`add_time`フィルタオプションを使用して追加できます。詳細については、[add_time filter plugin](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。 JSON列を展開したい場合は、`expand_json`フィルタオプションを使用して追加できます。詳細については、[expand_json filter plugin](https://docs.treasuredata.com/smart/project-product-documentation/expand_json-filter-function)を参照してください ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at ``` connector:issueコマンドは、データベース*(td_sample_db)*とテーブル*(td_sample_table)*がすでに作成されていることを前提としています。データベースまたはテーブルがTDに存在しない場合、connector:issueコマンドは失敗するため、データベースとテーブルを[手動で](https://docs.treasuredata.com/smart/project-product-documentation/data-management)作成するか、*td connector:issue*コマンドで*--auto-create-table*オプションを使用してデータベースとテーブルを自動作成してください: ``` $ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table ``` Data Connectorはサーバー側でレコードをソートしません。時間ベースのパーティション化を効果的に使用するには、事前にレコードをソートしてください。 `time`というフィールドがある場合は、`--time-column`オプションを指定する必要はありません。 ``` td connector:issue load.yml --database td_sample_db --table td_sample_table ``` ## インクリメンタルロード `incremental_field`および`last_record`オプションを利用して、日付情報を含むテーブル内のフィールドを指定することで、レコードをインクリメンタルにロードできます。 ```yaml in: type: mongodb hosts: - {host: HOST, port: PORT} user: USER password: PASSWORD database: DATABASE collection: COLLECTION projection: '{"_id": 0}' incremental_field: - "field1" last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}} stop_on_invalid_record: true out: mode: append exec: {} ``` コネクタは自動的にクエリとソート値を作成します。 ```yaml query '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }}' # field1 > "2015-01-25T13:23:15.000Z" sort '{"field1", 1}' # field1 昇順 ``` ### 複数フィールドでのインクリメンタルロード `incremental_fields`に複数のフィールドを指定することもできます。 ```yaml incremental_field: - "field1" - "field2" last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}, "field2": 13215} ``` コネクタは'AND'条件を使用してクエリとソート値を作成します。 ``` query '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }, field2: { $gt: 13215}}' # field1 > "2015-01-25T13:23:15.000Z" AND field2 > 13215 sort '{"field1", 1, "field2", 1}' # field1 昇順、field2 昇順 ``` `incremental\_field`を指定する場合、`sort`オプションは使用できません。`incremental\_field`を指定する場合、`aggregation`オプションは使用できません。 フィールドタイプがObjectIdまたはDateTimeの場合、`last\_record`を特殊文字で指定する必要があります。 - ObjectIdフィールド ```yaml in: type: mongodb incremental_field: - "_id" last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}} ``` - DateTimeフィールド ```yaml in: type: mongodb incremental_field: - "time_field" last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}} ``` ## スケジュールされた実行 MongoDBデータインポートの定期的なデータコネクタ実行をスケジュールできます。高可用性を確保するために、スケジューラーを慎重に設定しています。この機能を使用することで、ローカルデータセンターに`cron`デーモンを用意する必要がなくなります。 ### スケジュールの作成 `td connector:create`コマンドを使用して新しいスケジュールを作成できます。スケジュールの名前、cronスタイルのスケジュール、データが保存されるデータベースとテーブル、およびData Connector設定ファイルが必要です。 ``` $ td connector:create \ daily_import \ "10 0 * * *" \ td_sample_db \ td_sample_table \ load.yml ``` Treasure Dataのストレージは時間でパーティション化されているため、`--time-column`オプションを指定することもお勧めします([data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)を参照)。 ``` $ td connector:create \ daily_import \ "10 0 * * *" \ td_sample_db \ td_sample_table \ load.yml \ --time-column created_at ``` `cron`パラメータは、3つの特別なオプション(`@hourly`、`@daily`、`@monthly`)も受け入れます。 デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。`--timezone`オプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PST、CSTなどのタイムゾーンの略語は**サポートされておらず**、予期しないスケジュールになる可能性があります。