Skip to content
Last updated

TD Workflowのベストプラクティス

Treasure Data Workflowsでデータを管理するためのベストプラクティスをご紹介します。

中間データテーブルの使用

中間テーブルの使用は、クエリパフォーマンスの問題を解決する最良の方法です。Treasure Data上のPrestoクエリの85%以上は、日々増加するデータに対して発行されるスケジュールクエリです。中間テーブルを作成するインクリメンタルクエリを使用することで、データの分析を加速できます。生データの重い処理の場合、毎回すべての生データを処理する代わりに、新しく追加されたデータのみを中間テーブルに処理し、その結果を既存のテーブルに追加する方が効率的です。

シナリオ:国別の月間ユニークユーザー数を取得する

  1. 既存データを処理するための基盤となる中間テーブルを作成します。

  2. 前回のクエリ実行以降に追加された新しいデータを処理するクエリを作成してスケジュールします。結果を中間テーブルに追加します。

例:サイト訪問者のIPアドレスを毎日国名に変換し、中間テーブルに追加します。

  1. 中間テーブルにクエリを発行してデータを集計します。

例:中間テーブルから月間ユーザー数を集計します。

サンプルファイル

initial_task.dig

データベースを作成します。

_export:
  td:
    database: db_name

+initial_load:
  td>: queries?initial.sql
  create_table: Pageviews_intermediate
  engine: hive

daily_task.dig

スクリプトを毎日実行して、前回スクリプトが実行されてから新しいデータで中間テーブルを更新し、集計テーブルに追加します。

timezone: UTC

_export:
  td:
    database: db_name

schedule:
  daily>: 01:00:00

+update_intermediate_table:
  td>: queries/intermediate.sql
  insert_into: pageviews_intermediate
  enginse: hive

+analytics
  td>: queries/analytics.sql
  engine: hive

シーケンシャル処理の使用

多くのレコードを持つテーブルをスキャンする場合、すべてのデータセットにクエリを発行すると時間とリソースを消費し、リソースの制限を超えるとクエリが失敗する可能性があります。クエリを小さな単位に分割することで、リソース消費を削減できます。Treasure Workflowは、より小さな単位に分割されたクエリの処理に便利です。

シナリオ:月間アクセス数のユーザーランキングを作成する

  1. 集計テーブルを削除します。

  2. 日次トップ10ユーザーを集計テーブルに追加します。月の日数に応じてクエリを実行します。

  3. 集計テーブルから月間トップ10ユーザーのランキングを取得します。クエリを実行します。

timezone: UTC

schedule:
  monthly>: 1,01:00:00

 _export:
  td:
    database: db_name
  month: ${moment(session_date).add(-1, 'days).format("YYY-MM")}
  days: ${moment(session_date).add(-1, 'days).format("DD")}

+clear_table:
  td_ddl>:
  empty_tables: ["aggregate_table"]

+looping:
  loop>: ${days}
  _do:
    +ranking_of_the_day:
      td>: /queries/ranking_of_the_day.sql
      insert_into: aggregate_table

+create_table:
  td_ddl>:
  create_table: ["ranking_of_the_month"]

+ranking_of_the_month:
  td>: queries/ranking_of_the_month.sql
  insert_tab;e: ranking_of_the_month

エラー処理のためのカスタムメール通知の使用

デフォルトでは、エラー通知はワークフローオーナーにメールで送信されます。このシナリオを使用して、ワークフローエラー通知をカスタマイズできます。

シナリオ:ワークフローオーナー以外の人にエラー通知を送信する

  1. 不正なクエリを使用してワークフローを失敗させます。

  2. _errorタスクでメール本文、タイトル、受信者を設定します。

  3. ワークフローを実行し、エラー通知メールが受信されるかどうかを確認します。

custom_notifications.dig

timezone: UTC schedule:

_export:
  td:
    database: sample_datasets
    engine: presto

_error:
  mail>: body.txt
  subject: "[ERROR] Workflow failed!"
  to: [me@example.com]

month: ${moment(session_date).add(-1, 'days).format("YYY-MM")}
days: ${moment(session_date).add(-1, 'days).format("DD")}

body.txt

Hi {to},

The workflow session ${session_id} failed due to the following error:

${error.message}

データの検証

異なるクエリの結果を比較してデータを検証できます。

シナリオ:異なるクエリの結果を比較する

クエリ結果を比較するには、タスクを分割する必要があります。

  1. store_last_resultsパラメータを使用してクエリを実行します。

  2. 保存されたクエリ結果にfor_each>:オペレーターを使用します。

  3. ワークフローを実行し、エラー通知が受信されるかどうかを確認します。

store_last_resultsパラメータの場合、クエリ結果の最初の行を${td.results}変数に保存できます。ただし、後のtd>:オペレーターがstore_last_resultsパラメータを使用する場合、この変数は使用できません。

サンプルファイル

_export:
  td:
    database: db_name

+step1:
  td>: queries/step1.sql
  store_last_results: true
  engine: presto

+group:
  for_each>:
    result1: ["${td.last_results.cnt}"]

  _do:
    +step2:
      td>: queries/step2.sql
      store_last_results: true
      engine: presto

    +step3:
      if>: ${results1 == td.last.results.cnt}
      _do:
        echo>: Same count!
  subject: "[ERROR] Workflow failed!"
  to: [me@example.com]