Data Connector for MongoDB

The Data Connector for MongoDB imports documents (records) stored on your MongoDB server into Treasure Data.

Prerequisites

  • Basic knowledge of Treasure Data

Step 0: Install ‘td’ command v0.11.9 or later

Install the newest Treasure Data Toolbelt, then check the installed version:

$ td --version
0.11.10

Step 1: Create Seed Config File (seed.yml)

First, create seed.yml with the following content, replacing the placeholders with your MongoDB details.

in:
  type: mongodb
  hosts:
    - {host: <HOST>, port: <PORT>}
  user: <USER>
  password: <PASSWORD>
  database: <DATABASE>
  collection: <COLLECTION>
  projection: '{"_id": 0}'
  query: '{}'
  sort: '{}'
  stop_on_invalid_record: true
out:
  mode: append
  exec: {}

The Data Connector for MongoDB imports all documents stored in the specified collection. You can select fields, filter documents with a query, and sort the results with the following options.

projection option

A JSON document used as the projection on query results. Only the fields of each document that match this projection are imported.

projection: '{ "_id": 1, "user_id": 1, "company": 1 }'

query option

A JSON document used to query the source collection. Only documents that match this condition are loaded from the collection.

query: '{ user_id: { $gte: 20000 } }'

sort option

A JSON document that specifies the order of the results. Use 1 for ascending order and -1 for descending order.

sort: '{ "field1": 1, "field2": -1}' # field1 ascending, field2 descending
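
The three options can be combined in one in section. The following is a minimal sketch, not a recommended configuration; the field names user_id, company, and created_at are assumptions chosen for illustration and must be replaced with fields that exist in your collection.

```yaml
in:
  type: mongodb
  hosts:
    - {host: <HOST>, port: <PORT>}
  user: <USER>
  password: <PASSWORD>
  database: <DATABASE>
  collection: <COLLECTION>
  # Import only these three fields and leave out _id (field names are hypothetical).
  projection: '{ "_id": 0, "user_id": 1, "company": 1, "created_at": 1 }'
  # Load only documents whose user_id is 20000 or greater.
  query: '{ "user_id": { "$gte": 20000 } }'
  # Return the oldest documents first.
  sort: '{ "created_at": 1 }'
out:
  mode: append
```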

For more details on available out modes, see Appendix.

Step 2: Guess Fields (Generate load.yml)

The Data Connector for MongoDB loads each MongoDB document into a single column, so it does not support connector:guess. Instead, fill out all settings in load.yml yourself.
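
As an illustration, a filled-out load.yml might look like the sketch below. The host name, user, database, and collection are made-up values, not taken from this guide, and 27017 is simply MongoDB's default port. The options themselves are the same ones used in seed.yml.

```yaml
# load.yml: hypothetical values throughout; replace them with your own.
in:
  type: mongodb
  hosts:
    - {host: mongodb.example.com, port: 27017}
  user: td_import
  password: <PASSWORD>
  database: sales
  collection: customers
  projection: '{"_id": 0}'
  query: '{}'
  sort: '{}'
  stop_on_invalid_record: true
out:
  mode: append
```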

You can preview how the system will parse the documents by using the preview command.

$ td connector:preview load.yml
+---------------------------------------------------------------------------------------------------------------------+
| record:json                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------+
| "{\"user_id\":11200,\"company\":\"AA Inc.\",\"customer\":\"David\",\"created_at\":\"2015-03-31T06:12:37.000Z\"}"    |
| "{\"user_id\":20313,\"company\":\"BB Imc.\",\"customer\":\"Tom\",\"created_at\":\"2015-04-01T01:00:07.000Z\"}"      |
| "{\"user_id\":32132,\"company\":\"CC Inc.\",\"customer\":\"Fernando\",\"created_at\":\"2015-04-01T10:33:41.000Z\"}" |
| "{\"user_id\":40133,\"company\":\"DD Inc.\",\"customer\":\"Cesar\",\"created_at\":\"2015-04-02T05:12:32.000Z\"}"    |
| "{\"user_id\":93133,\"company\":\"EE Inc.\",\"customer\":\"Jake\",\"created_at\":\"2015-04-02T14:11:13.000Z\"}"     |
+---------------------------------------------------------------------------------------------------------------------+

Currently, the Data Connector supports parsing of "boolean", "long", "double", "string", and "timestamp" types.

You also need to create a database and table in Treasure Data before executing the data load job:

$ td database:create td_sample_db
$ td table:create td_sample_db td_sample_table

Step 3: Execute Load Job

Finally, submit the load job. It may take a couple of hours depending on the size of the data. You need to specify the Treasure Data database and table where the data will be stored.

It’s also recommended to specify the --time-column option, since Treasure Data’s storage is partitioned by time (see architecture). If the option is not provided, the Data Connector chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be of either long or timestamp type.

If your data doesn’t have a time column, you can add one using the add_time filter option. For more details, see the add_time filter plugin.

If you want to expand the JSON column, you can do so using the expand_json filter option. For more details, see the expand_json filter plugin.
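
A rough sketch of how the two filters might be added to load.yml follows. The option names (json_column_name, expanded_columns, to_column, from_value) are written from recollection of the Embulk expand_json and add_time filter plugins and should be checked against the plugin documentation; the column names come from the preview sample above, and the timestamp format string is an assumption.

```yaml
in:
  type: mongodb
  # ... connection settings as in seed.yml ...
filters:
  # Expand the single JSON column ("record" in the preview output) into typed columns.
  - type: expand_json
    json_column_name: record
    expanded_columns:
      - {name: user_id, type: long}
      - {name: company, type: string}
      - {name: customer, type: string}
      # Assumed format for values such as 2015-03-31T06:12:37.000Z.
      - {name: created_at, type: timestamp, format: "%Y-%m-%dT%H:%M:%S.%L%z"}
  # Optional: include this filter only when no column can serve as the time column.
  # It adds a time column set to the upload time.
  - type: add_time
    to_column: {name: time, type: timestamp}
    from_value: {mode: upload_time}
out:
  mode: append
```

With created_at expanded as a timestamp column, it can be passed to --time-column and the add_time filter can be left out.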

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The above command assumes that you have already created the database (td_sample_db) and the table (td_sample_table). If the database or the table does not exist in TD, the command will fail. Either create the database and table manually, or add the --auto-create-table option to td connector:issue to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

At present, the Data Connector does not sort records on the server side. To use time-based partitioning effectively, please sort records beforehand. This restriction will be lifted in the near future.

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

Incremental load

You can load records incrementally by using the incremental_field and last_record options to specify a field that contains date information.

in:
  type: mongodb
  hosts:
    - {host: <HOST>, port: <PORT>}
  user: <USER>
  password: <PASSWORD>
  database: <DATABASE>
  collection: <COLLECTION>
  projection: '{"_id": 0}'
  incremental_field:
    - "field1"
  last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}}
  stop_on_invalid_record: true
out:
  mode: append
  exec: {}

The plugin then automatically generates the query and sort values internally:

query '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }}' # field1 > "2015-01-25T13:23:15.000Z"
sort '{"field1": 1}' # field1 ascending

Incremental load with multiple fields

You can also specify multiple fields for incremental_field.

incremental_field:
  - "field1"
  - "field2"
last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}, "field2": 13215}

The plugin then generates the query and sort values, combining the fields with an ‘AND’ condition:

query '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }, field2: { $gt: 13215}}' # field1 > "2015-01-25T13:23:15.000Z" AND field2 > 13215
sort '{"field1": 1, "field2": 1}' # field1 ascending, field2 ascending

The `sort` option is not available when you specify `incremental_field`.

When the field type is ObjectId or DateTime, you have to specify `last_record` using the special `$oid` or `$date` notation:

# ObjectId field
in:
  type: mongodb
  incremental_field:
    - "_id"
  last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}}

# DateTime field
in:
  type: mongodb
  incremental_field:
    - "time_field"
  last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}

Scheduled execution

You can schedule periodic Data Connector execution for MongoDB data import. We take great care in distributing and operating our scheduler in order to achieve high availability. By using this feature, you no longer need a cron daemon in your local data center.

Create the schedule

A new schedule can be created using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file.

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

It’s also recommended to specify the --time-column option, since Treasure Data’s storage is partitioned by time (see architecture).

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.

By default, the schedule is set up in the UTC timezone. You can set the schedule in another timezone using the -t or --timezone option. Please note that the `--timezone` option only supports extended timezone formats like 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.

List the Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                     |
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"mongodb", "collection"... |
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+

Show the Setting and Schedule History

td connector:show shows the execution setting of a schedule entry.

$ td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: mongodb
  collection: <COLLECTION>
  projection: '{"_id": 0}'
  filters:
    type: expand_json
    ...

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual run, please use td job <jobid>.

$ td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set

Delete the Schedule

td connector:delete will remove the schedule.

$ td connector:delete daily_import

Appendix

A) Modes for out plugin

You can specify the data import mode in the out section of seed.yml.

append (default)

This is the default mode and records are appended to the target table.

in:
  ...
out:
  mode: append

replace (In td 0.11.10 and later)

This mode replaces data in the target table. Please note that any manual schema changes made to the target table will remain intact with this mode.

in:
  ...
out:
  mode: replace

Last modified: Mar 15 2017 14:28:45 UTC
