Many batch input connectors in Treasure Data CDP support incremental data loading. This feature is useful in cases where:
- Data sources are too large to fully re-import regularly.
- Frequent imports of updated data are required to keep information fresh.
- The number of rows ingested should be minimized to optimize usage.
To enable incremental loading, the data source must have specific columns (e.g., created_date) that help identify new records. The method of enabling incremental load differs based on whether you are using the Integration Hub or Treasure Workflow.
When using the source settings in the Integration Hub, the incremental load mechanism is managed by Treasure Data. The process works as follows:
- Users specify one or more incremental columns (e.g., id, created_at).
- The input connector is scheduled to run periodically.
- Treasure Data automatically records the latest values of the incremental columns in a special section called Config Diff.
- On each scheduled execution, the connector fetches only new records based on these recorded values.
Since Treasure Data maintains and updates last_records automatically, users do not need to manually configure parameters for each execution.
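As an illustration, the recorded state is conceptually similar to the last_record values that Embulk-based connectors emit in their config diff; the following sketch shows the general shape only (the exact structure displayed in the UI may differ, and the values are illustrative):

```yaml
# Sketch only: the state recorded after a successful run when the
# incremental columns are updated_at and id. Values are illustrative.
in:
  last_record: ["2017-01-01T00:32:12.000000", 5291]   # latest updated_at and id seen so far
```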
This approach is useful when you simply want to fetch the target objects that have changed since the previously scheduled run.
For example, in the UI:

Database integrations, such as MySQL, BigQuery, and SQL Server, require column or field names to load incremental data. For example:

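As a rough example, a MySQL source configured for incremental loading could look like the following Embulk-style sketch; the host, credentials, table, and column names are placeholders, and the exact fields depend on the integration:

```yaml
# Sketch only: MySQL-style input configuration with incremental loading enabled.
in:
  type: mysql
  host: db.example.com          # placeholder host
  user: td_import               # placeholder user
  password: "********"
  database: production          # placeholder database
  table: orders                 # placeholder table
  incremental: true
  incremental_columns: [updated_at, id]   # columns used to detect new rows
```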
When configuring incremental loading through Treasure Workflow, the process requires explicit management by the user. Unlike the UI-based approach, Treasure Workflow does not automatically track incremental values. Instead:
- Users define their own incremental logic using workflow variables such as session_time or other workflow parameters.
- This session_time value is injected into the connector configuration file.
- Each time the workflow runs, it uses the specified session time to determine which data to import.
- If a workflow session fails, the same session_time can be reused to ensure data consistency.
This approach is designed for greater flexibility, allowing users to handle scenarios such as backfill operations where historical data needs to be reloaded.
In this approach, the incremental-loading feature of the input connector is not used. Instead, the workflow variables are injected into the connector configuration, which allows it to load different parts of the data in each workflow session.
Create a workflow definition that declares the use of a workflow variable. Treasure Workflow provides various built-in variables that can be useful for incremental loading or other purposes.

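A minimal sketch of such a workflow definition, assuming a scheduled workflow that uses the td_load> operator with a connector configuration stored as config/daily_load.yml (the file name, database, and table are placeholders):

```yaml
# daily_incremental.dig -- sketch only
timezone: UTC

schedule:
  daily>: 01:00:00

+load_new_records:
  # The built-in last_session_time variable can be referenced as
  # ${last_session_time} inside the configuration file loaded below.
  td_load>: config/daily_load.yml
  database: my_database     # placeholder destination database
  table: my_table           # placeholder destination table
```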
Inject the last_session_time variable into the input connector configuration file.
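For example, the referenced configuration file might embed the variable as shown below; this is a sketch only, the integration type is a placeholder, and using from_datetime as the receiving parameter is an assumption that depends on the specific connector:

```yaml
# config/daily_load.yml -- sketch only
in:
  type: some_connector                    # placeholder integration type
  # The workflow substitutes ${last_session_time} with the timestamp of the
  # previous scheduled session before the connector runs.
  from_datetime: "${last_session_time}"
# The destination database and table are set on the td_load> task in the workflow.
```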
Whenever the input connector is triggered by the workflow, the appropriate last session time is injected into the connector configuration file. When a failed workflow session is retried, the variable value remains unchanged.
By understanding the differences between these approaches, users can choose the best method based on their specific data ingestion needs.
If you decide to use incremental loading for your RDB/DWH, you should determine if you need to create an index on the columns to avoid a full table scan. For example, the following index could be created:
```sql
CREATE INDEX embulk_incremental_loading_index ON TABLE (updated_at, id);
```

(Here, TABLE stands for the name of the target table.) In some cases, the recommendation is to leave incremental_columns unset and let the integration automatically find an AUTO_INCREMENT primary key.
If incremental loading is not chosen, the integration fetches all the records of the specified integration object type, regardless of when they were last updated. This mode is best combined with writing data into a destination table using the ‘replace’ mode.
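For instance, a full re-import combined with replacing the destination table could be sketched as follows; whether the replace behavior is configured in the output section or in the UI depends on the integration, so treat this only as an illustration:

```yaml
# Sketch only: full (non-incremental) re-import that replaces the destination table.
in:
  type: some_connector     # placeholder integration type
  incremental: false       # fetch every record on each run
out:
  mode: replace            # overwrite the destination table on each run
```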
When using incremental loading, some integrations behave as follows:
If the incremental_columns: [updated_at, id] option is set, the query is as follows:

```sql
SELECT * FROM (...original query is here...) ORDER BY updated_at, id
```

When bulk data loading finishes successfully, the connector outputs the last_record: parameter as a config diff so that the next execution can use it.
At the next execution, when last_record: is set, the connector generates an additional WHERE clause to load only records newer than the last record. For example, if last_record: ["2017-01-01T00:32:12.000000", 5291] is set:

```sql
SELECT * FROM (...original query is here...)
WHERE updated_at > '2017-01-01T00:32:12.000000'
   OR (updated_at = '2017-01-01T00:32:12.000000' AND id > 5291)
ORDER BY updated_at, id
```

The connector then updates last_record: so that the next execution uses the new values.
For other integrations, the following behavior occurs:
```sql
SELECT ... FROM `target_table` WHERE ((`col1` > ?))
```

The bind variable (the ? literal in the query above) holds the maximum value of the target column (col1) from the previous execution; rows with larger values are recognized as new records.
Treasure Data supports incremental loading for Data Extensions that have a date field.
If incremental loading is selected, the integration loads records within the range specified by from_date and fetch_days for the specified date field.
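A sketch of what such a configuration might contain; only from_date and fetch_days come from the description above, while the integration type and target names are placeholders:

```yaml
# Sketch only: incremental load of a Data Extension over a date range.
in:
  type: some_connector             # placeholder integration type
  target: my_data_extension        # placeholder Data Extension name
  incremental: true
  from_date: "2020-11-01"          # start of the date range for the date field
  fetch_days: 7                    # number of days to load starting from from_date
```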
The following list provides details about all available options:
- target — Target object. For example, a Customer or Invoice that accepts a search operation. (string, required)
- email — Email address. (string, required)
- password — Password. (string, required)
- account — Account ID. (string, required)
- role — Role ID. (string, default: nil)
- sandbox — Use sandbox environment if true. (bool, default: false)
- from_datetime — Fetch data after this time. (string, default: nil)
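Putting these options together, a source configuration might look like the following sketch; the integration type name and all credential values are placeholders:

```yaml
# Sketch only: a source configuration using the options listed above.
in:
  type: some_connector                     # placeholder integration type
  target: Customer                         # object that accepts a search operation
  email: integration-user@example.com      # placeholder email
  password: "********"
  account: "1234567"                       # placeholder account ID
  role: "3"                                # optional role ID
  sandbox: false
  from_datetime: "2020-11-01T00:00:00Z"    # fetch data after this time
```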
For example, suppose you set up an incremental job with Start Time = 2020-11-01T01:43:47.900Z and End Time = 2020-12-01T01:43:47.900Z. After the first run, the next start time is calculated based on the precision setting:
- HOURLY: the next start time will be 2020-12-01T02:43:47.900Z
- DAILY: the next start time will be 2020-12-02T01:43:47.900Z
- MONTHLY: the next start time will be 2021-01-01T01:43:47.900Z
In each case, the End Time will be set to the current execution time.