Treasure Data Workflowsでデータを管理するためのベストプラクティスをご紹介します。
中間テーブルの使用は、クエリパフォーマンスの問題を解決する最良の方法です。Treasure Data上のPrestoクエリの85%以上は、日々増加するデータに対して発行されるスケジュールクエリです。中間テーブルを作成するインクリメンタルクエリを使用することで、データの分析を加速できます。生データの重い処理の場合、毎回すべての生データを処理する代わりに、新しく追加されたデータのみを中間テーブルに処理し、その結果を既存のテーブルに追加する方が効率的です。
既存データを処理するための基盤となる中間テーブルを作成します。
前回のクエリ実行以降に追加された新しいデータを処理するクエリを作成してスケジュールします。結果を中間テーブルに追加します。
例:サイト訪問者のIPアドレスを毎日国名に変換し、中間テーブルに追加します。
- 中間テーブルにクエリを発行してデータを集計します。
例:中間テーブルから月間ユーザー数を集計します。
データベースを作成します。
_export:
td:
database: db_name
+initial_load:
td>: queries?initial.sql
create_table: Pageviews_intermediate
engine: hiveスクリプトを毎日実行して、前回スクリプトが実行されてから新しいデータで中間テーブルを更新し、集計テーブルに追加します。
timezone: UTC
_export:
td:
database: db_name
schedule:
daily>: 01:00:00
+update_intermediate_table:
td>: queries/intermediate.sql
insert_into: pageviews_intermediate
enginse: hive
+analytics
td>: queries/analytics.sql
engine: hive多くのレコードを持つテーブルをスキャンする場合、すべてのデータセットにクエリを発行すると時間とリソースを消費し、リソースの制限を超えるとクエリが失敗する可能性があります。クエリを小さな単位に分割することで、リソース消費を削減できます。Treasure Workflowは、より小さな単位に分割されたクエリの処理に便利です。
集計テーブルを削除します。
日次トップ10ユーザーを集計テーブルに追加します。月の日数に応じてクエリを実行します。
集計テーブルから月間トップ10ユーザーのランキングを取得します。クエリを実行します。
timezone: UTC
schedule:
monthly>: 1,01:00:00
_export:
td:
database: db_name
month: ${moment(session_date).add(-1, 'days).format("YYY-MM")}
days: ${moment(session_date).add(-1, 'days).format("DD")}
+clear_table:
td_ddl>:
empty_tables: ["aggregate_table"]
+looping:
loop>: ${days}
_do:
+ranking_of_the_day:
td>: /queries/ranking_of_the_day.sql
insert_into: aggregate_table
+create_table:
td_ddl>:
create_table: ["ranking_of_the_month"]
+ranking_of_the_month:
td>: queries/ranking_of_the_month.sql
insert_tab;e: ranking_of_the_monthデフォルトでは、エラー通知はワークフローオーナーにメールで送信されます。このシナリオを使用して、ワークフローエラー通知をカスタマイズできます。
不正なクエリを使用してワークフローを失敗させます。
_errorタスクでメール本文、タイトル、受信者を設定します。
ワークフローを実行し、エラー通知メールが受信されるかどうかを確認します。
timezone: UTC schedule:
_export:
td:
database: sample_datasets
engine: presto
_error:
mail>: body.txt
subject: "[ERROR] Workflow failed!"
to: [me@example.com]
month: ${moment(session_date).add(-1, 'days).format("YYY-MM")}
days: ${moment(session_date).add(-1, 'days).format("DD")}Hi {to},
The workflow session ${session_id} failed due to the following error:
${error.message}異なるクエリの結果を比較してデータを検証できます。
クエリ結果を比較するには、タスクを分割する必要があります。
store_last_resultsパラメータを使用してクエリを実行します。
保存されたクエリ結果にfor_each>:オペレーターを使用します。
ワークフローを実行し、エラー通知が受信されるかどうかを確認します。
store_last_resultsパラメータの場合、クエリ結果の最初の行を${td.results}変数に保存できます。ただし、後のtd>:オペレーターがstore_last_resultsパラメータを使用する場合、この変数は使用できません。
サンプルファイル
_export:
td:
database: db_name
+step1:
td>: queries/step1.sql
store_last_results: true
engine: presto
+group:
for_each>:
result1: ["${td.last_results.cnt}"]
_do:
+step2:
td>: queries/step2.sql
store_last_results: true
engine: presto
+step3:
if>: ${results1 == td.last.results.cnt}
_do:
echo>: Same count!
subject: "[ERROR] Workflow failed!"
to: [me@example.com]