Here are some best practices for managing your data with Treasure Data Workflows.

Using Intermediate Data Tables


Using an intermediate table is the best way to solve query performance problems. More than 85% of the Presto queries on Treasure Data are scheduled queries issued against data that grows daily. By using incremental queries that build intermediate tables, you can accelerate data analysis. When raw data requires heavy processing, it is more efficient to process only the newly added data and append the results to an existing intermediate table, rather than reprocessing all of the raw data every time.

Scenario: Get the number of monthly unique users by country

1. Create an intermediate table to serve as the basis for processing existing data.
2. Create and schedule a query to process new data added since the last query was run. Append the results to an intermediate table.

Example: Change IP addresses of site visitors to country names daily and append to an intermediate table.

3. Issue a query to the intermediate table to aggregate data.

Example: Aggregate the number of monthly users from the intermediate table.

Sample files

initial_task.dig

Create the initial intermediate table from the existing data.

_export:
  td:
    database: db_name

+initial_load:
  td>: queries/initial.sql
  create_table: pageviews_intermediate
  engine: hive
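
The file queries/initial.sql is not shown here; the following is a minimal sketch of what it might contain, assuming a raw table named pageviews with time, user_id, and ip columns, and the TD_IP_TO_COUNTRY_NAME UDF for the IP-to-country conversion.

queries/initial.sql

-- One-time conversion of all existing raw rows into the intermediate table.
SELECT
  time,
  user_id,
  TD_IP_TO_COUNTRY_NAME(ip) AS country
FROM
  pageviews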

daily_task.dig

Run this workflow daily to update the intermediate table with the data added since the last run, and then run the aggregation query against it.

timezone: UTC

_export:
  td:
    database: db_name

schedule:
  daily>: 01:00:00

+update_intermediate_table:
  td>: queries/intermediate.sql
  insert_into: pageviews_intermediate
  engine: hive

+analytics:
  td>: queries/analytics.sql
  engine: hive 
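
The two query files referenced above are not shown here. The following are minimal sketches under the same assumptions as initial.sql (a raw pageviews table and the TD_IP_TO_COUNTRY_NAME UDF).

queries/intermediate.sql

-- Convert only the rows added during the previous day; TD_TIME_RANGE with
-- TD_SCHEDULED_TIME() limits the scan to new data.
SELECT
  time,
  user_id,
  TD_IP_TO_COUNTRY_NAME(ip) AS country
FROM
  pageviews
WHERE
  TD_TIME_RANGE(time,
                TD_TIME_ADD(TD_SCHEDULED_TIME(), '-1d'),
                TD_SCHEDULED_TIME())

queries/analytics.sql

-- Aggregate monthly unique users per country from the intermediate table.
SELECT
  TD_TIME_FORMAT(time, 'yyyy-MM', 'UTC') AS month,
  country,
  COUNT(DISTINCT user_id) AS unique_users
FROM
  pageviews_intermediate
GROUP BY
  TD_TIME_FORMAT(time, 'yyyy-MM', 'UTC'),
  country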


Using Sequential Processing

When scanning a table with many records, querying the entire data set at once consumes time and resources, and the query can fail if it exceeds resource limits. By dividing a query into smaller units, you reduce resource consumption and response time. Treasure Workflow is convenient for running queries that have been divided into smaller units.

Scenario: Create user ranking for the number of monthly accesses

1. Empty the aggregate table.
2. Add each day's top 10 users to the aggregate table, running the query once for each day in the month.
3. Run a query against the aggregate table to get the ranking of the monthly top 10 users.


Sample file

timezone: UTC

schedule:
  monthly>: 1,01:00:00

_export:
  td:
    database: db_name
  month: ${moment(session_date).add(-1, 'days').format("YYYY-MM")}
  days: ${moment(session_date).add(-1, 'days').format("DD")}

+clear_table:
  td_ddl>:
  empty_tables: ["aggregate_table"]

+looping:
  loop>: ${days}
  _do:
    +ranking_of_the_day:
      td>: queries/ranking_of_the_day.sql
      insert_into: aggregate_table 
  
+create_table:
  td_ddl>:
  create_table: ["ranking_of_the_month"]

+ranking_of_the_month:
  td>: queries/ranking_of_the_month.sql
  insert_into: ranking_of_the_month
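
The two ranking queries are not shown here. The following are minimal sketches, assuming an access log table named access_log with a user_id column and an aggregate_table with user_id and access_count columns. The loop> operator exposes a zero-based ${i} variable, so iteration ${i} corresponds to day ${i} + 1 of ${month}.

queries/ranking_of_the_day.sql

-- Top 10 users for a single day of the previous month.
SELECT
  user_id,
  COUNT(1) AS access_count
FROM
  access_log
WHERE
  TD_TIME_RANGE(time,
                '${moment(month + "-01").add(i, "days").format("YYYY-MM-DD")}',
                '${moment(month + "-01").add(i + 1, "days").format("YYYY-MM-DD")}')
GROUP BY
  user_id
ORDER BY
  access_count DESC
LIMIT 10

queries/ranking_of_the_month.sql

-- Combine the daily top 10 rows into a monthly top 10 ranking.
SELECT
  user_id,
  SUM(access_count) AS monthly_access_count
FROM
  aggregate_table
GROUP BY
  user_id
ORDER BY
  monthly_access_count DESC
LIMIT 10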


Using Custom Email Notification for Error Handling

By default, error notifications are sent by email to the workflow owner. Using this scenario, you can customize the workflow error notifications.

Scenario: Send error notifications to someone other than the workflow owner

1. Use an incorrect query to cause the workflow to fail.
2. Set the email body, title, and recipient with the _error task.
3. Run the workflow and check to see if the error notification email is received.

Sample files

custom_notifications.dig

timezone: UTC

_export:
  td:
    database: sample_datasets
    engine: presto
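
# Hypothetical failing task (not part of the original sample): assumes a file
# queries/bad_query.sql containing an intentionally invalid query, per step 1.
+cause_error:
  td>: queries/bad_query.sql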

_error:
  mail>: body.txt
  subject: "[ERROR] Workflow failed!"
  to: [me@example.com]
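
Step 1 calls for an incorrect query that makes the workflow fail. A minimal hypothetical sketch (the file name queries/bad_query.sql is an assumption), selecting a column that does not exist in the sample_datasets table www_access:

queries/bad_query.sql

-- Fails because the column does not exist, which triggers the _error task.
SELECT no_such_column FROM www_access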

body.txt

Hi {to},

The workflow session ${session_id} failed due to the following error:

${error.message}

Validating Your Data

You can compare the results of different queries to validate your data.

Scenario: Compare results of different queries

To compare query results, you need to divide the tasks.

1. Run a query with the store_last_results parameter.
2. Use the for_each> operator on the saved query results.
3. Run the workflow and check whether the query results match.

With the store_last_results parameter, the first row of the query results is stored in the ${td.last_results} variable. However, if a later td> operator also uses store_last_results, the variable is overwritten and the earlier values can no longer be used.

Sample file

_export:
  td:
    database: db_name

+step1:
  td>: queries/step1.sql
  store_last_results: true
  engine: presto

+group:
  for_each>:
    result1: ["${td.last_results.cnt}"]

  _do:
    +step2:
      td>: queries/step2.sql
      store_last_results: true
      engine: presto

    +step3:
      if>: ${result1 == td.last_results.cnt}
      _do:
        echo>: Same count!    
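
The step queries are not shown here. For ${td.last_results.cnt} to resolve, each query must return a single row with a column named cnt. Minimal sketches, assuming two hypothetical tables table_a and table_b whose row counts are expected to match:

queries/step1.sql

SELECT COUNT(1) AS cnt FROM table_a

queries/step2.sql

SELECT COUNT(1) AS cnt FROM table_b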

