Using the Delta Sharing Capability in a Custom Script Environment

Here is an example of how to configure the Delta Sharing capability and consume the shared data from a custom Python script:

  1. Create shared data on Databricks.

  2. After the shared data is configured, download the Delta Sharing credential file.
    See Delta Sharing open sharing workflow for additional details.

  3. Create a workflow based on the processing needs.

  4. Execute a custom script (Python code) from the workflow.

  5. Register Delta Sharing credentials to workflow secrets.

Use the JSON content of your Delta Sharing credential file as the value of the secret key (configShare in the example below).
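
The downloaded credential file is a small JSON document similar to the sketch below; the endpoint, token, and expiration values here are placeholders for illustration only:

{
  "shareCredentialsVersion": 1,
  "endpoint": "https://<databricks-instance>/api/2.0/delta-sharing/metastores/<metastore-id>",
  "bearerToken": "<bearer-token>",
  "expirationTime": "2030-01-01T00:00:00.000Z"
}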

  6. Define a workflow task to call a custom Python script.

In the following example, note that:

  • The py> operator is used to call a specific method in your Python program.
  • The Delta Sharing credentials are passed from the workflow secret to the container as the configShare environment variable via the ${secret:configShare} reference.
timezone: UTC
+call_example:
  docker:
    image: 'digdag/digdag-python:3.10'
  py>: delta_sharing_task.fetch_data
  _env:
    configShare: '${secret:configShare}'
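
Here, py>: delta_sharing_task.fetch_data assumes the custom script is saved as delta_sharing_task.py in the workflow project directory and defines a fetch_data() method, as shown below.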

In the Python program, PySpark is used to access the shared data on Databricks, and the data can be manipulated as a standard Spark DataFrame.

import sys
import os
import json
from pyspark.sql import SparkSession


def fetch_data():
  # Install and import the delta-sharing package at runtime
  os.system(f'{sys.executable} -m pip install delta-sharing')
  import delta_sharing

  # Write the credentials passed in from the workflow secret to a local config.share file
  with open("config.share", "w") as outfile:
    outfile.write(os.environ["configShare"])

  # Fetch shared data on Databricks from the Delta Sharing server with PySpark.
  # Table URL format: <profile-file>#<share>.<schema>.<table>
  table_url = "./config.share#your_share.your_database.your_table"
  spark = SparkSession.builder \
    .appName("delta-sharing-app") \
    .master("local[*]") \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-core_2.12:2.2.0,io.delta:delta-sharing-spark_2.12:0.6.2') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .getOrCreate()

  # Load the shared table as a standard Spark DataFrame and display a few columns
  shared_df = spark.read.format("deltaSharing").load(table_url) \
    .select("column1", "column2", "column3")
  shared_df.show()

This requires that the delta-sharing library has been installed and imported before the shared table is read, which the script above does at runtime with pip.
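
If the shared table is small enough to fit in memory, the delta-sharing library can also load it without starting a Spark session. The sketch below uses delta_sharing.load_as_pandas; the profile file path and the share, schema, and table names are the same placeholders as in the example above:

import delta_sharing

# Profile file written from the workflow secret, as in the example above
profile_file = "./config.share"

# Table URL format: <profile-file>#<share>.<schema>.<table> (placeholder names)
table_url = f"{profile_file}#your_share.your_database.your_table"

# Load the shared table directly into a pandas DataFrame (no Spark session needed)
pandas_df = delta_sharing.load_as_pandas(table_url)
print(pandas_df.head())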