# カスタムスクリプト環境でDelta Sharing機能を使用する 以下は、カスタムPythonスクリプトでDelta Sharing機能を設定する方法の例です: 1. Databricks上に共有データを作成します。 2. 共有データの設定が完了したら、Delta Sharing認証情報ファイルをダウンロードできます。 詳細については、[Delta Sharingオープン共有ワークフロー](https://docs.databricks.com/en/delta-sharing/share-data-open.md#delta-sharing-open-sharing-workflow)を参照してください。 3. 処理ニーズに基づいてワークフローを作成します。 4. ワークフローからカスタムスクリプト(pythonコード)を実行します。 5. Delta Sharing認証情報をワークフローシークレットに登録します。 Delta Sharing認証情報のJSONコンテンツを使用してシークレットキーを提供します。 ![](https://lh7-rt.googleusercontent.com/docsz/AD_4nXfW_dJ8vUi17U05fth0OxpTNu-Ri9UTbsjRvtWe_V5ePM3tHlrv0q9CGCmiW1HFro7A3lbj8JJiM8KopapfdPbiksantTOgVtWdIcBf0VDbCiSJ_K5G1hVswBvsJHIuszEwZ13d8uueVKUGRfhJyWDHA9zIv482r5kYHyp6gw?key=odNQrvH9tO5QyI9DeUwa6g) 1. カスタムPythonスクリプトを呼び出すワークフロータスクを定義します。 以下の例では、次の点に注意してください。 * Py>オペレーターは、Pythonプログラム内の特定のメソッドを呼び出すために使用されます。 * Delta Sharing認証情報は、${secret:configShare}環境変数を使用してワークフローシークレットからコンテナに渡されます。 ```yaml timezone: UTC +call_exmple: docker: image: 'digdag/digdag-python:3.10' py>: delta_sharing_task.fetch_data _env: configShare: '${secret:configShare}' ``` Pythonプログラムでは、PySparkを使用してDatabricks上の共有データにアクセスし、データを標準的なspark dataframeとして操作できます。 ```python import sys import os import json from pyspark.sql import SparkSession def fetch_data(): # Install and import delta-sharing package os.system(f'{sys.executable} -m pip install delta-sharing') import delta_sharing # Prepare config.share file with open("config.share", "w") as outfile: outfile.write(os.environ.get('configShare')) # Fetch shared data on Databricks via Delta-Sharing server by PySpark table_url = f"./config.share#your_share.your_database.your_table" spark = SparkSession.builder \ .appName("delta-sharing-app") \ .master("local[*]") \ .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-core_2.12:2.2.0,io.delta:delta-sharing-spark_2.12:0.6.2') \ .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \ .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \ .getOrCreate() shared_df = spark.read.format("deltaSharing").load(table_url).select("column1", "column2", "column3").show() ``` これには、Delta Sharingライブラリがインストールされ、インポートされている必要があります。