MongoDB用Data Connectorは、MongoDBサーバーに保存されているドキュメント(レコード)をTreasure Dataにインポートできます。
- Treasure Dataの基本知識
TDコンソールからMongoDBデータコネクタのインスタンスを作成できます。MongoDBコネクタタイルで作成を選択します。

MongoDBインスタンスに必要な認証情報を入力します。以下のパラメータを設定します。
Auth method: 認証方法を指定します。
"Auto"を選択した場合、コネクタは認証先のサーバーのバージョンに基づいて最適なメカニズムをネゴシエートします。
サーバーのバージョンが3.0以上の場合、ドライバーはSCRAM-SHA-1メカニズムを使用して認証します。
それ以外の場合、ドライバーはMONGODB_CRメカニズムを使用して認証します。
Auth source: ユーザーが定義されているデータベース名。
Username: リモートデータベースに接続するためのユーザー名。
Password: リモートデータベースに接続するためのパスワード。
Use TLS?: TLS(SSL)を使用して接続する場合は、このボックスをチェックします。
Bypass certificate validations?: すべての証明書検証をバイパスする場合は、このボックスをチェックします。
Use Service records?: MongoDB cloudでクラスタを作成する場合は有効にします。有効にすると、hostsの最初のホストのみが使用され、値はクラスタ名(例: cluster0.be2g8go.mongodb.net)である必要があります。
Hostname: リモートサーバーのホスト名またはIPアドレス。(MongoDBのセットアップによっては、複数のIPアドレスを追加できます。)
Port: リモートサーバーのポート番号(デフォルトは27017)。
Options: ドライバーに渡されるJDBCキーバリューペア(これらのオプションの詳細については、Connection Optionsを参照してください)

必要な接続の詳細を入力した後、Continueを選択します。後で接続の詳細を変更する必要がある場合に見つけられるように、接続に名前を付けます。この接続を組織内の他のユーザーと共有したい場合は、Share with othersを選択します。このボックスがチェックされていない場合、この接続はあなたにのみ表示されます。
Create Connectionを選択して接続を完了します。接続が成功すると、作成した接続が、指定した名前で接続リストに表示されます。
リモートデータベースへの接続を作成した後、データベースからTreasure Dataにデータをインポートできます。アドホックな1回限りの転送または定期的な間隔での繰り返し転送を設定できます。

データを取り込むデータベースとテーブルの詳細を入力します。
- Database name: データを転送するデータベースの名前。(例:
your_database_name) - Collection Name: データを転送するコレクションの名前。
- JSON Query: 返すレコードを指定します
- JSON Projection: 返すフィールドを指定します

次のステップでデータをプレビューするには、Nextを選択します。
接続にエラーがない場合は、インポートされるデータのプレビューが表示されます。プレビューを表示できない場合、またはプレビューの表示に問題がある場合は、サポートにお問い合わせください。

プレビュー中およびデータインポートの実行時、レコードは1つの列にインポートされます。インポートに非標準のオプションを使用する必要がある場合は、Advanced Settingsを選択します。
Advanced Settingsでは、特別な要件に対応するために転送の側面を変更できます。Preview > Advanced Settingsでは、以下のフィールドが利用可能です。
- Object ID field name: インポートするObject IDフィールド名の名前。
- Load only new records each run: チェック/有効にすると、ソートするフィールドを指定する必要があります
- Sort by: レコードのソートに使用するフィールド。
Load only new records each runがチェックされている場合は必須です。 - Aggregation query : 集約のクエリ文字列。Aggregation — MongoDB ManualとAggregation Pipeline Stages — MongoDB Manualも参照してください
- Output column name: レコードを出力する列の名前。
- Stop on invalid record: チェックすると、無効なレコードに遭遇した場合、転送は停止し、完了しません。

このフェーズでは、データをインポートするTreasure Dataのターゲットデータベースとテーブルを選択します。Create new databaseまたはCreate new tableを使用して、新しいデータベースまたはテーブルを作成できます。
- Database: データをインポートするデータベース。
- Table: データをインポートするデータベース内のテーブル。
- Mode: Append – 既存のテーブルにレコードを追加できます。
- Mode: Replace – テーブル内の既存のデータを、インポートされるデータに置き換えます。
- Partition Key Seed: パーティション化時間列として使用するlongまたはtimestamp列を選択します。時間列を指定しない場合、転送のアップロード時間が
time列の追加と組み合わせて使用されます。 - Data Storage Timezone: データが保存されるタイムゾーン。データもこのタイムゾーンで表示されます。

このフェーズでは、転送を1回だけ実行するか、指定した頻度で実行するようにスケジュールするかを選択できます。
When
Once now: 転送を1回だけ実行します。
Repeat…
- Schedule:
@hourly、@daily、@monthly、およびカスタムcronの3つのオプションを受け入れます。 - Delay Transfer: 実行時間の遅延を追加します。
- Schedule:
Time Zone: 'Asia/Tokyo'のような拡張タイムゾーン形式をサポートします。

頻度を選択した後、Start Transferを選択して転送を開始します。エラーがなければ、Treasure Dataへの転送が完了し、データが利用可能になります。
完了したばかりの転送や他のデータコネクタジョブを確認する必要がある場合は、My Input Transfersセクションで転送のリストを表示できます。

コマンドラインインターフェイスからMongoDBデータコネクタを使用することもできます。以下の手順は、CLIを使用してデータをインポートする方法を示しています。
最新のTD Toolbeltをインストールします。
$ td --version
0.11.10まず、MongoDBの詳細を含むseed.ymlを準備します。以下の内容でseed.ymlを作成します。
in:
type: mongodb
hosts:
- {host: HOST, port: PORT}
auth_method: auto
#auth_source: AUTH_SOURCE_DB
user: USER
password: PASSWORD
database: DATABASE
collection: COLLECTION
options:
connectTimeoutMS: 10000
socketTimeoutMS: 10000
projection: '{"_id": 0}'
query: '{}'
sort: '{}'
stop_on_invalid_record: true
out:
mode: append
exec: {}MongoDB用Data Connectorは、指定されたコレクションに保存されているすべてのドキュメントをインポートします。以下のオプションを使用して、フィールドをフィルタリング、クエリを指定、またはソートできます。
クエリ結果のprojectionに使用されるJSONドキュメント。ドキュメント内のフィールドは、この条件に一致する場合にのみ使用されます。
projection: '{ "_id": 1, "user_id": 1, "company": 1 }'ソースコレクションのqueryingに使用されるJSONドキュメント。この条件に一致する場合、ドキュメントはコレクションから読み込まれます。
query: '{ user_id: { $gte: 20000 } }'結果の順序
sort: '{ "field1": 1, "field2": -1}' # field1 昇順、field2 降順このオプションはaggregationオプションと一緒に使用できません。
利用可能なoutモードの詳細については、付録を参照してください。
集約クエリ
aggregation: '{ $match: { field1: { $gt: 1}} }' # field1が1より大きい場合このオプションはsortオプションと一緒に使用できません。
利用可能なoutモードの詳細については、付録を参照してください。
Data Connector MongoDBは、MongoDBのドキュメントを単一の列として読み込むため、connector:guessをサポートしていません。load.ymlのすべての設定を編集してください。
previewコマンドを使用して、システムがドキュメントをどのように解析するかをプレビューできます。
$ td connector:preview load.yml
+---------------------------------------------------------------------------------------------------------------------+
| record:json |
+---------------------------------------------------------------------------------------------------------------------+
| "{\"user_id\":11200,\"company\":\"AA Inc.\",\"customer\":\"David\",\"created_at\":\"2015-03-31T06:12:37.000Z\"}" |
| "{\"user_id\":20313,\"company\":\"BB Imc.\",\"customer\":\"Tom\",\"created_at\":\"2015-04-01T01:00:07.000Z\"}" |
| "{\"user_id\":32132,\"company\":\"CC Inc.\",\"customer\":\"Fernando\",\"created_at\":\"2015-04-01T10:33:41.000Z\"}" |
| "{\"user_id\":40133,\"company\":\"DD Inc.\",\"customer\":\"Cesar\",\"created_at\":\"2015-04-02T05:12:32.000Z\"}" |
| "{\"user_id\":93133,\"company\":\"EE Inc.\",\"customer\":\"Jake\",\"created_at\":\"2015-04-02T14:11:13.000Z\"}" |
+---------------------------------------------------------------------------------------------------------------------+データコネクタは、"boolean"、"long"、"double"、"string"、"timestamp"タイプの解析をサポートしています。
また、データロードジョブを実行する前に、ローカルデータベースとテーブルを作成する必要があります。
$ td database:create td_sample_db
$ td table:create td_sample_db td_sample_table最後に、ロードジョブを送信します。データのサイズによっては、数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを指定します。
Treasure Dataのストレージは時間でパーティション化されているため、--time-columnオプションを指定してください(Data Partitioning in Treasure Dataを参照)。オプションが指定されていない場合、Data Connectorは最初のlongまたはtimestamp列をパーティション化時間として選択します。--time-columnで指定する列のタイプは、longまたはtimestampタイプである必要があります。
データに時間列がない場合は、add_timeフィルタオプションを使用して追加できます。詳細については、add_time filter pluginを参照してください。
JSON列を展開したい場合は、expand_jsonフィルタオプションを使用して追加できます。詳細については、expand_json filter pluginを参照してください
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_atconnector:issueコマンドは、データベース*(td_sample_db)とテーブル(td_sample_table)がすでに作成されていることを前提としています。データベースまたはテーブルがTDに存在しない場合、connector:issueコマンドは失敗するため、データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-table*オプションを使用してデータベースとテーブルを自動作成してください:
$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-tableData Connectorはサーバー側でレコードをソートしません。時間ベースのパーティション化を効果的に使用するには、事前にレコードをソートしてください。
timeというフィールドがある場合は、--time-columnオプションを指定する必要はありません。
td connector:issue load.yml --database td_sample_db --table td_sample_tableincremental_fieldおよびlast_recordオプションを利用して、日付情報を含むテーブル内のフィールドを指定することで、レコードをインクリメンタルにロードできます。
in:
type: mongodb
hosts:
- {host: HOST, port: PORT}
user: USER
password: PASSWORD
database: DATABASE
collection: COLLECTION
projection: '{"_id": 0}'
incremental_field:
- "field1"
last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}}
stop_on_invalid_record: true
out:
mode: append
exec: {}コネクタは自動的にクエリとソート値を作成します。
query '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }}' # field1 > "2015-01-25T13:23:15.000Z"
sort '{"field1", 1}' # field1 昇順incremental_fieldsに複数のフィールドを指定することもできます。
incremental_field:
- "field1"
- "field2"
last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}, "field2": 13215}コネクタは'AND'条件を使用してクエリとソート値を作成します。
query '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }, field2: { $gt: 13215}}' # field1 > "2015-01-25T13:23:15.000Z" AND field2 > 13215
sort '{"field1", 1, "field2", 1}' # field1 昇順、field2 昇順incremental\_fieldを指定する場合、sortオプションは使用できません。incremental\_fieldを指定する場合、aggregationオプションは使用できません。
フィールドタイプがObjectIdまたはDateTimeの場合、last\_recordを特殊文字で指定する必要があります。
- ObjectIdフィールド
in:
type: mongodb
incremental_field:
- "_id"
last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}}- DateTimeフィールド
in:
type: mongodb
incremental_field:
- "time_field"
last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}MongoDBデータインポートの定期的なデータコネクタ実行をスケジュールできます。高可用性を確保するために、スケジューラーを慎重に設定しています。この機能を使用することで、ローカルデータセンターにcronデーモンを用意する必要がなくなります。
td connector:createコマンドを使用して新しいスケジュールを作成できます。スケジュールの名前、cronスタイルのスケジュール、データが保存されるデータベースとテーブル、およびData Connector設定ファイルが必要です。
$ td connector:create \
daily_import \
"10 0 * * *" \
td_sample_db \
td_sample_table \
load.ymlTreasure Dataのストレージは時間でパーティション化されているため、--time-columnオプションを指定することもお勧めします(data partitioningを参照)。
$ td connector:create \
daily_import \
"10 0 * * *" \
td_sample_db \
td_sample_table \
load.yml \
--time-column created_atcronパラメータは、3つの特別なオプション(@hourly、@daily、@monthly)も受け入れます。 デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PST、CSTなどのタイムゾーンの略語はサポートされておらず、予期しないスケジュールになる可能性があります。