Use this page as a reference for the Google Cloud Platform operators that you can use in Treasure Workflow.

TD Workflow is based on Digdag, an open source project created by Treasure Data. Most, but not all, Digdag operators can be used as part of Treasure Workflow.

Workflow Operator     Description

gcs_wait>:            Wait for a file in Google Cloud Storage
bq>:                  Run Google BigQuery queries
bq_ddl>:              Manage Google BigQuery datasets and tables
bq_extract>:          Export data from Google BigQuery
bq_load>:             Import data into Google BigQuery

gcs_wait>:

The gcs_wait> operator can be used to wait for a file to appear in Google Cloud Storage.

+wait:
  gcs_wait>: my_bucket/some/file

+wait:
  gcs_wait>: gs://my_bucket/some/file

Secrets

  • gcp.credential: CREDENTIAL

    See the bq> operator for details.

Options

  • gcs_wait>: URI | BUCKET/OBJECT

    Google Cloud Storage URI or path of the file to wait for.

    Examples:

    gcs_wait>: my-bucket/my-directory/my-data.gz

    gcs_wait>: gs://my-bucket/my-directory/my-data.gz

  • bucket: NAME

    The GCS bucket where the file is located. Can be used together with the object parameter instead of putting the path on the operator command line.

  • object: PATH

    The GCS path of the file. Can be used together with the bucket parameter instead of putting the path on the operator command line.
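
    For example, a minimal sketch (bucket and object names are placeholders) that uses the bucket and object parameters instead of the command-line path:

    +wait:
      gcs_wait>:
      bucket: my_bucket
      object: some/file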

Output Parameters

  • gcs_wait.last_object

    Information about the detected file.

      {
          "metadata": {
              "bucket": "my_bucket",
              "contentType": "text/plain",
              "crc32c": "yV/Pdw==",
              "etag": "CKjJ6/H4988CEAE=",
              "generation": 1477466841081000,
              "id": "my_bucket/some/file",
              "kind": "storage#object",
              "md5Hash": "IT4zYwc3D23HpSGe3nZ85A==",
              "mediaLink": "https://www.googleapis.com/download/storage/v1/b/my_bucket/o/some/file?generation=1477466841081000&alt=media",
              "metageneration": 1,
              "name": "some/file",
              "selfLink": "https://www.googleapis.com/storage/v1/b/my_bucket/o/some/file",
              "size": 4711,
              "storageClass": "STANDARD",
              "timeCreated": {
                  "value": 1477466841070,
                  "dateOnly": false,
                  "timeZoneShift": 0
              },
              "updated": {
                  "value": 1477466841070,
                  "dateOnly": false,
                  "timeZoneShift": 0
              }
          }
      }
    

Note: The gcs_wait> operator polls with exponential backoff, so there might be some delay between a file being created and the gcs_wait> operator detecting it.

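As a rough sketch of using the output parameter, a downstream task can reference fields of gcs_wait.last_object with ${...} syntax; this assumes the metadata structure shown above and uses echo> purely as a stand-in for real processing:

+wait:
  gcs_wait>: gs://my_bucket/some/file

+notify:
  echo>: "detected ${gcs_wait.last_object.metadata.name} (${gcs_wait.last_object.metadata.size} bytes)"
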
bq>:

The bq> operator runs a query on Google BigQuery.

The bq> operator uses standard SQL by default, whereas the default in the BigQuery console is legacy SQL. To run legacy SQL queries, set use_legacy_sql: true. For more information about standard SQL on BigQuery, see Migrating from legacy SQL.

_export:
  bq:
    dataset: my_dataset

+step1:
  bq>: queries/step1.sql
+step2:
  bq>: queries/step2.sql
  destination_table: result_table
+step3:
  bq>: queries/step3.sql
  destination_table: other_project:other_dataset.other_table

Secrets

  • gcp.credential: CREDENTIAL

    The Google Cloud Platform account credential private key to use, in JSON format.

    For information on how to generate a service account key, see the Google Cloud Platform Documentation.

    Upload the private key JSON file to Treasure Data using the secrets client command:

    td workflow secrets --project my_project --set gcp.credential=@my-svc-account-b4df00d.json

Options

  • bq>: query.sql

    Path to a query template file. This file can contain ${...} syntax to embed variables.

    Examples:

    bq>: queries/step1.sql
    
  • dataset: NAME

    Specifies the default dataset to use in the query and in the destination_table parameter.

    Examples:

    dataset: my_dataset
    
    dataset: other_project:other_dataset
    
  • destination_table: NAME

    Specifies a table to store the query results in.

    Examples:

    destination_table: my_result_table
    
    destination_table: some_dataset.some_table
    
    destination_table: some_project:some_dataset.some_table
    

    You can append a date in the form $YYYYMMDD to the end of the table name to store data in a specific partition. See the Creating and Updating Date-Partitioned Tables documentation for details.

    destination_table: some_dataset.some_partitioned_table$20160101
    
  • create_disposition: CREATE_IF_NEEDED | CREATE_NEVER

    Specifies whether the destination table should be automatically created when executing the query.

    • CREATE_IF_NEEDED: (default) The destination table is created if it does not already exist.

    • CREATE_NEVER: The destination table must already exist, otherwise the query will fail.

    Examples:

    create_disposition: CREATE_IF_NEEDED
    
    create_disposition: CREATE_NEVER
    
  • write_disposition: WRITE_TRUNCATE | WRITE_APPEND | WRITE_EMPTY

    Specifies whether to permit writing of data to an already existing destination table.

    • WRITE_TRUNCATE: If the destination table already exists, any data in it will be overwritten.

    • WRITE_APPEND: If the destination table already exists, any data in it will be appended to.

    • WRITE_EMPTY: (default) The query fails if the destination table already exists and is not empty.

    Examples:

    write_disposition: WRITE_TRUNCATE
    
    write_disposition: WRITE_APPEND
    
    write_disposition: WRITE_EMPTY
    
  • priority: INTERACTIVE | BATCH

    Specifies the priority to use for this query. Default: INTERACTIVE.

  • use_query_cache: BOOLEAN

    Whether to use BigQuery query result caching. Default: true.

  • allow_large_results: BOOLEAN

    Whether to allow arbitrarily large result tables. Requires destination_table to be set and use_legacy_sql to be true.

  • flatten_results: BOOLEAN

    Whether to flatten nested and repeated fields in the query results. Default: true. Requires use_legacy_sql to be true.

  • use_legacy_sql: BOOLEAN

    Whether to use legacy BigQuery SQL. Default: false.

  • maximum_billing_tier: INTEGER

    Limit the billing tier for this query. Default: The project default.

  • table_definitions: OBJECT

    Describes external data sources that are accessed in the query. For more information see BigQuery documentation.

  • user_defined_function_resources: LIST

    Describes user-defined function resources used in the query. For more information see BigQuery documentation.
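
As an illustrative sketch, several of the options above can be combined on a single bq> task; the query path, dataset, and table names here are placeholders:

+daily_summary:
  bq>: queries/daily_summary.sql
  dataset: my_dataset
  destination_table: daily_summary
  create_disposition: CREATE_IF_NEEDED
  write_disposition: WRITE_TRUNCATE
  priority: BATCH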

Output Parameters

  • bq.last_job_id

    The id of the BigQuery job that executed this query.
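
    For example, a minimal sketch referencing this parameter from a later task (echo> is used purely as an illustration):

    +query:
      bq>: queries/step1.sql

    +log_job:
      echo>: "BigQuery job id: ${bq.last_job_id}"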

Limitation

This operator supports only the US location.

bq_ddl>:

The bq_ddl> operator can be used to create, delete, and empty Google BigQuery datasets and tables.

_export:
  bq:
    dataset: my_dataset

+prepare:
  bq_ddl>:
  create_datasets:
    - my_dataset_${session_date_compact}
  empty_datasets:
    - my_dataset_${session_date_compact}
  delete_datasets:
    - my_dataset_${last_session_date_compact}
  create_tables:
    - my_table_${session_date_compact}
  empty_tables:
    - my_table_${session_date_compact}
  delete_tables:
    - my_table_${last_session_date_compact}
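
For example, a minimal sketch (dataset, table, and query names are placeholders) that recreates a date-stamped staging dataset and then writes a query result into it:

+prepare:
  bq_ddl>:
  empty_datasets:
    - staging_${session_date_compact}

+load:
  bq>: queries/build_staging.sql
  dataset: staging_${session_date_compact}
  destination_table: events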

Secrets

  • gcp.credential: CREDENTIAL

    See the bq> operator for details.

Options

  • create_datasets: LIST

    Create new datasets.

    For detailed information about dataset configuration parameters, see the Google BigQuery Datasets Documentation.

    Examples:

    create_datasets:
      - foo
      - other_project:bar

    create_datasets:
      - foo_dataset_${session_date_compact}
      - id: bar_dataset_${session_date_compact}
        project: other_project
        friendly_name: Bar dataset ${session_date_compact}
        description: Bar dataset for ${session_date}
        default_table_expiration: 7d
        location: EU
        labels:
          foo: bar
          quux: 17
        access:
          - domain: example.com
            role: READER
          - userByEmail: ingest@example.com
            role: WRITER
          - groupByEmail: administrators@example.com
            role: OWNER

  • empty_datasets: LIST

    Create new datasets, deleting them first if they already exist. Any tables in the datasets will also be deleted.

    For detailed information about dataset configuration parameters, see the Google BigQuery Datasets Documentation.

    Examples:

    empty_datasets:
      - foo
      - other_project:bar

    empty_datasets:
      - foo_dataset_${session_date_compact}
      - id: bar_dataset_${session_date_compact}
        project: other_project
        friendly_name: Bar dataset ${session_date_compact}
        description: Bar dataset for ${session_date}
        default_table_expiration: 7d
        location: EU
        labels:
          foo: bar
          quux: 17
        access:
          - domain: example.com
            role: READER
          - userByEmail: ingest@example.com
            role: WRITER
          - groupByEmail: administrators@example.com
            role: OWNER

  • delete_datasets: LIST

    Delete datasets, if they exist.

    Examples:

    delete_datasets:
      - foo
      - other_project:bar

    delete_datasets:
      - foo_dataset_${last_session_date_compact}
      - other_project:bar_dataset_${last_session_date_compact}

  • create_tables: LIST

    Create new tables.

    For detailed information about table configuration parameters, see the Google BigQuery Tables Documentation.

    Examples:

    create_tables:
      - foo
      - other_dataset.bar
      - other_project:yet_another_dataset.baz

    create_tables:
      - foo_table_${session_date_compact}
      - id: bar_table_${session_date_compact}
        project: other_project
        dataset: other_dataset
        friendly_name: Bar table ${session_date_compact}
        description: Bar table for ${session_date}
        expiration_time: 2016-11-01T01:02:03Z
        schema:
          fields:
            - {name: foo, type: STRING}
            - {name: bar, type: INTEGER}
        labels:
          foo: bar
          quux: 17

  • empty_tables: LIST

    Create new tables, deleting them first if they already exist.

    For detailed information about table configuration parameters, see the Google BigQuery Tables Documentation.

    Examples:

    empty_tables:
      - foo
      - other_dataset.bar
      - other_project:yet_another_dataset.baz

    empty_tables:
      - foo_table_${session_date_compact}
      - id: bar_table_${session_date_compact}
        project: other_project
        dataset: other_dataset
        friendly_name: Bar table ${session_date_compact}
        description: Bar table for ${session_date}
        expiration_time: 2016-11-01T01:02:03Z
        schema:
          fields:
            - {name: foo, type: STRING}
            - {name: bar, type: INTEGER}
        labels:
          foo: bar
          quux: 17

  • delete_tables: LIST

    Delete tables, if they exist.

    Examples:

    delete_tables:
      - foo
      - other_dataset.bar
      - other_project:yet_another_dataset.baz

    delete_tables:
      - foo_table_${last_session_date_compact}
      - bar_table_${last_session_date_compact}

bq_extract>:

The bq_extract> operator can be used to export data from Google BigQuery tables.

_export:
  bq:
    dataset: my_dataset

+process:
  bq>: queries/analyze.sql
  destination_table: result

+export:
  bq_extract>: result
  destination: gs://my_bucket/result.csv.gz
  compression: GZIP

Secrets

  • gcp.credential: CREDENTIAL

    See the bq> operator for details.

Options

  • bq_extract>: TABLE

    A reference to the table that should be exported.

    Examples:

    bq_extract>: my_table

    bq_extract>: my_dataset.my_table

    bq_extract>: my_project:my_dataset.my_table

  • destination: URI | LIST

    A URI or list of URIs with the location of the destination export files. These must be Google Cloud Storage URIs.

    Examples:

    destination: gs://my_bucket/my_export.csv

    destination:
      - gs://my_bucket/my_export_1.csv
      - gs://my_bucket/my_export_2.csv

  • print_header: BOOLEAN

    Whether to print out a header row in the results. Default: true.

  • field_delimiter: CHARACTER

    A delimiter to use between fields in the output. Default: ,.

    Examples:

    field_delimiter: '\t'

  • destination_format: CSV | NEWLINE_DELIMITED_JSON | AVRO

    The format of the destination export file. Default: CSV.

    Examples:

    destination_format: CSV

    destination_format: NEWLINE_DELIMITED_JSON

    destination_format: AVRO

  • compression: GZIP | NONE

    The compression to use for the export file. Default: NONE.

    Examples:

    compression: NONE

    compression: GZIP

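As a sketch (bucket and table names are placeholders), a large table can be exported as multiple compressed shards by putting a wildcard in the destination URI:

+export_shards:
  bq_extract>: my_dataset.big_table
  destination: gs://my_bucket/export/big_table-*.json.gz
  destination_format: NEWLINE_DELIMITED_JSON
  compression: GZIP
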
Output Parameters

  • bq.last_job_id

    The id of the BigQuery job that performed this export.

bq_load>:

The bq_load> operator can be used to import data into Google BigQuery tables.

_export:
  bq:
    dataset: my_dataset

+ingest:
  bq_load>: gs://my_bucket/data.csv
  destination_table: my_data

+process:
  bq>: queries/process.sql
  destination_table: my_result

Secrets

  • gcp.credential: CREDENTIAL

    See the bq> operator for details.

Options

  • bq_load>: URI | LIST

    A URI or list of URIs identifying files in GCS to import.

    Examples:

    bq_load>: gs://my_bucket/data.csv

    bq_load>:
      - gs://my_bucket/data1.csv.gz
      - gs://my_bucket/data2_*.csv.gz

  • dataset: NAME

    The dataset that the destination table is located in or should be created in. Can also be specified directly in the table reference.

    Examples:

    dataset: my_dataset

    dataset: my_project:my_dataset

  • destination_table: NAME

    The table to store the imported data in.

    Examples:

    destination_table: my_result_table

    destination_table: some_dataset.some_table

    destination_table: some_project:some_dataset.some_table

    You can append a date in the form $YYYYMMDD to the end of the table name to store data in a specific partition. See the Creating and Updating Date-Partitioned Tables documentation for details.

    destination_table: some_dataset.some_partitioned_table$20160101

  • project: NAME

    The project that the table is located in or should be created in. Can also be specified directly in the table reference or the dataset parameter.

  • source_format: CSV | NEWLINE_DELIMITED_JSON | AVRO | DATASTORE_BACKUP

    The format of the files to be imported. Default: CSV.

    Examples:

    source_format: CSV

    source_format: NEWLINE_DELIMITED_JSON

    source_format: AVRO

    source_format: DATASTORE_BACKUP

  • field_delimiter: CHARACTER

    The separator used between fields in CSV files to be imported. Default: ,.

    Examples:

    field_delimiter: '\t'

  • create_disposition: CREATE_IF_NEEDED | CREATE_NEVER

    Specifies whether the destination table should be automatically created when performing the import.

    • CREATE_IF_NEEDED: (default) The destination table is created if it does not already exist.

    • CREATE_NEVER: The destination table must already exist, otherwise the import will fail.

    Examples:

    create_disposition: CREATE_IF_NEEDED

    create_disposition: CREATE_NEVER

  • write_disposition: WRITE_TRUNCATE | WRITE_APPEND | WRITE_EMPTY

    Specifies whether to permit importing data to an already existing destination table.

    • WRITE_TRUNCATE: If the destination table already exists, any data in it will be overwritten.

    • WRITE_APPEND: If the destination table already exists, any data in it will be appended to.

    • WRITE_EMPTY: (default) The import fails if the destination table already exists and is not empty.

    Examples:

    write_disposition: WRITE_TRUNCATE

    write_disposition: WRITE_APPEND

    write_disposition: WRITE_EMPTY

  • skip_leading_rows: INTEGER

    The number of leading rows to skip in CSV files to import. Default: 0.

    Examples:

    skip_leading_rows: 1

  • encoding: UTF-8 | ISO-8859-1

    The character encoding of the data in the files to import. Default: UTF-8.

    Examples:

    encoding: ISO-8859-1

  • quote: CHARACTER

    The quote character of the data in the files to import. Default: '"'.

    Examples:

    quote: ''

    quote: "'"

  • max_bad_records: INTEGER

    The maximum number of bad records to ignore before failing the import. Default: 0.

    Examples:

    max_bad_records: 100

  • allow_quoted_newlines: BOOLEAN

    Whether to allow quoted data sections that contain newline characters in a CSV file. Default: false.

  • allow_jagged_rows: BOOLEAN

    Whether to accept rows that are missing trailing optional columns in CSV files. Default: false.

  • ignore_unknown_values: BOOLEAN

    Whether to ignore extra values in data that are not represented in the table schema. Default: false.

  • projection_fields: LIST

    A list of names of Cloud Datastore entity properties to load. Requires source_format: DATASTORE_BACKUP.

  • autodetect: BOOLEAN

    Whether to automatically infer options and schema for CSV and JSON sources. Default: false.

  • schema_update_options: LIST

    A list of destination table schema updates that may be applied automatically during the import.

    Examples:

    schema_update_options:
      - ALLOW_FIELD_ADDITION
      - ALLOW_FIELD_RELAXATION

  • schema: OBJECT | STRING

    A table schema. It accepts an object, or the path to a JSON or YAML file.

    Examples:

    You can write the schema directly in the .dig file:

    +step:
      bq_load>: gs://<bucket>/path/to_file
      ...
      schema:
        - name: "name"
          type: "string"
        ...

    Or you can write it in an external file, either as JSON:

    {
      "fields": [
        {"name": "name", "type": "STRING"},
        ...
      ]
    }

    or as YAML:

    fields:
      - name: "name"
        type: "string"
      ...

    and specify the file path. Supported formats are YAML and JSON. If the path has a .json extension, bq_load> parses the file as JSON; otherwise it parses it as YAML.

    +step:
      bq_load>: gs://<bucket>/path/to_file
      ...
      schema: path/to/schema.json
      # or
      # schema: path/to/schema.yml

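As a combined sketch (bucket, path, dataset, and table names are placeholders), loading gzipped CSV exports that have a header row, with schema autodetection:

+ingest_csv:
  bq_load>: gs://my_bucket/exports/users_*.csv.gz
  dataset: my_dataset
  destination_table: users
  source_format: CSV
  skip_leading_rows: 1
  autodetect: true
  write_disposition: WRITE_APPEND
  max_bad_records: 10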

Output Parameters

  • bq.last_job_id

    The id of the BigQuery job that performed this import.

