
Use this page as a reference for the AWS operators that you can use in Treasure Workflow.

Treasure Workflow is based on Digdag, an open-source project created by Treasure Data. The original Digdag documentation can be found at docs.digdag.io. Most, but not all, Digdag operators can be used as part of Treasure Workflow.


Workflow Operator    Description

s3_wait>:            Wait for a file in Amazon S3
redshift>:           Redshift operations
redshift_load>:      Redshift load operations
redshift_unload>:    Redshift unload operations
emr>:                Amazon Elastic MapReduce (Experimental)


s3_wait>:

The s3_wait> operator waits for a file to appear in Amazon S3.

+wait:
  s3_wait>: my-bucket/my-key

Secrets

  • aws.s3.access_key_id, aws.access_key_id

    The AWS Access Key ID to use when accessing S3.

  • aws.s3.secret_access_key, aws.secret_access_key

    The AWS Secret Access Key to use when accessing S3.

  • aws.s3.region, aws.region

    An optional explicit AWS Region in which to access S3.

  • aws.s3.endpoint

    An optional explicit API endpoint to use when accessing S3. This overrides the region secret.

  • aws.s3.sse_c_key

    An optional Customer-Provided Server-Side Encryption (SSE-C) key to use when accessing S3. Must be Base64 encoded.

  • aws.s3.sse_c_key_algorithm

    An optional Customer-Provided Server-Side Encryption (SSE-C) key algorithm to use when accessing S3.

  • aws.s3.sse_c_key_md5

    An optional MD5 digest of the Customer-Provided Server-Side Encryption (SSE-C) key to use when accessing S3. Must be Base64 encoded.

For more information about SSE-C, see the Amazon S3 documentation.

Options

  • s3_wait>: BUCKET/KEY

    The operator waits for a file (object) to appear in Amazon S3.

    Examples:

    s3_wait>: my-bucket/my-data.gz

    s3_wait>: my-bucket/file/in/a/directory/my-data.csv.gz


    You must specify an object. If you specify only a directory without an object, as in the following, the s3_wait operator does not work as expected and the wait never ends.

    s3_wait>: my-bucket/file/in/a/directory   # the wait never ends


  • region: REGION

    An optional explicit AWS Region in which to access S3. This may also be specified using the aws.s3.region secret.

  • endpoint: ENDPOINT

    An optional explicit API endpoint to use when accessing S3. This may also be specified using the aws.s3.endpoint secret. Note: This overrides the region parameter.

  • bucket: BUCKET

    The S3 bucket where the file is located. Can be used together with the key parameter instead of putting the path on the operator line (see the sketch after this option list).

  • key: KEY

    The S3 key of the file. Can be used together with the bucket parameter instead of putting the path on the operator line.

  • version_id: VERSION_ID

    An optional object version to check for.

  • path_style_access: true/false

    An optional flag to control whether to use path-style or virtual hosted-style access when accessing S3. Note: Enabling path_style_access also requires specifying a region.

  • timeout: DURATION

    Maximum time to wait for the file before the task times out.

    Example: timeout: 120s (wait up to 120 seconds)

  • continue_on_timeout: true/false

    If continue_on_timeout is set to true, the task finishes successfully on timeout. In that case s3.last_object is left empty, so any following task that uses s3.last_object must first check whether it exists, as in the example below.

    +task1:
      s3_wait>: bucket/object
      timeout: 60s
      continue_on_timeout: true
    +task2:
      if>: ${s3.last_object}
      _do:
        echo>: "No timeout"

Output Parameters

  • s3.last_object

    Information about the detected file.

      {
        "metadata": {
          "Accept-Ranges": "bytes",
          "Access-Control-Allow-Origin": "*",
          "Content-Length": 4711,
          "Content-Type": "application/octet-stream",
          "ETag": "5eb63bbbe01eeed093cb22bb8f5acdc3",
          "Last-Modified": 1474360744000,
          "Last-Ranges": "bytes"
        },
        "user_metadata": {
          "foo": "bar",
          "baz": "quux"
        }
      }
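A following task can read fields of s3.last_object with the usual ${...} parameter syntax. A brief sketch (the task name is illustrative, and it assumes the wait task above detected an object):

    +report:
      # Content-Length comes from the metadata object shown above
      echo>: "Detected object of ${s3.last_object.metadata['Content-Length']} bytes"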
    
The s3_wait> operator polls with exponential backoff, so there may be some delay between a file being created and the operator detecting it. The polling interval starts at 5 seconds, increases to 10 seconds, then 20 seconds, and so on, up to a maximum interval of 5 minutes.

redshift>:

The redshift> operator runs queries and/or DDL statements on Redshift.

_export:
  redshift:
    host: my-redshift.1234abcd.us-east-1.redshift.amazonaws.com
    # port: 5439
    database: production_db
    user: app_user
    ssl: true
    schema: myschema
    # strict_transaction: false

+replace_deduplicated_master_table:
  redshift>: queries/dedup_master_table.sql
  create_table: dedup_master

+prepare_summary_table:
  redshift>: queries/create_summary_table_ddl.sql

+insert_to_summary_table:
  redshift>: queries/join_log_with_master.sql
  insert_into: summary_table

+select_members:
  redshift>: select_members.sql
  store_last_results: all

+send_email:
  for_each>:
    member: ${redshift.last_results}
  _do:
    mail>: body.txt
    subject: Hello, ${member.name}!
    to: [${member.email}]

Secrets

  • aws.redshift.password: NAME

    Optional user password to use when connecting to the Redshift database.

Options

  • redshift>: FILE.sql

    Path of the query template file. This file can contain ${...} syntax to embed variables.

    Examples:

    redshift>: queries/complex_queries.sql

  • create_table: NAME

    Table name to create from the results. This option deletes the table if it already exists.

    This option adds DROP TABLE IF EXISTS; CREATE TABLE AS before the statements written in the query template file. Alternatively, a CREATE TABLE statement can be written in the query template file itself instead of using this option.

    Examples:

    create_table: dest_table

  • insert_into: NAME

    Table name to append results into.

    This option adds INSERT INTO before the statements written in the query template file. Alternatively, an INSERT INTO statement can be written in the query template file itself instead of using this option.

    Examples:

    insert_into: dest_table

  • download_file: NAME

    Name of a local CSV file to which the query result is downloaded.

    Examples:

    download_file: output.csv

  • store_last_results: BOOLEAN

    Whether to store the query results in the redshift.last_results parameter. Default: false.

    Setting first stores the first row in the parameter as an object (e.g. ${redshift.last_results.count}).

    Setting all stores all rows in the parameter as an array of objects (e.g. ${redshift.last_results[0].name}). If the number of rows exceeds the limit, the task fails. A usage sketch appears after this option list.

    Examples:

    store_last_results: first

    store_last_results: all

  • database: NAME

    Database name.

    Examples:

    database: my_db

  • host: NAME

    Hostname or IP address of the database.

    Examples:

    host: db.foobar.com

  • port: NUMBER

    Port number to connect to the database. Default: 5439.

    Examples:

    port: 2345

  • user: NAME

    User to connect to the database.

    Examples:

    user: app_user

  • ssl: BOOLEAN

    Enable SSL to connect to the database. Default: false.

    Examples:

    ssl: true

  • schema: NAME

    Default schema name. Default: public.

    Examples:

    schema: my_schema

  • strict_transaction: BOOLEAN

    Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. The operator creates and uses a status table in the database to make the operation idempotent, so if creating tables isn't allowed, set this option to false.

    Examples:

    strict_transaction: false

  • status_table_schema: NAME

    Schema name of status table. Default: same as the value of schema option.

    Examples:

    status_table_schema: writable_schema

  • status_table: NAME

    Table name prefix of status table. Default: __digdag_status.

    Examples:

    status_table: customized_status_table

  • connect_timeout: NUMBER

    The timeout, in seconds, for socket connect operations. If connecting to the server takes longer than this value, the connection is broken. Default: 30.

    Examples:

    connect_timeout: 30

  • socket_timeout: NUMBER

    The timeout, in seconds, for socket read operations. If reading from the server takes longer than this value, the connection is closed. Default: 1800.

    Examples:

    socket_timeout: 1800
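As referenced above, a usage sketch for store_last_results: first. It assumes a hypothetical query file queries/count_members.sql whose result is a single row with a count column:

    +count_members:
      redshift>: queries/count_members.sql
      store_last_results: first

    +report_count:
      # "first" stores a single row object, so fields are accessed directly
      echo>: "member count: ${redshift.last_results.count}"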

redshift_load>:

The redshift_load> operator runs a COPY statement on Redshift to load data from external storage.

_export:
  redshift:
    host: my-redshift.1234abcd.us-east-1.redshift.amazonaws.com
    # port: 5439
    database: production_db
    user: app_user
    ssl: true
    # strict_transaction: false

+load_from_dynamodb_simple:
    redshift_load>:
    schema: myschema
    table: transactions
    from: dynamodb://transaction-table
    readratio: 123

+load_from_s3_with_many_options:
    redshift_load>:
    schema: myschema
    table: access_logs
    from: s3://my-app-bucket/access_logs/today
    manifest: true
    encrypted: true
    region: us-east-1
    csv: "'"
    delimiter: "$"
    # json: s3://my-app-bucket/access_logs/jsonpathfile
    # avro: auto
    # fixedwidth: host:15,code:3,method:15
    gzip: true
    # bzip2: true
    # lzop: true
    acceptanydate: true
    acceptinvchars: "&"
    blanksasnull: true
    dateformat: yyyy-MM-dd
    emptyasnull: true
    encoding: UTF8
    escape: false
    explicit_ids: true
    fillrecord: true
    ignoreblanklines: true
    ignoreheader: 2
    null_as: nULl
    removequotes: false
    roundec: true
    timeformat: YYYY-MM-DD HH:MI:SS
    trimblanks: true
    truncatecolumns: true
    comprows: 12
    compupdate: ON
    maxerror: 34
    # noload: true
    statupdate: false
    role_session_name: federated_user
    session_duration: 1800
    # temp_credentials: false

Secrets

  • aws.redshift.password: NAME

    Optional user password to use when connecting to the Redshift database.

  • aws.redshift_load.access_key_id, aws.redshift.access_key_id, aws.access_key_id

    The AWS Access Key ID to use when accessing the data source. This value is used to get temporary security credentials by default. See the temp_credentials option for details.

  • aws.redshift_load.secret_access_key, aws.redshift.secret_access_key, aws.secret_access_key

    The AWS Secret Access Key to use when accessing the data source. This value is used to get temporary security credentials by default. See the temp_credentials option for details.

  • aws.redshift_load.role_arn, aws.redshift.role_arn, aws.role_arn

    Optional Amazon Resource Name (ARN) of a role used to copy data into Redshift. The role must allow the AssumeRole action. Requires temp_credentials to be true. If this option isn't specified, this operator tries to use a federated user.

Options

  • database: NAME

    Database name.

    Examples:

    database: my_db

  • host: NAME

    Hostname or IP address of the database.

    Examples:

    host: db.foobar.com

  • port: NUMBER

    Port number to connect to the database. Default: 5439.

    Examples:

    port: 2345

  • user: NAME

    User to connect to the database.

    Examples:

    user: app_user

  • ssl: BOOLEAN

    Enable SSL to connect to the database. Default: false.

    Examples:

    ssl: true

  • schema: NAME

    Default schema name. Default: public.

    Examples:

    schema: my_schema

  • strict_transaction: BOOLEAN

    Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. The operator creates and uses a status table in the database to make the operation idempotent, so if creating tables isn't allowed, set this option to false.

    Examples:

    strict_transaction: false

  • status_table_schema: NAME

    Schema name of status table. Default: same as the value of schema option.

    Examples:

    status_table_schema: writable_schema

  • status_table: NAME

    Table name prefix of status table. Default: __digdag_status.

    Examples:

    status_table: customized_status_table

  • table: NAME

    Table name in the Redshift database into which data is loaded.

    Examples:

    table: access_logs

  • from: URI

    Parameter mapped to FROM parameter of Redshift’s COPY statement

    Examples:

    from: s3://my-app-bucket/access_logs/today

  • manifest: BOOLEAN

    Parameter mapped to MANIFEST parameter of Redshift’s COPY statement

    Examples:

    manifest: true

  • encrypted: BOOLEAN

    Parameter mapped to ENCRYPTED parameter of Redshift’s COPY statement

    Examples:

    encrypted: true

  • readratio: NUMBER

    Parameter mapped to READRATIO parameter of Redshift’s COPY statement

    Examples:

    readratio: 150

  • region: NAME

    Parameter mapped to REGION parameter of Redshift’s COPY statement

    Examples:

    region: us-east-1

  • csv: CHARACTER

    Parameter mapped to CSV parameter of Redshift’s COPY statement. To use the default quote character of the CSV parameter, set an empty string: csv: ''

    Examples:

    csv: "'"

  • delimiter: CHARACTER

    Parameter mapped to DELIMITER parameter of Redshift’s COPY statement

    Examples:

    delimiter: "$"

  • json: URI

    Parameter mapped to JSON parameter of Redshift’s COPY statement

    Examples:

    json: auto

    json: s3://my-app-bucket/access_logs/jsonpathfile

  • avro: URI

    Parameter mapped to AVRO parameter of Redshift’s COPY statement

    Examples:

    avro: auto

    avro: s3://my-app-bucket/access_logs/jsonpathfile

  • fixedwidth: CSV

    Parameter mapped to FIXEDWIDTH parameter of Redshift’s COPY statement

    Examples:

    fixedwidth: host:15,code:3,method:15

  • gzip: BOOLEAN

    Parameter mapped to GZIP parameter of Redshift’s COPY statement

    Examples:

    gzip: true

  • bzip2: BOOLEAN

    Parameter mapped to BZIP2 parameter of Redshift’s COPY statement

    Examples:

    bzip2: true

  • lzop: BOOLEAN

    Parameter mapped to LZOP parameter of Redshift’s COPY statement

    Examples:

    lzop: true

  • acceptanydate: BOOLEAN

    Parameter mapped to ACCEPTANYDATE parameter of Redshift’s COPY statement

    Examples:

    acceptanydate: true

  • acceptinvchars: CHARACTER

    Parameter mapped to ACCEPTINVCHARS parameter of Redshift’s COPY statement

    Examples:

    acceptinvchars: "&"

  • blanksasnull: BOOLEAN

    Parameter mapped to BLANKSASNULL parameter of Redshift’s COPY statement

    Examples:

    blanksasnull: true

  • dateformat: STRING

    Parameter mapped to DATEFORMAT parameter of Redshift’s COPY statement

    Examples:

    dateformat: yyyy-MM-dd

  • emptyasnull: BOOLEAN

    Parameter mapped to EMPTYASNULL parameter of Redshift’s COPY statement

    Examples:

    emptyasnull: true

  • encoding: TYPE

    Parameter mapped to ENCODING parameter of Redshift’s COPY statement

    Examples:

    encoding: UTF8

  • escape: BOOLEAN

    Parameter mapped to ESCAPE parameter of Redshift’s COPY statement

    Examples:

    escape: false

  • explicit_ids: BOOLEAN

    Parameter mapped to EXPLICIT_IDS parameter of Redshift’s COPY statement

    Examples:

    explicit_ids: true

  • fillrecord: BOOLEAN

    Parameter mapped to FILLRECORD parameter of Redshift’s COPY statement

    Examples:

    fillrecord: true

  • ignoreblanklines: BOOLEAN

    Parameter mapped to IGNOREBLANKLINES parameter of Redshift’s COPY statement

    Examples:

    ignoreblanklines: true

  • ignoreheader: NUMBER

    Parameter mapped to IGNOREHEADER parameter of Redshift’s COPY statement

    Examples:

    ignoreheader: 2

  • null_as: STRING

    Parameter mapped to NULL AS parameter of Redshift’s COPY statement

    Examples:

    null_as: nULl

  • removequotes: BOOLEAN

    Parameter mapped to REMOVEQUOTES parameter of Redshift’s COPY statement

    Examples:

    removequotes: false

  • roundec: BOOLEAN

    Parameter mapped to ROUNDEC parameter of Redshift’s COPY statement

    Examples:

    roundec: true

  • timeformat: STRING

    Parameter mapped to TIMEFORMAT parameter of Redshift’s COPY statement

    Examples:

    timeformat: YYYY-MM-DD HH:MI:SS

  • trimblanks: BOOLEAN

    Parameter mapped to TRIMBLANKS parameter of Redshift’s COPY statement

    Examples:

    trimblanks: true

  • truncatecolumns: BOOLEAN

    Parameter mapped to TRUNCATECOLUMNS parameter of Redshift’s COPY statement

    Examples:

    truncatecolumns: true

  • comprows: NUMBER

    Parameter mapped to COMPROWS parameter of Redshift’s COPY statement

    Examples:

    comprows: 12

  • compupdate: TYPE

    Parameter mapped to COMPUPDATE parameter of Redshift’s COPY statement

    Examples:

    compupdate: ON

  • maxerror: NUMBER

    Parameter mapped to MAXERROR parameter of Redshift’s COPY statement

    Examples:

    maxerror: 34

  • noload: BOOLEAN

    Parameter mapped to NOLOAD parameter of Redshift’s COPY statement

    Examples:

    noload: true

  • statupdate: TYPE

    Parameter mapped to STATUPDATE parameter of Redshift’s COPY statement

    Examples:

    statupdate: off

  • temp_credentials: BOOLEAN

    Whether this operator uses temporary security credentials. Default: true. This operator tries to use temporary security credentials as follows:

    • If role_arn is specified, it calls AssumeRole action

    • If not, it calls GetFederationToken action

    See the AWS Security Token Service documentation for details about AssumeRole and GetFederationToken.

    By default, either the AssumeRole or the GetFederationToken action is called to obtain temporary security credentials for secure operation. If this option is disabled, the operator instead uses the credentials set in the secrets as-is, rather than temporary security credentials.

    Examples:

    temp_credentials: false

  • session_duration: INTEGER

    Session duration of temporary security credentials. Default: 3 hours. This option isn't used when temp_credentials is disabled.

    Examples:

    session_duration: 1800
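Putting a few of the options above together, a minimal sketch of loading a gzip-compressed CSV file from S3 (the bucket, path, and table names are illustrative):

+load_access_logs:
    redshift_load>:
    schema: myschema
    table: access_logs
    from: s3://my-app-bucket/access_logs/today
    csv: ''        # empty string uses the default quote character
    gzip: true
    region: us-east-1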

redshift_unload>:

The redshift_unload> operator runs an UNLOAD statement on Redshift to export data to external storage.

_export:
  redshift:
    host: my-redshift.1234abcd.us-east-1.redshift.amazonaws.com
    # port: 5439
    database: production_db
    user: app_user
    ssl: true
    schema: myschema
    # strict_transaction: false

+unload_to_s3_with_many_options:
    redshift_unload>:
    query: select * from access_logs
    to: s3://my-app-bucket/access_logs/today
    manifest: true
    encrypted: true
    delimiter: "$"
    # fixedwidth: host:15,code:3,method:15
    gzip: true
    # bzip2: true
    null_as: nULl
    escape: false
    addquotes: true
    parallel: ON

Secrets

  • aws.redshift.password: NAME

    Optional user password to use when connecting to the Redshift database.

  • aws.redshift_unload.access_key_id, aws.redshift.access_key_id, aws.access_key_id

    The AWS Access Key ID to use when accessing the data destination. This value is used to get temporary security credentials by default. See the temp_credentials option for details.

  • aws.redshift_unload.secret_access_key, aws.redshift.secret_access_key, aws.secret_access_key

    The AWS Secret Access Key to use when accessing the data destination. This value is used to get temporary security credentials by default. See the temp_credentials option for details.

  • aws.redshift_unload.role_arn, aws.redshift.role_arn, aws.role_arn

    Optional Amazon Resource Name (ARN) of a role used when unloading data from Redshift. The role must allow the AssumeRole action. Requires temp_credentials to be true. If this option isn't specified, this operator tries to use a federated user.

Options

  • database: NAME

    Database name.

    Examples:

    database: my_db

  • host: NAME

    Hostname or IP address of the database.

    Examples:

    host: db.foobar.com

  • port: NUMBER

    Port number to connect to the database. Default: 5439.

    Examples:

    port: 2345

  • user: NAME

    User to connect to the database.

    Examples:

    user: app_user

  • ssl: BOOLEAN

    Enable SSL to connect to the database. Default: false.

    Examples:

    ssl: true

  • schema: NAME

    Default schema name. Default: public.

    Examples:

    schema: my_schema

  • strict_transaction: BOOLEAN

    Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. The operator creates and uses a status table in the database to make the operation idempotent, so if creating tables isn't allowed, set this option to false.

    Examples:

    strict_transaction: false

  • status_table_schema: NAME

    Schema name of status table. Default: same as the value of schema option.

    Examples:

    status_table_schema: writable_schema

  • status_table: NAME

    Table name prefix of status table. Default: __digdag_status.

    Examples:

    status_table: customized_status_table

  • query: STRING

    SELECT query. The results of the query are unloaded.

    Examples:

    query: select * from access_logs

  • to: URI

    Parameter mapped to TO parameter of Redshift’s UNLOAD statement

    Examples:

    to: s3://my-app-bucket/access_logs/today

  • manifest: BOOLEAN

    Parameter mapped to MANIFEST parameter of Redshift’s UNLOAD statement

    Examples:

    manifest: true

  • encrypted: BOOLEAN

    Parameter mapped to ENCRYPTED parameter of Redshift’s UNLOAD statement

    Examples:

    encrypted: true

  • allowoverwrite: BOOLEAN

    Parameter mapped to ALLOWOVERWRITE parameter of Redshift’s UNLOAD statement

    Examples:

    allowoverwrite: true

  • delimiter: CHARACTER

    Parameter mapped to DELIMITER parameter of Redshift’s UNLOAD statement

    Examples:

    delimiter: "$"

  • fixedwidth: CSV

    Parameter mapped to FIXEDWIDTH parameter of Redshift’s UNLOAD statement

    Examples:

    fixedwidth: host:15,code:3,method:15

  • gzip: BOOLEAN

    Parameter mapped to GZIP parameter of Redshift’s UNLOAD statement

    Examples:

    gzip: true

  • bzip2: BOOLEAN

    Parameter mapped to BZIP2 parameter of Redshift’s UNLOAD statement

    Examples:

    bzip2: true

  • null_as: STRING

    Parameter mapped to NULL_AS parameter of Redshift’s UNLOAD statement

    Examples:

    null_as: nuLL

  • escape: BOOLEAN

    Parameter mapped to ESCAPE parameter of Redshift’s UNLOAD statement

    Examples:

    escape: true

  • addquotes: BOOLEAN

    Parameter mapped to ADDQUOTES parameter of Redshift’s UNLOAD statement

    Examples:

    addquotes: true

  • parallel: TYPE

    Parameter mapped to PARALLEL parameter of Redshift’s UNLOAD statement

    Examples:

    parallel: ON

  • temp_credentials: BOOLEAN

    Whether this operator uses temporary security credentials. Default: true. This operator tries to use temporary security credentials as follows:

    • If role_arn is specified, it calls AssumeRole action

    • If not, it calls GetFederationToken action

    See the AWS Security Token Service documentation for details about AssumeRole and GetFederationToken.

    By default, either the AssumeRole or the GetFederationToken action is called to obtain temporary security credentials for secure operation. If this option is disabled, the operator instead uses the credentials set in the secrets as-is, rather than temporary security credentials.

    Examples:

    temp_credentials: false

  • session_duration: INTEGER

    Session duration of temporary security credentials. Default: 3 hours. This option isn't used when temp_credentials is disabled.

    Examples:

    session_duration: 1800
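Similarly, a minimal sketch of unloading query results to gzip-compressed files in S3 (the query and destination are illustrative; ${session_date} is a built-in workflow variable):

+unload_access_logs:
    redshift_unload>:
    query: select * from access_logs
    to: s3://my-app-bucket/unload/${session_date}/
    gzip: true
    allowoverwrite: true
    parallel: ON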

emr>:

The emr> operator can be used to run EMR jobs, create clusters, and submit steps to existing clusters.

This operator is experimental. If it is deprecated, it will likely be replaced by a supported service for running scripts.

Due to the nature of this operator, our support team cannot assist with debugging issues in the Amazon EMR service itself. Contact Amazon Web Services for help using EMR.

For detailed information about EMR, see the Amazon Elastic MapReduce Documentation.

+emr_job:
  emr>:
  cluster:
    name: my-cluster
    ec2:
      key: my-ec2-key
      master:
        type: m3.2xlarge
      core:
        type: m3.xlarge
        count: 10
    logs: s3://my-bucket/logs/
  staging: s3://my-bucket/staging/
  steps:
    - type: spark
      application: pi.py
    - type: spark-sql
      query: queries/query.sql
      result: s3://my-bucket/results/${session_uuid}/
    - type: script
      script: scripts/hello.sh
      args: [hello, world]

Secrets

  • aws.emr.access_key_id, aws.access_key_id

    The AWS Access Key ID to use when submitting EMR jobs.

  • aws.emr.secret_access_key, aws.secret_access_key

    The AWS Secret Access Key to use when submitting EMR jobs.

  • aws.emr.role_arn, aws.role_arn

    The AWS Role to assume when submitting EMR jobs.

  • aws.emr.region, aws.region

    The AWS region to use for EMR service.

  • aws.emr.endpoint

    The AWS EMR endpoint address to use.

  • aws.s3.region, aws.region

    The AWS region to use for S3 service to store staging files.

  • aws.s3.endpoint

    The AWS S3 endpoint address to use for staging files.

  • aws.kms.region, aws.region

    The AWS region to use for KMS service to encrypt variables passed to EMR jobs.

  • aws.kms.endpoint

    The AWS KMS endpoint address to use for EMR variable encryption.

Options

  • cluster: STRING | OBJECT

    Specifies either the ID of an existing cluster to submit steps to or the configuration of a new cluster to create. (A combined sketch appears after this option list.)

    Using an existing cluster:

    cluster: j-7KHU3VCWGNAFL
    

    Creating a new minimal ephemeral cluster with just one node:

    cluster:
      ec2:
        key: my-ec2-key
      logs: s3://my-bucket/logs/
    

    Creating a customized cluster with several hosts:

    cluster:
      name: my-cluster
      auto_terminate: false
      release: emr-5.2.0
      applications:
        - hadoop
        - spark
        - hue
        - zookeeper
      ec2:
        key: my-ec2-key
        subnet_id: subnet-83047402b
        master:
          type: m4.2xlarge
        core:
          type: m4.xlarge
          count: 10
          ebs:
            optimized: true
            devices:
              volume_specification:
                iops: 10000
                size_in_gb: 1000
                type: gp2
              volumes_per_instance: 6
        task:
          - type: c4.4xlarge
            count: 20
          - type: g2.2xlarge
            count: 6
      logs: s3://my-bucket/logs/
      bootstrap:
        - install_foo.sh
        - name: Install Bar
          path: install_bar.sh
          args: [baz, quux]
    
  • staging: S3_URI

    An S3 folder to use for staging local files for execution on the EMR cluster. Note: the configured AWS credentials must have permission to put and get objects in this folder.

    Examples:

    staging: s3://my-bucket/staging/

  • emr.region

    The AWS region to use for EMR service.

  • emr.endpoint

    The AWS EMR endpoint address to use.

  • s3.region

    The AWS region to use for S3 service to store staging files.

  • s3.endpoint

    The AWS S3 endpoint address to use for staging files.

  • kms.region

    The AWS region to use for KMS service to encrypt variables passed to EMR jobs.

  • kms.endpoint

    The AWS KMS endpoint address to use for EMR variable encryption.

  • steps: LIST

    A list of steps to submit to the EMR cluster.

    steps:

    - type: flink
      application: flink/WordCount.jar
    
    - type: hive
      script: queries/hive-query.q
      vars:
        INPUT: s3://my-bucket/data/
        OUTPUT: s3://my-bucket/output/
      hiveconf:
        hive.support.sql11.reserved.keywords: false
    
    - type: spark
      application: spark/pi.scala
    
    - type: spark
      application: s3://my-bucket/spark/hello.py
      args: [foo, bar]
    
    - type: spark
      application: spark/hello.jar
      class: com.example.Hello
      jars:
        - libhello.jar
        - s3://td-spark/td-spark-assembly-0.1.jar
      conf:
        spark.locality.wait: 5s
        spark.memory.fraction: 0.5
      args: [foo, bar]
    
    - type: spark-sql
      query: spark/query.sql
      result: s3://my-bucket/results/${session_uuid}/
    
    - type: script
      script: s3://my-bucket/scripts/hello.sh
      args: [hello, world]
    
    - type: script
      script: scripts/hello.sh
      args: [world]
    
    - type: command
      command: echo
      args: [hello, world]
    
  • action_on_failure: TERMINATE_JOB_FLOW | TERMINATE_CLUSTER | CANCEL_AND_WAIT | CONTINUE

    The action EMR should take in response to a job step failing.
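As referenced in the cluster option above, a sketch of submitting steps to an existing cluster instead of creating one (the cluster ID and paths are illustrative):

    +submit_steps:
      emr>:
      cluster: j-7KHU3VCWGNAFL
      staging: s3://my-bucket/staging/
      steps:
        - type: script
          script: scripts/hello.sh
          args: [hello, world]
      # keep the cluster running even if a step fails
      action_on_failure: CONTINUE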

Output Parameters

  • emr.last_cluster_id

    The ID of the cluster created. If a pre-existing cluster was used, this parameter will not be set.
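    For example, a later task could log the ID of a newly created cluster; a minimal sketch:

    +show_cluster_id:
      echo>: "EMR cluster: ${emr.last_cluster_id}"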
