The Data Connector for MongoDB enables you to import documents (records) stored on your MongoDB server into Treasure Data.


Prerequisites

  • Basic knowledge of Treasure Data


Configure the Connection

You can create an instance of the MongoDB data connector from the TD Console. Select Create on the MongoDB connector tile.



Create a New MongoDB Connector

Enter the required credentials for your MongoDB instance. Set the following parameters.

  • Auth method: The authentication mechanism to use.

    • If you choose "Auto", the connector negotiates the best mechanism based on the version of the server it is authenticating to.

      If the server version is 3.0 or higher, the driver authenticates using the SCRAM-SHA-1 mechanism.

      Otherwise, the driver authenticates using the MONGODB-CR mechanism.

  • Auth source: The database name where the user is defined.

  • Username: Username to connect to the remote database.

  • Password: Password to connect to the remote database.

  • Hostname: The hostname or IP address of the remote server. (You can add more than one IP address, depending on your MongoDB setup.)

  • Port: Port number of the remote server (Default is 27017).


Select Continue after entering the required connection details. Name the connection so that you can find it later if you need to modify any of the connection details. If you would like to share this connection with other users in your organization, select Share with others. If this box is unchecked, this connection is visible only to you.

Select Create Connection to complete the connection. If the connection is successful, the connection you just created appears in your list of connections with the name you provided.

Transfer Data into Treasure Data

After creating the connection to your remote database, you can import the data from your database into Treasure Data. You can set up an ad hoc one-time transfer or a recurring transfer at regular intervals.


Enter Database Details (Fetch From)

Provide the details of the database and table from which you want to ingest data.

  • Database name: The name of the database from which you are transferring data (for example, your_database_name).

  • Collection Name: The name of the collection from which you are transferring data.

  • JSON Query: A JSON document specifying which records to return (see the example after this list).

  • JSON Projection: A JSON document specifying which fields to return.

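For example, a query that selects records by a numeric field and a projection that returns only a few fields might look like the following (the field names are hypothetical; replace them with fields from your own collection):

JSON Query: { "user_id": { "$gte": 20000 } }
JSON Projection: { "_id": 1, "user_id": 1, "company": 1 }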

Select Next to preview the data in the next step.

Preview

If there are no errors with the connection, you see a preview of the data to be imported. If you are unable to see the preview or have issues viewing it, contact support.


The records are imported into a single column, both during the preview and when the data import runs. If you need to use non-standard options for your import, select Advanced Settings.

Advanced Settings allow you to modify aspects of your transfer to accommodate special requirements. The following fields are available under Preview > Advanced Settings.

  • Object ID field name: The name of the Object ID field to import.

  • Load only new records each run: If checked, you must specify which fields to sort by.

  • Sort by: Fields to use to sort records. Required if `Load only new records each run` is checked.

  • Aggregation query: Query string for aggregation (see the example after this list). See also Aggregation — MongoDB Manual and Aggregation Pipeline Stages — MongoDB Manual.

  • Output column name: The name of the column to output the records to.

  • Stop on invalid record: If checked, the transfer stops without completing if it encounters an invalid record.

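For example, an aggregation query that keeps only documents whose user_id is at least 20000 might look like the following (the field name is hypothetical; the same $match syntax appears in the CLI section later in this article):

{ $match: { user_id: { $gte: 20000 } } }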

Transfer To

In this phase, select the Treasure Data target database and table into which you want to import your data. You can create a new database or table using Create new database or Create new table.

  • Database: The database into which to import the data.

  • Table: The table within the database into which to import the data.

  • Mode: Append – Allows you to add records into an existing table.

  • Mode: Replace – Replace the existing data in the table with the data being imported.

  • Partition Key Seed: Choose the long or timestamp column that you would like to use as the partitioning time column. If you do not specify a time column, the upload time of the transfer is used and a time column is added automatically.

  • Data Storage Timezone: The timezone in which the data is stored; data is also displayed in this timezone.


Data Transfer Frequency (When)

In this phase, you can choose to run the transfer only one time or schedule it to run at a specified frequency.

  • When

    • Once now: Run the transfer only once.

    • Repeat…

      • Schedule: accepts @hourly, @daily, @monthly, or a custom cron expression (see the note after this list).

      • Delay Transfer: adds a delay to the execution time.

    • Time Zone: supports extended timezone formats like 'Asia/Tokyo'.

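A custom cron expression uses the standard five fields (minute, hour, day of month, month, day of week). For example, the following hypothetical schedule runs the transfer every day at 00:10:

10 0 * * *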

After selecting the frequency, select Start Transfer to begin the transfer. If there are no errors, the transfer into Treasure Data will complete and the data will be available.

My Input Transfers

If you need to review the transfer you have just completed, as with other data connector jobs, you can view a list of your transfers in the My Input Transfers section.



Use the CLI to Configure the Connector

You can also use the MongoDB data connector from the command line interface. The following instructions show you how to import data using the CLI.

Install ‘td’ Command v0.11.9 or Later

Install the newest TD Toolbelt.

$ td --version
0.11.10


Create Seed Config File (seed.yml)

First, create seed.yml with the following content, filling in your MongoDB details.

in:
  type: mongodb
  hosts:
    - {host: <HOST>, port: <PORT>}
  auth_method: auto
  #auth_source: <AUTH_SOURCE_DB>
  user: <USER>
  password: <PASSWORD>
  database: <DATABASE>
  collection: <COLLECTION>
  projection: '{"_id": 0}'
  query: '{}'
  sort: '{}'
  stop_on_invalid_record: true
out:
  mode: append
  exec: {}

The Data Connector for MongoDB imports all documents that are stored in a specified collection. You may filter fields, specify queries, or sort with the following options.


Projection Option

A JSON document used for projection on query results. Only the fields that match this condition are imported.

projection: '{ "_id": 1, "user_id": 1, "company": 1 }'


Query Option

A JSON document used for querying the source collection. Documents are loaded from the collection if they match this condition.

query: '{ user_id: { $gte: 20000 } }'


Sort Option

Specifies the order of the results.

sort: '{ "field1": 1, "field2": -1}' # field1 ascending, field2 descending

This option can't be used with the aggregation option.



Aggregation Option

A JSON document used as an aggregation query.

aggregation: '{ $match: { field1: { $gt: 1}} }' # where field1 is greater than 1

This option can't be used with the sort option.

For more details on available out modes, see Appendix.


Guess Fields (Generate load.yml)

The Data Connector for MongoDB loads MongoDB documents as a single column and therefore doesn't support connector:guess. Copy your seed.yml to load.yml and edit all settings there.

You can preview how the system parses the documents by using the preview command.

$ td connector:preview load.yml
+---------------------------------------------------------------------------------------------------------------------+
| record:json                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------+
| "{\"user_id\":11200,\"company\":\"AA Inc.\",\"customer\":\"David\",\"created_at\":\"2015-03-31T06:12:37.000Z\"}"    |
| "{\"user_id\":20313,\"company\":\"BB Imc.\",\"customer\":\"Tom\",\"created_at\":\"2015-04-01T01:00:07.000Z\"}"      |
| "{\"user_id\":32132,\"company\":\"CC Inc.\",\"customer\":\"Fernando\",\"created_at\":\"2015-04-01T10:33:41.000Z\"}" |
| "{\"user_id\":40133,\"company\":\"DD Inc.\",\"customer\":\"Cesar\",\"created_at\":\"2015-04-02T05:12:32.000Z\"}"    |
| "{\"user_id\":93133,\"company\":\"EE Inc.\",\"customer\":\"Jake\",\"created_at\":\"2015-04-02T14:11:13.000Z\"}"     |
+---------------------------------------------------------------------------------------------------------------------+

The data connector supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.

You must also create the destination database and table prior to executing the data load job.

$ td database:create td_sample_db
$ td table:create td_sample_db td_sample_table


Execute Load Job

Finally, submit the load job. It may take a couple of hours depending on the size of the data. Specify the Treasure Data database and table where the data should be stored.

Specify the --time-column option, because Treasure Data's storage is partitioned by time (see architecture). If the option is not provided, the Data Connector chooses the first long or timestamp column as the partitioning time. The column specified by --time-column must be of long or timestamp type.

If your data doesn't have a time column, you can add one using the add_time filter option. For more details, see the add_time filter plugin.

If you want to expand the JSON column, you can do so using the expand_json filter option. For more details, see the expand_json filter plugin.
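
As an illustration, the filters section below sketches both plugins together. It assumes the default output column name record and the sample fields shown in the preview above; adjust the column names, types, and timestamp format to match your own documents:

filters:
  # Expand the JSON stored in the "record" column into typed columns.
  - type: expand_json
    json_column_name: record
    expanded_columns:
      - {name: user_id, type: long}
      - {name: company, type: string}
      - {name: customer, type: string}
      - {name: created_at, type: timestamp, format: "%Y-%m-%dT%H:%M:%S.%L%z"}
  # Use the parsed created_at value as the "time" partitioning column.
  - type: add_time
    from_column: {name: created_at}
    to_column: {name: time, type: timestamp}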

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at

The connector:issue command assumes that you have already created the database (td_sample_db) and the table (td_sample_table). If the database or the table does not exist in TD, the connector:issue command fails, so create the database and table manually or use the --auto-create-table option with the td connector:issue command to create them automatically:

$ td connector:issue load.yml --database td_sample_db --table td_sample_table --time-column created_at --auto-create-table

The Data Connector does not sort records on the server side. To use time-based partitioning effectively, sort records beforehand.

If you have a field called time, you don’t have to specify the --time-column option.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table


Incremental Load

You can load records incrementally by specifying a field in your collection that contains date information, using the incremental_field and last_record options.

in:
  type: mongodb
  hosts:
    - {host: <HOST>, port: <PORT>}
  user: <USER>
  password: <PASSWORD>
  database: <DATABASE>
  collection: <COLLECTION>
  projection: '{"_id": 0}'
  incremental_field:
    - "field1"
  last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}}
  stop_on_invalid_record: true
out:
  mode: append
  exec: {}

The connector automatically creates the query and sort values.

query: '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} } }' # field1 > "2015-01-25T13:23:15.000Z"
sort: '{"field1": 1}' # field1 ascending


Incremental Load with Multiple Fields

You can also specify multiple fields for incremental_field.

incremental_field:
  - "field1"
  - "field2"
last_record: {"field1": {"$date": "2015-01-25T13:23:15.000Z"}, "field2": 13215}

The connector creates the query and sort values using an AND condition.

query: '{ field1: { $gt: {"$date": "2015-01-25T13:23:15.000Z"} }, field2: { $gt: 13215 } }' # field1 > "2015-01-25T13:23:15.000Z" AND field2 > 13215
sort: '{"field1": 1, "field2": 1}' # field1 ascending, field2 ascending

The `sort` option can't be used when you specify `incremental_field`.
The `aggregation` option can't be used when you specify `incremental_field`.

You must specify `last_record` using extended JSON notation when the field type is ObjectId (`$oid`) or DateTime (`$date`).

# ObjectId field
in:
  type: mongodb
  incremental_field:
    - "_id"
  last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}}

# DateTime field
in:
  type: mongodb
  incremental_field:
    - "time_field"
  last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}


Scheduled Execution

You can schedule periodic data connector executions for MongoDB data imports. We configure our scheduler carefully to ensure high availability. By using this feature, you no longer need to run a cron daemon in your local data center.


Create the Schedule

A new schedule can be created using the td connector:create command. The following are required: the name of the schedule, the cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file.

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml

It’s also recommended to specify the --time-column option since Treasure Data’s storage is partitioned by time (see data partitioning).

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at

The `cron` parameter also accepts three special options: `@hourly`, `@daily` and `@monthly`.

By default, the schedule is set up in the UTC timezone. You can set the schedule in a different timezone using the -t or --timezone option. Note that the `--timezone` option supports only extended timezone formats like 'Asia/Tokyo' and 'America/Los_Angeles'. Timezone abbreviations like PST and CST are *not* supported and may lead to unexpected schedules.
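
For example, the following command creates the same hypothetical schedule as above but runs it at 00:10 Japan time:

$ td connector:create \
    daily_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --timezone Asia/Tokyo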


List the Schedules

You can see the list of currently scheduled entries by running the command td connector:list.

$ td connector:list
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+
| Name         | Cron         | Timezone | Delay | Database     | Table           | Config                                     |
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+
| daily_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"in"=>{"type"=>"mongodb", "collection"... |
+--------------+--------------+----------+-------+--------------+-----------------+--------------------------------------------+


Show the Setting and Schedule History

td connector:show shows the execution setting of a scheduled entry.

% td connector:show daily_import
Name     : daily_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table
Config
---
in:
  type: mongodb
  collection: <COLLECTION>
  projection: '{"_id": 0}'
filters:
  - type: expand_json
    ...

td connector:history shows the execution history of a scheduled entry. To investigate the results of each individual run, use td job <jobid>.

% td connector:history daily_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set


Delete the Schedule

td connector:delete removes the schedule.

$ td connector:delete daily_import


Appendix

Modes for Out Plugin

You can specify the data import mode in the out section of seed.yml.

append (default)

This is the default mode; records are appended to the target table.

in:
  ...
out:
  mode: append

replace (in td 0.11.10 and later)

This mode replaces data in the target table. Note that any manual schema changes made to the target table remain intact with this mode.

in:
  ...
out:
  mode: replace