You can use td-pyspark to bridge the results of data manipulations in Amazon EMR (Elastic MapReduce) with your data in Treasure Data.

Amazon EMR is an AWS tool for big data processing and analysis, providing an easy-to-use interface for accessing Spark. PySpark is a Python API for Spark. Treasure Data's td-pyspark is a Python library that provides a handy way to use PySpark and Treasure Data based on td-spark.


To follow the steps in this example, you must have the following Treasure Data items:

  • Treasure Data API key

  • td-spark feature enabled

Configuring your Amazon EMR Environment

You create a key pair and a cluster, install the td-pyspark libraries, and configure a notebook for your connection code.

Log in to the AWS Management Console. Under Find Services, enter EMR. Amazon EMR cluster nodes run on Amazon EC2 instances.

Create an EC2 Key Pair

When you create the key pair in Amazon, you provide a name, and a file with the extension of .pem is generated. You download the generated file to your local computer.

For more information about creating an Amazon EC2 key pair, see Amazon EC2 Key Pairs.

You refer to the key when you create a cluster. You specify the Amazon EC2 key pair that is used for SSH connections to all cluster instances.

Create a Cluster on Amazon EMR

Complete the configuration fields. Provide a cluster name and a folder location for the cluster data, and select Spark version 2.4.3 or later as the Application.

Specify the instance and key pair.

Connect to EMR Master Node with SSH

To grant inbound access permission from your local computer, you specify a secure protocol.

Select the security group for Master:

In the AWS console, find the Master node that you want to access:

With the proxy established, you log on to the Master node instance. On your local computer, access the AWS Master node instance through SSH.

$ ssh -i ~/<your_aws_keypair_key.pem> hadoop@<Master public DNS>

You see a connection confirmation on your local computer.

Install the td-pyspark Libraries

See the Treasure Data Apache Spark Driver Release Notes for additional information and the most current download.

Inside your EMR instance, download the library to your Master node.

Still within the Master node instance, run the following command to install td_pyspark:

$ pip install td_pyspark

Create a Configuration File and Specify your TD API Key and Site

In the Master node instance, create a td-spark.conf file. In the configuration file, specify your TD API Key, TD site parameters, and Spark environment.

An example of the format is as follows. Replace the placeholders in parentheses with your actual values:

spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true
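
The configuration file can also be generated from a script so that the values are not typed by hand. The following is a minimal sketch and not part of td-pyspark: the function name render_td_spark_conf is hypothetical, and the key names spark.td.apikey and spark.td.site are assumptions based on the td-spark documentation.

```python
ALLOWED_SITES = ("us", "jp", "eu01", "ap02")


def render_td_spark_conf(apikey, site="us"):
    """Return the text of a td-spark.conf file (hypothetical helper)."""
    if site not in ALLOWED_SITES:
        raise ValueError(f"site must be one of {ALLOWED_SITES}, got {site!r}")
    lines = [
        # Assumed key names for the API key and site settings.
        f"spark.td.apikey {apikey}",
        f"spark.td.site {site}",
        "spark.serializer org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.enabled true",
    ]
    return "\n".join(lines) + "\n"


# Placeholder key for illustration only; never commit a real key.
print(render_td_spark_conf("YOUR_TD_API_KEY", site="us"), end="")
```

Write the returned text to td-spark.conf on the Master node, and keep the file readable only by your own user because it contains your API key.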

Launch PySpark

Launch PySpark. In your command, include the arguments as shown in the following example:

$ pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf
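
A wrong path to the jar or to the properties file is an easy mistake to make at this step. As a rough sketch, the same launch command can be assembled in Python and the two files checked first. The helper names build_pyspark_command and missing_files are hypothetical, and the paths are the ones from the example above.

```python
import os
import shlex


def build_pyspark_command(jar_path, properties_file="./td-spark.conf"):
    """Return the pyspark argument list used in this section."""
    return [
        "pyspark",
        "--driver-class-path", jar_path,
        "--properties-file", properties_file,
    ]


def missing_files(*paths):
    """Return the paths that do not exist, so they can be reported early."""
    return [p for p in paths if not os.path.exists(p)]


jar = "./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar"
conf = "./td-spark.conf"
print("Missing files:", missing_files(jar, conf))
print(shlex.join(build_pyspark_command(jar, conf)))
```

If the list of missing files is empty, the printed command can be pasted into the Master node shell as is.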

PySpark starts and displays its startup banner and an interactive prompt.

Then load td_pyspark, as follows. Note the prompt symbol changes to >>>:

>>> import td_pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("td-pyspark-test")
>>> td = td_pyspark.TDSparkContextBuilder(builder).build()
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()

TDSparkContextBuilder is an entry point to access td_pyspark's functionalities. As shown in the preceding code sample, you read tables in Treasure Data as data frames:

df = td.table("tablename").df()

If the command completes without an error, your connection is working.

Interacting with Treasure Data in your Master Node Instance

You can run select and insert queries to Treasure Data or query back data from Treasure Data. You can also create and delete databases and tables.

You can use the following commands:

Read Tables as DataFrames

To read a table, use td.table(table_name):

df = td.table("sample_datasets.www_access").df()

Change the Database Used in Treasure Data

To change the context database, use td.use(database_name):

td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()

When you call .df(), your table data is read as a Spark DataFrame. You use the DataFrame the same way as any other DataFrame in PySpark. See also the PySpark DataFrame documentation.
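
Because the result is a regular DataFrame, the standard PySpark methods apply to it. The following sketch counts rows per status code; the function name count_by_status is hypothetical, and it relies only on the code column that the queries in this section also use.

```python
def count_by_status(df):
    """Count rows per HTTP status code using standard DataFrame methods."""
    # groupBy, count, and orderBy are regular PySpark DataFrame methods.
    return df.groupBy("code").count().orderBy("code")


# In the pyspark shell, with td built as shown earlier:
#   df = td.table("sample_datasets.www_access").df()
#   count_by_status(df).show()
```

The function only describes the computation. Spark runs it when you call an action such as show() on the result.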

Submit Presto Queries

If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data that PySpark has to process.

q = td.presto("select code, * from sample_datasets.www_access")

q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")
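
To illustrate the idea of letting Presto do the heavy work, the sketch below runs the aggregation in Presto and brings only the small result back to the driver. It assumes that the object returned by td.presto is a regular Spark DataFrame; the function name status_code_counts and the column alias cnt are hypothetical.

```python
def status_code_counts(td, table="sample_datasets.www_access"):
    """Aggregate in Presto and return the small result as a list of rows."""
    sql = f"select code, count(*) as cnt from {table} group by 1"
    # Only the aggregated rows, one per status code, are handed to Spark.
    return td.presto(sql).collect()


# In the pyspark shell, with td built as shown earlier:
#   for row in status_code_counts(td):
#       print(row["code"], row["cnt"])
```

Use collect() only on results that you know are small, because it loads every returned row into the driver's memory.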

To display the result, call q.show().

Create or Drop a Database
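
The following is a hedged sketch of how databases might be created and dropped. The method names create_database_if_not_exists and drop_database_if_exists are assumptions based on the td-pyspark README, so confirm them against the version you installed. The wrapper function names and the database name mydb are hypothetical.

```python
def ensure_database(td, name):
    """Create the database only when it does not already exist."""
    # Assumed td_pyspark method; verify it in your installed version.
    td.create_database_if_not_exists(name)


def remove_database(td, name):
    """Drop the database when it exists."""
    # Assumed td_pyspark method; verify it in your installed version.
    td.drop_database_if_exists(name)


# In the pyspark shell, with td built as shown earlier:
#   ensure_database(td, "mydb")
#   remove_database(td, "mydb")
```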


Upload DataFrames to Treasure Data

To save your local DataFrames as a table, you have two options:

  • Insert the records in the input DataFrame to the target table

  • Create or replace the target table with the content of the input DataFrame

td.insert_into(df, "mydb.tbl1")

td.create_or_replace(df, "mydb.tbl2")
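
The two options differ in what happens to existing rows: insert_into appends to the target table, while create_or_replace discards the previous content. A small wrapper can make that choice explicit at the call site. This is only a sketch, and the function name save_dataframe and its replace parameter are hypothetical.

```python
def save_dataframe(td, df, table, replace=False):
    """Append df to the table, or replace the table when replace=True."""
    if replace:
        # Overwrites the target table with the content of df.
        td.create_or_replace(df, table)
    else:
        # Adds the records in df to the target table.
        td.insert_into(df, table)


# In the pyspark shell, with td and df built as shown earlier:
#   save_dataframe(td, df, "mydb.tbl1")                # append
#   save_dataframe(td, df, "mydb.tbl2", replace=True)  # overwrite
```

Defaulting replace to False keeps the destructive option opt-in.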

Checking Amazon EMR in Treasure Data

You can use TD Toolbelt to check your database from the command line. Alternatively, if you have TD Console, you can check your databases and queries there. For more information, see Database and Table Management.
