Create Salesforce Connection Using the Command Line

Install the TD Toolbelt

Open a terminal and confirm that the newest TD Toolbelt is installed by checking its version:

$ td --version
0.15.3
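
If the toolbelt is not installed yet, one common route (assuming a working Ruby environment; platform installers are also available from Treasure Data) is to install the td gem:

$ gem install td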

In Salesforce

Go to Setup > Apps > App Manager and select New Connected App. (The procedure may vary depending on what version of Salesforce you are running.)

Select Manage Connected Apps and get your Consumer Key and Consumer Secret, which you will need for the next step.

For secure account access, if you don’t have one already, you should get a Salesforce Security Token. To do this, go to Account > Settings > Reset My Security Token and select Reset Security Token. You’ll receive your Security Token by email.

Create a Seed Config File (seed.yml)

Using a text editor, create a file called seed.yml. Copy and paste the following information, replacing the placeholder text with your Salesforce credentials. This configuration performs a full dump of the object specified in the target field (Account) and overwrites the contents of the destination table because replace mode is specified. For more details on available out modes, see the Appendix.

in:
  type: sfdc
  username: "USERNAME"                     # your username
  password: "PASSWORD"                     # your password
  security_token: "SECURITY_TOKEN"         # your Security Token
  client_id: "CLIENT_ID"                   # your app's consumer key
  client_secret: "CLIENT_SECRET"           # your app's consumer secret
  login_url: login.salesforce.com         # test.salesforce.com for sandbox
  incremental: false                       # 'full dump', see Scheduled execution below
  target: Account                          # Salesforce Object you want to import
out:
  mode: replace

All Salesforce Objects are supported for the target option. Some common objects include:

  • Opportunity

  • Contact

  • Lead

  • Account

  • Event

  • Task

For more information about Salesforce Objects, refer to the full reference at the Salesforce Developer portal.

The objects, fields, and records accessible to you depend on your Salesforce license, security configuration, and API access configuration in your Salesforce organization. Check your configurations if you see authorization errors in the next step.

Run the Guess Fields Command (generate load.yml)

Run the following command in your terminal:

$ td connector:guess seed.yml -o load.yml

The connector:guess command automatically reads the target data and intelligently guesses the data format.

Open the file load.yml; you’ll see the guessed definitions, which can include the file format, encoding, column names, and column types.

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  ...
filters:
  ...
out:
  mode: replace

You can preview how the system will parse the data by using the preview command.

$ td connector:preview load.yml

If the system detects the wrong column name or type, modify load.yml and preview again.

The Integration supports parsing of “boolean”, “long”, “double”, “string”, and “timestamp” types.
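
For example, if guess assigns the wrong type to a column, you can override it in load.yml with an explicit columns block, following the same syntax shown later in this article. A minimal sketch (the AnnualRevenue field here is illustrative):

in:
  type: sfdc
  ...
  columns:
    - name: Id
      type: string
    - name: AnnualRevenue   # illustrative field; guessed as string, overridden to double
      type: double
    - name: CreatedDate
      type: timestamp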

Execute the Load Job

In your terminal, submit the load job as shown in the following example. Processing might take a couple of hours depending on the data size. You must specify the database and table where the data will be stored.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
                              --time-column createddate

The preceding command assumes you have already created the database (td_sample_db) and the table (td_sample_table). If the database or the table does not exist in TD, the command will not succeed, so either create them manually or use the --auto-create-table option with the td connector:issue command, as shown in the following example, to create them automatically.

$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
                              --time-column createddate --auto-create-table
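
Alternatively, create the database and table manually before issuing the job:

$ td db:create td_sample_db
$ td table:create td_sample_db td_sample_table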

You can assign a time format column as the partitioning key by using the --time-column option.

If you want to ingest records that have been deleted in SFDC, specify include_deleted_records: true. By default, the import does not include records that were previously deleted in Salesforce.

Here’s an example of a load file using the include_deleted_records option.

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  include_deleted_records: true # Ingest records that have been deleted
  ...
filters:
  ...
out:
  mode: replace

Scheduled Execution Using the Command Line

You can schedule periodic Data Connector execution for recurring SFDC imports. We manage our scheduler carefully to ensure high availability; by using this feature, you no longer need a cron daemon in your local data center.

For the scheduled import, Data Connector for SFDC imports all Objects that match the specified target.

Scheduled execution supports additional configuration parameters that control the behavior of the Data Connector during its periodic attempts to fetch data from SFDC:

  • incremental: This configuration controls the load mode, which governs how the Data Connector fetches data from SFDC based on one of the native timestamp fields associated with each Object.

    • incremental: true (default)
      In this mode, the data connector fetches only records of the specified Salesforce Object type that have been updated since the previous run of the connector. This mode is useful when you want to fetch only the records that have changed since the previous scheduled run. It is typically combined with writing data into the destination table using ‘append’ mode.

      • incremental_columns (required)
        This option is required in incremental mode so that only the necessary data is loaded from SFDC. Make sure these columns are selected if you use a custom SOQL query.

    • incremental: false
      In this mode, the data connector fetches all the records of the specified Salesforce Object type, regardless of when they were last updated. This mode is best combined with writing data into a destination table using ‘replace’ mode.

  • soql: This configuration allows you to use a custom SOQL (Salesforce Object Query Language) query to retrieve and filter data. With SOQL, you can retrieve data from a single object or from multiple related objects, pick the columns you want, and filter or sort the output with your own conditions. As a limitation, the Data Connector doesn’t support aggregate SOQL syntax such as count() FROM Object.

  • where: This configuration allows you to apply a filtering condition to the custom SOQL query.

  • columns: This configuration defines a custom schema for the data to be imported into Treasure Data. You can list only the columns you are interested in, but make sure they exist in the object you are fetching; otherwise, those columns won’t be available in the result.

  • last_record: This configuration specifies the last record from the previous load job, which determines where the next incremental load starts. Each entry must include a key for the column name and a value for that column’s value; the key must match the SFDC (SOQL) column name.

Here’s an example of a seed file using incremental mode combined with ‘append’ mode for output.

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  target: Account
  use_rest: false
  incremental: true
  incremental_columns:
    - LastModifiedDate
out:
  mode: append

The preceding configuration uses the LastModifiedDate column to load records incrementally. With that configuration, the Data Connector issues SOQL queries like the following.

-- first execution:
SELECT Id, Name, ... FROM Account ORDER BY LastModifiedDate

-- second execution:
SELECT Id, Name, ... FROM Account WHERE (LastModifiedDate > 2015-10-21T10:33:07+0000) ORDER BY LastModifiedDate
-- '2015-10-21T10:33:07+0000' is the latest modified record time from first execution

-- third execution:
SELECT Id, Name, ... FROM Account WHERE (LastModifiedDate > 2015-10-30T22:30:41+0000) ORDER BY LastModifiedDate
-- '2015-10-30T22:30:41+0000' is the latest modified record time from second execution

You can see that the WHERE clause was updated on every execution to load modified records only.

Here’s an example of a seed file using non-incremental mode (incremental: false) combined with ‘replace’ mode for output.

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  incremental: false
  target: Account
out:
  mode: replace

With the preceding configuration, the Data Connector always loads all records and replaces all existing records with loaded records.

The following is an example of a seed file using incremental mode and last_record combined with ‘append’ mode for output.

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  target: Account
  incremental: true
  incremental_columns:
    - LastModifiedDate
  last_record:
    - {"key": "LastModifiedDate", "value":  "2017-01-01T00:00:00Z"}
out:
  mode: append

In the preceding configuration, the Integration loads all records modified since 2017-01-01 00:00:00.
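
Following the pattern of the earlier executions, the first run under this configuration would issue a SOQL query along these lines (a sketch based on the queries shown above):

SELECT Id, Name, ... FROM Account WHERE (LastModifiedDate > 2017-01-01T00:00:00Z) ORDER BY LastModifiedDate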

Here’s an example of a seed file using incremental mode combined with a custom SOQL query, a where condition, and a columns schema.

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  target: Account
  incremental: true
  incremental_columns:
    - LastModifiedDate
  soql: "select Id, Name, Description, LastModifiedDate from Account"  where: "ownership = 'Public'"
  columns:
    - name: Id
      type: string
    - name: Name
      type: string
    - name: Desc
      type: string
    - name: LastModifiedDate
      type: string
out:
  mode: append

As you can see, the custom SOQL selects fewer columns and applies a filter on the ‘ownership’ field. The schema also defines a field (Desc) that doesn’t exist in the SOQL result, so that column won’t be available in the imported data.

Create the Schedule

A new schedule can be created by using the td connector:create command. The name of the schedule, a cron-style schedule, the database and table where the data will be stored, and the Data Connector configuration file are required.

$ td connector:create \
    daily_sfdc_import \
    "10 0 * * *"      \
    td_sample_db      \
    td_sample_table   \
    load.yml

The cron parameter also accepts three special options: @hourly, @daily, and @monthly.
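
For example, the daily import above can be expressed with one of the special options instead of a cron expression:

$ td connector:create \
    daily_sfdc_import \
    "@daily"          \
    td_sample_db      \
    td_sample_table   \
    load.yml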

By default, the schedule is set in the UTC time zone. You can set the schedule in a time zone using the `-t` or `--timezone` option. The `--timezone` option supports only extended timezone formats like 'Asia/Tokyo', 'America/Los_Angeles' etc. Timezone abbreviations like PST, CST are *not* supported and may lead to unexpected schedules.
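
For example, to run the schedule at 00:10 Japan time rather than UTC:

$ td connector:create \
    daily_sfdc_import \
    "10 0 * * *"      \
    td_sample_db      \
    td_sample_table   \
    load.yml          \
    --timezone Asia/Tokyo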

List the Schedules

You can see the list of scheduled entries by entering the command td connector:list.

$ td connector:list
+-------------------+--------------+----------+-------+--------------+-----------------+------------------------+
| Name              | Cron         | Timezone | Delay | Database     | Table           | Config                 |
+-------------------+--------------+----------+-------+--------------+-----------------+------------------------+
| daily_sfdc_import | 10 0 * * *   | UTC      | 0     | td_sample_db | td_sample_table | {"type"=>"sfdc", ... } |
+-------------------+--------------+----------+-------+--------------+-----------------+------------------------+

Show the Setting and History of Schedules

td connector:show shows the execution setting of a schedule entry.

$ td connector:show daily_sfdc_import
Name     : daily_sfdc_import
Cron     : 10 0 * * *
Timezone : UTC
Delay    : 0
Database : td_sample_db
Table    : td_sample_table

td connector:history shows the execution history of a schedule entry. To investigate the results of each individual execution, use td job:show <jobid>.

$ td connector:history daily_sfdc_import
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| JobID  | Status  | Records | Database     | Table           | Priority | Started                   | Duration |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
| 578066 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-18 00:10:05 +0000 | 160      |
| 577968 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-17 00:10:07 +0000 | 161      |
| 577914 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-16 00:10:03 +0000 | 152      |
| 577872 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-15 00:10:04 +0000 | 163      |
| 577810 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-14 00:10:04 +0000 | 164      |
| 577766 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-13 00:10:04 +0000 | 155      |
| 577710 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-12 00:10:05 +0000 | 156      |
| 577610 | success | 10000   | td_sample_db | td_sample_table | 0        | 2015-04-11 00:10:04 +0000 | 157      |
+--------+---------+---------+--------------+-----------------+----------+---------------------------+----------+
8 rows in set
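
For example, to inspect the most recent run from the history above:

$ td job:show 578066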

Delete the Schedule

td connector:delete removes the schedule.

$ td connector:delete daily_sfdc_import

Appendix

Modes for the Out Plugin

You can specify the import mode in the out section of the load.yml file.

The out: section controls how data is imported into a Treasure Data table. For example, you may choose to append data to or replace data in an existing table in Treasure Data.

Mode: Append

Records are appended to the target table.

in:
  ...
out:
  mode: append

Mode: Always Replace

Replaces data in the target table. Any manual schema changes made to the target table remain intact.

in:
  ...
out:
  mode: replace

Mode: Replace on New Data

Replaces data in the target table only when there is new data to import.

in:
  ...
out:
  mode: replace_on_new_data

Use Synchronous Transfer Only

There are two options for ingesting records from Salesforce into Treasure Data:

  • Bulk API

  • REST API

The Bulk API offers faster ingestion but is limited to 10,000 batch allocations within a 24-hour period. If your target is large, your entire available batch allocation might be consumed, eventually causing your job to fail.

You are most likely to hit the batch allocation limit during the first ingestion of your data, when all records are ingested.

If ingesting all records is one of your use cases, consider enabling the synchronous transfer only option to use the REST API instead. The REST API avoids the batch allocation limit but might be slower.
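
Assuming the use_rest flag shown in the earlier seed file is what toggles this behavior (an assumption; confirm against the connector reference for your version), a minimal sketch looks like this:

in:
  type: sfdc
  username: "USERNAME"
  password: "PASSWORD"
  security_token: "SECURITY_TOKEN"
  client_id: "CLIENT_ID"
  client_secret: "CLIENT_SECRET"
  login_url: login.salesforce.com
  target: Account
  use_rest: true   # assumption: use the synchronous REST API instead of the Bulk API
out:
  mode: replace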
