You can use td-pyspark to bridge the results of data manipulations in Databricks with your data in Treasure Data.

Databricks builds on top of Apache Spark, providing an easy-to-use interface for accessing Spark. PySpark is the Python API for Spark. Treasure Data's td-pyspark is a Python library, built on td-spark, that provides a convenient way to use PySpark with Treasure Data.


To follow the steps in this example, you must have the following items:

  • Treasure Data API key

  • td-spark feature enabled

Configuring your Databricks Environment

You create a cluster, install the td-pyspark libraries, and configure a notebook for your connection code.

Create a Cluster on Databricks

  • Select the Cluster icon.

  • Select Create Cluster.

  • Provide a cluster name, select Spark 2.4.3 or later as the Databricks Runtime Version, and select 3 as the Python Version.

Install the td-pyspark Libraries

Access the Treasure Data Apache Spark Driver Release Notes for additional information and the most current download.


When installing the library on your cluster, select PyPI as the library source.

When the installation completes, the library is listed as installed on the cluster.

Specify your TD API Key and Site

In the Spark configuration, you specify your Treasure Data API key and site, along with the required Spark settings.

An example of the format is as follows; replace the placeholders with your actual values:

spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true

Restart Cluster and Begin Work in Databricks

Restart your Spark cluster, create a notebook, and add code similar to the following:

from pyspark.sql import *
import td_pyspark

SAMPLE_TABLE = "sample_datasets.www_access"
td = td_pyspark.TDSparkContext(spark)

df = td.table(SAMPLE_TABLE).within("-10y").df()
df.show()
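The within() filter accepts other TD-style time ranges as well. The following is a sketch using the same td context and sample table; the specific date range shown is only an illustration:

```python
# Read only the last day of data
df_recent = td.table("sample_datasets.www_access").within("-1d").df()

# Read an explicit date range (from/until)
df_range = td.table("sample_datasets.www_access").within("2014-10-01/2014-10-04").df()
```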

TDSparkContext is an entry point to access td_pyspark's functionalities. As shown in the preceding code sample, to create TDSparkContext, pass your SparkSession (spark) to TDSparkContext:

td = td_pyspark.TDSparkContext(spark)

If the DataFrame contents are displayed without errors, your connection is working.

Interacting with Treasure Data from Databricks

In Databricks, you can run select and insert queries against Treasure Data and read the results back. You can also create and delete databases and tables.

In Databricks, you can use the following commands:

Read Tables as DataFrames

To read a table, use td.table(table_name):

df = td.table("sample_datasets.www_access").df()

Change the Database Used in Treasure Data

To change the context database, use td.use(database_name):

td.use("sample_datasets")

# Accesses sample_datasets.www_access
df = td.table("www_access").df()

By calling .df(), your table data is read as a Spark DataFrame. Usage of the DataFrame is the same as in PySpark. See also the PySpark DataFrame documentation.

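As a sketch of ordinary PySpark operations on the returned DataFrame (assuming the td context from the connection example and the sample_datasets.www_access table used above):

```python
# Read the table as a Spark DataFrame
df = td.table("sample_datasets.www_access").df()

# Standard PySpark DataFrame operations apply as usual
errors = df.filter(df.code >= 500)        # rows with server-error status codes
by_method = df.groupBy("method").count()  # request counts per HTTP method
by_method.show()
```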

Submit Presto Queries

If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data processed with PySpark.

q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")
q.show()

The aggregated result is returned as a Spark DataFrame.
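td.presto() is for queries that return results. For non-query statements such as DDL, td_pyspark also provides execute_presto(). A minimal sketch, assuming the same td context and a hypothetical database mydb:

```python
# Run a non-query Presto statement; no DataFrame is returned
td.execute_presto("CREATE TABLE IF NOT EXISTS mydb.A(time bigint, id varchar)")
```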

Create or Drop a Database


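td_pyspark provides helper methods for managing databases. A minimal sketch, assuming the td context from the connection example and a hypothetical database name mydb:

```python
# Create a database only if it does not already exist
td.create_database_if_not_exists("mydb")

# Drop a database only if it exists
td.drop_database_if_exists("mydb")
```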
Upload DataFrames to Treasure Data

To save your local DataFrames as a table, you have two options:

  • Insert the records in the input DataFrame to the target table

  • Create or replace the target table with the content of the input DataFrame

# Insert the records in df into mydb.tbl1
td.insert_into(df, "mydb.tbl1")

# Create or replace mydb.tbl2 with the content of df
td.create_or_replace(df, "mydb.tbl2")

Checking Databricks in Treasure Data

You can use the TD Toolbelt to check your databases from the command line. Alternatively, if you have TD Console, you can check your databases and queries there. Read about Database and Table Management.
