
This is a reference for the primary operators that you can use in Treasure Workflow.

Treasure Workflow is based on Digdag. Most, but not all, Digdag operators can be used as part of Treasure Workflow.

  • td>: Runs a Treasure Data query
  • td_ddl>: Performs Treasure Data operations
  • td_for_each>: Repeats subtasks for each result row of a Treasure Data query
  • td_load>: Bulk loads data into Treasure Data
  • td_partial_delete>: Deletes a time range of records from a Treasure Data table
  • td_result_export>: Exports job results to a specified destination
  • td_run>: Runs a Treasure Data saved query
  • td_table_export>: Exports a Treasure Data table to S3
  • td_wait>: Waits, using a query condition, for data to arrive in a Treasure Data table
  • td_wait_table>: Waits for data to arrive in a Treasure Data table

td>:

The td> operator runs a Hive or Presto query on Treasure Data.

_export:
  td:
    database: www_access

+simple_query:
  td>: queries/simple_query.sql

+create_new_table_using_result_of_select:
  td>: queries/select_sql.sql
  create_table: mytable_${session_date_compact}

+insert_result_of_select_into_a_table:
  td>: queries/select_sql.sql
  insert_into: mytable

+result_with_connection:
  td>: queries/select_sql.sql
  result_connection: connection_created_on_console

+result_with_connection_with_settings:
  td>: queries/select_sql.sql
  result_connection: my_s3_connection
  result_settings:
    bucket: my_bucket
    path: /logs/


Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data queries.

Options

  • td>: FILE.sql

    Path to a query template file. This file can contain ${...} syntax to embed variables.

    Examples:

    td>: queries/step1.sql
  • create_table: NAME

    Name of a table to create from the results. This option deletes the table if it already exists.

    This option adds DROP TABLE IF EXISTS; CREATE TABLE AS (Presto) or INSERT OVERWRITE (Hive) commands before the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the commands are inserted there.

    Examples:

    create_table: my_table

    Note: Depending on the number of rows to be inserted or partitions to be created, a CREATE TABLE operation may take several hours. If too many partitions are affected, the operation might time out and the job might fail.

    If the job fails and the output contains the following error message, you are inserting too many records in one operation. Try using INSERT INTO instead, inserting fewer rows in each step.

    org.plazmadb.metadb.MetadataSQLException: ERROR: canceling statement due to statement timeout 
  • insert_into: NAME

    Name of a table to append results into. The table is created if it does not already exist.

    This option adds an INSERT INTO (Presto) or INSERT INTO TABLE (Hive) command at the beginning of the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the command is inserted there.

    Examples:

    insert_into: my_table

    Note: Depending on the number of rows to be inserted or partitions to be created, an INSERT INTO operation may take several hours. If too many partitions are affected, the operation might time out and the job might fail.

    If the job fails and the output contains the following message, you are inserting too many records in one operation. Try inserting fewer rows in each step.

    org.plazmadb.metadb.MetadataSQLException: ERROR: canceling statement due to statement timeout 
  • download_file: NAME

    Saves the query result as a local CSV file.

    Examples:

    download_file: output.csv
  • store_last_results: BOOLEAN

    Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, use the ${td.last_results.my_count} syntax.

    Examples:

    store_last_results: true
  • preview: BOOLEAN

    Shows a preview of the query results so that you can confirm the output of the query.
    Examples:

    preview: true
  • result_url: NAME
    Outputs the query results to the URL.
    Examples:

    result_url: tableau://username:password@my.tableauserver.com/?mode=replace
  • database: NAME
    Name of a database.
    Examples:

    database: my_db
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

  • engine: presto

    Query engine (Presto or Hive).

    Examples:

    engine: hive
    engine: presto
  • priority: 0

    Sets the priority, from -2 (VERY LOW) to 2 (VERY HIGH); default: 0 (NORMAL).

  • result_connection: NAME

    Use a connection to write the query results to an external system.

    You can create a connection using the TD Console.

    Examples:

    result_connection: my_s3_connection
  • result_settings: MAP

    Add additional settings to the result connection.

    This option is valid only if result_connection option is set.

    Examples:

    result_connection: my_s3_connection
    result_settings:
      bucket: my_s3_bucket
      path: /logs/

    result_connection: my_http
    result_settings:
      path: /endpoint
  • presto_pool_name: NAME

    Name of a resource pool to run the query in. Applicable only when engine is Presto.

    Examples:

    presto_pool_name: poc
  • hive_pool_name: NAME

    Name of a resource pool to run the query in. Applicable only when engine is Hive.

    Examples:

    engine: hive
    hive_pool_name: poc
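As an illustration of the -- DIGDAG_INSERT_LINE marker described under create_table and insert_into, a query file might look like the following sketch (the file name, table, and columns are illustrative):

```sql
-- queries/select_sql.sql
-- Comments above the marker are preserved.
-- DIGDAG_INSERT_LINE
SELECT path, COUNT(*) AS cnt
FROM www_access
GROUP BY path
```

With create_table: my_table, the operator replaces the marker line with the table-creation commands (for example, DROP TABLE IF EXISTS my_table; CREATE TABLE my_table AS ... on Presto) rather than prepending them to the file.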

Output parameters

  • td.last_job_id

    The ID of the job that this task executed.

    Examples:

    52036074

  • td.last_results

    The first row of the query results, as a map of column names to values. Available only when store_last_results: true is set.

    Examples:

    {"path":"/index.html","count":1}
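As a minimal sketch of wiring these output parameters together, assuming a query file that returns a single row such as SELECT COUNT(1) AS my_count FROM www_access (the file name and column name my_count are illustrative):

```yaml
+count_rows:
  td>: queries/count.sql
  store_last_results: true

+report:
  echo>: "job ${td.last_job_id} counted ${td.last_results.my_count} rows"
```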

td_ddl>:

The td_ddl> operator runs operational tasks on Treasure Data, such as creating, dropping, and renaming tables and databases.

_export:
  td:
    database: www_access

+step1:
  td_ddl>:
  create_tables: ["my_table_${session_date_compact}"]
+step2:
  td_ddl>:
  drop_tables: ["my_table_${session_date_compact}"]
+step3:
  td_ddl>:
  empty_tables: ["my_table_${session_date_compact}"]
+step4:
  td_ddl>:
  rename_tables: [{from: "my_table_${session_date_compact}", to: "my_table"}]

To specify a database that is not declared with _export, add the database name under the options. For example:

_export:
  td:
    database: test_db1

+task1:
  td_ddl>:
  create_tables: [test_ddl1, test_ddl2]
  database: test_db2

Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when performing Treasure Data operations.

Options

  • create_tables: [ARRAY OF NAMES]

    Creates new tables if they do not already exist.

    Examples:

    create_tables: [my_table1, my_table2]
  • empty_tables: [ARRAY OF NAMES]

    Creates new empty tables, dropping them first if they already exist.

    Examples:

    empty_tables: [my_table1, my_table2]
  • drop_tables: [ARRAY OF NAMES]

    Drops tables if they exist.

    Examples:

    drop_tables: [my_table1, my_table2]
  • rename_tables: [ARRAY OF {to:, from:}]

    Renames tables (overwrites the destination table if it already exists).

    Examples:

    rename_tables: [{from: my_table1, to: my_table2}]
  • create_databases: [ARRAY OF NAMES]

    Creates new databases if they do not already exist.

    Examples:

    create_databases: [my_database1, my_database2]
  • empty_databases: [ARRAY OF NAMES]

    Creates new empty databases, dropping them first if they already exist.

    Examples:

    empty_databases: [my_database1, my_database2]
  • drop_databases: [ARRAY OF NAMES]

    Drops databases if they exist.

    Examples:

    drop_databases: [my_database1, my_database2]
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

td_for_each>:

The td_for_each> operator runs subtasks once for each result row of a Hive or Presto query on Treasure Data.

Subtasks in the _do section can reference result values using the ${td.each.COLUMN_NAME} syntax, where COLUMN_NAME is the name of a column.

For example, if you run the query select email, name from users and it returns 3 rows, this operator runs the subtasks 3 times with the ${td.each.email} and ${td.each.name} parameters.

_export:
  td:
    database: www_access

+for_each_users:
  td_for_each>: queries/users.sql
  _do:
    +show:
      echo>: found a user ${td.each.name} email ${td.each.email}

Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data queries.

Options

  • td_for_each>: FILE.sql

    Path to a query template file. This file can contain ${...} syntax to embed variables.

    Examples:

    td_for_each>: queries/users.sql
  • database: NAME

    Name of a database.

    Examples:

    database: my_db
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

  • engine: presto

    Query engine (Presto or Hive).

    Examples:

    engine: hive
    engine: presto
  • priority: 0

    Sets the priority, from -2 (VERY LOW) to 2 (VERY HIGH); default: 0 (NORMAL).

  • presto_pool_name: NAME

    Name of a resource pool to run the query in. Applicable only when engine is Presto.

    Examples:

    presto_pool_name: poc
  • hive_pool_name: NAME

    Name of a resource pool to run the query in. Applicable only when engine is Hive.

    Examples:

    engine: hive
    hive_pool_name: poc

Output parameters

  • td.last_job_id

    The ID of the job that this task executed.

    Examples:

    52036074

td_load>:

The td_load> operator bulk loads data into Treasure Data from storage, databases, or services.

You can run only sources that belong to you in Treasure Data. To run sources owned by other Treasure Data users, you must have admin permission.

+step1:
  td_load>: config/guessed.yml
  database: prod
  table: raw


Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when submitting Treasure Data bulk load jobs.

Options

  • td_load>: FILE.yml

    Path to a YAML template file. When using the td command, you must use the “guessed” configuration. If you saved a Data Connector job, you can use its Unique ID instead of the YAML path.

    Examples:

    td_load>: imports/load.yml
  • database: NAME

    Name of the target database to house data.

    Examples:

    database: my_database
  • table: NAME

    Name of the target table to house data.

    Examples:

    table: my_table
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).
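If you saved a Data Connector job in the TD Console, a sketch of referencing it by its Unique ID instead of a YAML file (the ID, database, and table names are illustrative):

```yaml
+load_from_saved_job:
  td_load>: unique_id_of_your_saved_job
  database: prod
  table: raw
```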

Output parameters

  • td.last_job_id

    The ID of the job that this task executed.

    Examples:

    52036074

td_partial_delete>:

The td_partial_delete> operator deletes records from a Treasure Data table.

Be aware that records imported using streaming import can’t be deleted for several hours using td_partial_delete. Records imported by INSERT INTO, data connector, and bulk imports can be deleted immediately.

The time range must be aligned to hour boundaries. Non-zero minutes or seconds values are rejected.

+step1:
  td_partial_delete>: my_table
  database: mydb
  from: 2016-01-01T00:00:00+08:00
  to:   2016-02-01T00:00:00+08:00

Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data queries.

Parameters

  • td_partial_delete>: NAME

    Name of the table.

    Examples:

    td_partial_delete>: my_table
  • database: NAME

    Name of the database.

    Examples:

    database: my_database
  • from: yyyy-MM-ddTHH:mm:ss[Z]

    Delete records from this time (inclusive). The actual time range is [from, to). The value should be a UNIX timestamp integer (seconds) or a string in ISO-8601 (yyyy-MM-ddTHH:mm:ss[Z]) format.

    Examples:

    from: 2016-01-01T00:00:00+08:00
  • to: yyyy-MM-ddTHH:mm:ss[Z]

    Delete records until this time (exclusive). The actual time range is [from, to). The value should be a UNIX timestamp integer (seconds) or a string in ISO-8601 (yyyy-MM-ddTHH:mm:ss[Z]) format.

    Examples:

    to: 2016-02-01T00:00:00+08:00
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

td_result_export>:

The td_result_export> operator exports a job result to an output destination.

_export:
  td:
    database: www_access

+simple_query:
  td>: queries/simple_query.sql

+export_query_result:
  td_result_export>:
  job_id: 12345
  result_connection: my_s3_connection
  result_settings:
    bucket: my_bucket
    path: /logs/

Options

  • job_id: NUMBER

    The ID of the job whose result is exported.

    Examples:

    job_id: 12345

    You can also specify ${td.last_job_id} to use the ID of the last executed job:

    job_id: ${td.last_job_id}
  • result_connection: NAME

    Use a connection to write the query results to an external system.

    You can create a connection using the web console.

    Examples:

    result_connection: my_s3_connection
    
  • result_settings: MAP

    Add additional settings to the result connection.

    Examples:

    result_connection: my_s3_connection
    result_settings:
      bucket: my_s3_bucket
      path: /logs/
    
    result_connection: my_http
    result_settings:
      path: /endpoint
    

    Use secrets to store all sensitive items (e.g., user, password) instead of writing them directly in YAML files.
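As a sketch of chaining the two operators so that the result of the most recent query is exported (the connection name and settings are illustrative):

```yaml
+aggregate:
  td>: queries/simple_query.sql

+export_latest_result:
  td_result_export>:
  job_id: ${td.last_job_id}
  result_connection: my_s3_connection
  result_settings:
    bucket: my_bucket
    path: /logs/
```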

td_run>:

The td_run> operator runs a query saved on Treasure Data.

_export:
  td:
    database: www_access

+step1:
  td_run>: 12345
+step2:
  td_run>: myquery2
  session_time: 2016-01-01T01:01:01+0000


Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data queries.

Options

  • td_run>: SAVED_QUERY_ID or SAVED_QUERY_NAME

    Runs a saved query. If a number is specified, it is treated as the ID of a saved query; otherwise it is treated as the name of a saved query.

    Examples:

    td_run>: 12345

    Examples:

    td_run>: my_query
  • download_file: NAME

    Saves the query result as a local CSV file.

    Examples:

    download_file: output.csv
  • store_last_results: BOOLEAN

    Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, use the ${td.last_results.my_count} syntax.

    Examples:

    store_last_results: true
  • preview: BOOLEAN

    Shows a preview of the query results so that you can confirm the output of the query.

    Examples:

    preview: true
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

Output parameters

  • td.last_job_id

    The ID of the job that this task executed.

    Examples:

    52036074

  • td.last_results

    The first row of the query results, as a map of column names to values. Available only when store_last_results: true is set.

    Examples:

    {"path":"/index.html","count":1}

td_table_export>:

Treasure Data limits export capability to S3 buckets in the us-east region only. In general, use the Result Output to S3 feature of the td> operator instead. A workflow example is available at https://github.com/treasure-data/workflow-examples/tree/master/td/s3.

The td_table_export> operator exports data from Treasure Data to S3.

+step1:
  td_table_export>:
  database: mydb
  table: mytable
  file_format: jsonl.gz
  from: 2016-01-01 00:00:00 +0800
  to:   2016-02-01 00:00:00 +0800
  s3_bucket: my_backup_bucket
  s3_path_prefix: mydb/mytable

Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data table exports.

  • aws.s3.access_key_id: ACCESS_KEY_ID

    The AWS Access Key ID to use when writing to S3.

    Examples:

    aws.s3.access_key_id: ABCDEFGHJKLMNOPQRSTU
  • aws.s3.secret_access_key: SECRET_ACCESS_KEY

    The AWS Secret Access Key to use when writing to S3.

    Examples:

    aws.s3.secret_access_key: QUtJ/QUpJWTQ3UkhZTERNUExTUEEQUtJQUpJWTQ3

Options

  • database: NAME

    Name of the database.

    Examples:

    database: my_database
  • table: NAME

    Name of the table to export.

    Examples:

    table: my_table
  • file_format: TYPE

    Output file format. Available formats are tsv.gz (tab-separated values per line) and jsonl.gz (one JSON record per line).

    json.gz and line-json.gz are available for backward compatibility purposes only.

    Examples:

    file_format: jsonl.gz
  • from: yyyy-MM-dd HH:mm:ss[ Z]

    Export records from this time (inclusive). The actual time range is [from, to). The value should be a UNIX timestamp integer (seconds) or a string in yyyy-MM-dd HH:mm:ss[ Z] format.

    Examples:

    from: 2016-01-01 00:00:00 +0800
  • to: yyyy-MM-dd HH:mm:ss[ Z]

    Export records until this time (exclusive). The actual time range is [from, to). The value should be a UNIX timestamp integer (seconds) or a string in yyyy-MM-dd HH:mm:ss[ Z] format.

    Examples:

    to: 2016-02-01 00:00:00 +0800
  • s3_bucket: NAME

    Specific S3 bucket name to export records to.

    Examples:

    s3_bucket: my_backup_bucket
  • s3_path_prefix: NAME

    S3 file name prefix.

    Examples:

    s3_path_prefix: mytable/mydb
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

Output parameters

  • td.last_job_id

    The ID of the job that this task executed.

    Examples:

    52036074

td_wait>:

The td_wait> operator runs a query periodically until it returns true. It supports more complex conditions than the td_wait_table> operator.

_export:
  td:
    database: www_access

+wait:
  td_wait>: queries/check_recent_record.sql

+step1:
  td>: queries/use_records.sql

Example queries:

select 1 from target_table where TD_TIME_RANGE(time, '${session_time}') limit 1

select count(*) > 1000 from target_table where TD_TIME_RANGE(time, '${last_session_time}')

Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data queries.

Options

  • td_wait>: FILE.sql

    Path to a query template file. This file can contain ${...} syntax to embed variables.

    Examples:

    td_wait>: queries/check_recent_record.sql
  • database: NAME

    Name of a database.

    Examples:

    database: my_db
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enables SSL (https) access to the endpoint (default: true).

  • engine: presto

    Query engine (Presto or Hive).

    Examples:

    engine: hive
    engine: presto
  • priority: 0

    Sets the priority, from -2 (VERY LOW) to 2 (VERY HIGH); default: 0 (NORMAL).

  • presto_pool_name: NAME

    Name of a resource pool to run the queries in. Applicable only when engine is Presto.

    Examples:

    presto_pool_name: poc
  • hive_pool_name: NAME

    Name of a resource pool to run the queries in. Applicable only when engine is Hive.

    Examples:

    engine: hive
    hive_pool_name: poc

Output parameters

  • td.last_job_id

    The ID of the job that this task executed.

    Examples:

    52036074


td_wait_table>:

The td_wait_table> operator checks a table periodically until it contains a certain number of records in a configured range. This is useful for delaying execution of subsequent tasks until records have been imported into a table.

_export:
  td:
    database: www_access

+wait:
  td_wait_table>: target_table

+step1:
  td>: queries/use_records.sql

Secrets

  • td.apikey: API_KEY

    The Treasure Data API key to use when running Treasure Data queries.

Options

  • td_wait_table>: NAME

    Name of the table to wait for.

    Examples:

    td_wait_table>: target_table
  • rows: N

    Number of rows to wait for (default: 0).

    Examples:

    rows: 10
  • database: NAME

    Name of a database.

    Examples:

    database: my_db
  • endpoint: ADDRESS

    API endpoint (default: api.treasuredata.com).

  • use_ssl: BOOLEAN

    Enable SSL (https) to access the endpoint (default: true).

  • engine: presto

    Query engine (Presto or Hive).

    Examples:

    engine: hive
    engine: presto
  • priority: 0

    Sets the priority, from -2 (VERY LOW) to 2 (VERY HIGH); default: 0 (NORMAL).

  • presto_pool_name: NAME

    Name of a resource pool to run the query in. Applicable only when engine is Presto.

    Examples:

    presto_pool_name: poc
  • hive_pool_name: NAME

    Name of a resource pool to run the query in. Applicable only when engine is Hive.

    Examples:

    engine: hive
    hive_pool_name: poc
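
Putting the options together, a sketch of waiting for at least 10 rows before running a downstream query (the table and query file names are illustrative):

```yaml
+wait_for_import:
  td_wait_table>: target_table
  rows: 10

+use_records:
  td>: queries/use_records.sql
```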