Skip to content
Last updated

カスタムスクリプト環境でDelta Sharing機能を使用する

以下は、カスタムPythonスクリプトでDelta Sharing機能を設定する方法の例です:

  1. Databricks上に共有データを作成します。

  2. 共有データの設定が完了したら、Delta Sharing認証情報ファイルをダウンロードできます。 詳細については、Delta Sharingオープン共有ワークフローを参照してください。

  3. 処理ニーズに基づいてワークフローを作成します。

  4. ワークフローからカスタムスクリプト(pythonコード)を実行します。

  5. Delta Sharing認証情報をワークフローシークレットに登録します。

Delta Sharing認証情報のJSONコンテンツを使用してシークレットキーを提供します。

  1. カスタムPythonスクリプトを呼び出すワークフロータスクを定義します。

以下の例では、次の点に注意してください。

  • Py>オペレーターは、Pythonプログラム内の特定のメソッドを呼び出すために使用されます。
  • Delta Sharing認証情報は、${secret:configShare}環境変数を使用してワークフローシークレットからコンテナに渡されます。
timezone: UTC
+call_exmple:
  docker:
    image: 'digdag/digdag-python:3.10'
  py>: delta_sharing_task.fetch_data
  _env:
    configShare: '${secret:configShare}'

Pythonプログラムでは、PySparkを使用してDatabricks上の共有データにアクセスし、データを標準的なspark dataframeとして操作できます。

import sys
import os
import json
from pyspark.sql import SparkSession


def fetch_data():
  # Install and import delta-sharing package
  os.system(f'{sys.executable} -m pip install delta-sharing')
  import delta_sharing


  # Prepare config.share file
  with open("config.share", "w") as outfile:
    outfile.write(os.environ.get('configShare'))


  # Fetch shared data on Databricks via Delta-Sharing server by PySpark
  table_url = f"./config.share#your_share.your_database.your_table"
  spark = SparkSession.builder \
    .appName("delta-sharing-app") \
    .master("local[*]") \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-core_2.12:2.2.0,io.delta:delta-sharing-spark_2.12:0.6.2') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .getOrCreate()
  shared_df = spark.read.format("deltaSharing").load(table_url).select("column1", "column2", "column3").show()

これには、Delta Sharingライブラリがインストールされ、インポートされている必要があります。