Using TD Spark Driver on Amazon EMR

This article explains how to use the Apache Spark Driver for Treasure Data (td-spark) on Amazon Elastic MapReduce (EMR). We recommend the us-east region of Amazon EC2 for optimal performance, but the driver can also be used in other Spark environments.

This feature is in the BETA stage, and access is disabled by default. We're looking for customers who know Apache Spark well and are willing to try this feature and give feedback to our team. If you're interested, please contact our support team.

What does the driver enable today?

  • Access Treasure Data from Spark in Scala and Python (PySpark).
  • Pull Treasure Data tables into Spark as DataFrames. No TD query is issued, which gives the lowest-latency path possible between data stored in TD and your Spark cluster.
  • Issue Presto, Hive, or SparkSQL queries and return the result as a Spark DataFrame.

We recommend using this driver with Spark 2.1.0 or higher.

Recommendations regarding use

For the fastest data access and the lowest data transfer costs, we recommend that you set up your Spark cluster in the AWS us-east region. Data transfer costs may become quite high if you use other AWS regions or processing environments.

This feature is currently in beta. While we anticipate making this feature available to all our customers by default, it may incur an additional charge once fully released if we observe data access patterns that generate higher costs than anticipated.

TD Spark Driver on EMR

Create an EMR Spark cluster

  • Create an EMR cluster with Spark support. Using the us-east region is highly recommended to maximize data transfer performance from S3.

  • Check the master node address of the new EMR cluster.

If you created the EMR cluster with the default security group (ElasticMapReduce-master), make sure it permits inbound access from your environment. See “Amazon EMR-Managed Security Groups” in the Amazon EMR documentation.

Log in to the EMR Cluster

Connect to the EMR master node with SSH:

# Use 8157 as the SOCKS5 proxy port so that you can access the EMR Spark job history page (port 18080), the Zeppelin notebook (port 8890), etc.
$ ssh -i (your AWS key pair .pem file) -D8157 hadoop@(your EMR master node public address)

Set Up TD Spark Integration

Download the td-spark jar file:

[hadoop@ip-x-x-x-x]$ wget

Create a td.conf file on the master node:

# Describe your TD API key here
# (recommended) use KryoSerializer for faster performance
spark.serializer=org.apache.spark.serializer.KryoSerializer
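Instead of typing the API key into td.conf by hand, you can generate the file from an environment variable so the key stays out of your shell history. This is a minimal sketch that rests on two assumptions. First, td-spark reads the key from a property named `spark.td.apikey`. Second, you have exported the key as `TD_API_KEY`, which is a hypothetical variable name. Check the property name against the documentation for your td-spark release.

```python
import os
from pathlib import Path

# Assumption: the property name td-spark reads the TD API key from.
API_KEY_PROPERTY = "spark.td.apikey"


def write_td_conf(path, api_key=None):
    """Write a Spark properties file for td-spark and return its Path.

    The key is taken from `api_key` or, if omitted, from the
    hypothetical TD_API_KEY environment variable.
    """
    key = api_key or os.environ.get("TD_API_KEY")
    if not key:
        raise ValueError("pass api_key or export TD_API_KEY")
    lines = [
        f"{API_KEY_PROPERTY}={key}",
        # Recommended: Kryo serialization for faster performance.
        "spark.serializer=org.apache.spark.serializer.KryoSerializer",
    ]
    conf = Path(path)
    conf.write_text("\n".join(lines) + "\n")
    conf.chmod(0o600)  # the file contains a secret, so restrict it to its owner
    return conf
```

Running `write_td_conf("td.conf")` on the master node produces a key=value properties file. Spark loads this format through `--properties-file`.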

Using spark-shell on EMR

[hadoop@ip-x-x-x-x]$ spark-shell --master yarn --jars td-spark-assembly-latest.jar --properties-file td.conf
scala> import com.treasuredata.spark._
scala> val td =
scala> val d = td.df("sample_datasets.www_access")
scala> d.show(3)
|user|           host|                path|             referer|code|               agent|size|method|      time|
|null||    /category/health|   /category/cameras| 200|Mozilla/5.0 (Wind...|  77|   GET|1412373596|
|null||      /category/toys|   /item/office/4216| 200|Mozilla/5.0 (comp...| 115|   GET|1412373585|
|null||  /category/software|                   -| 200|Mozilla/5.0 (comp...| 116|   GET|1412373574|
only showing top 3 rows
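The `time` column holds Unix epoch seconds, for example `1412373596` in the first row above. The plain-Python helper below is a sketch for spot-checking individual values as UTC timestamps. `td_time_to_utc` is a hypothetical name. Inside Spark, the built-in `from_unixtime` function does the same conversion for a whole column, formatting in the session time zone.

```python
from datetime import datetime, timezone


def td_time_to_utc(epoch_seconds):
    """Convert a TD `time` value (Unix epoch seconds) to an aware UTC datetime."""
    return datetime.fromtimestamp(int(epoch_seconds), tz=timezone.utc)


# First row of the sample_datasets.www_access output.
print(td_time_to_utc(1412373596).isoformat())  # 2014-10-03T21:59:56+00:00
```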

Using Zeppelin Notebook on EMR

Configure Zeppelin for td-spark

Create an SSH tunnel to the EMR cluster:

$ ssh -i (your AWS key pair .pem file) -D8157 hadoop@(your EMR master node public address)
  • (For Chrome users) Install the Proxy SwitchySharp Chrome extension.
    • Turn on the proxy switch for EMR when accessing your EMR master node.
  • Open http://(your EMR master node public address):8890/
  • Configure td-spark on the Interpreters page.

Access a Dataset in TD as a DataFrame

  • Read table data as a Spark DataFrame.

Running Presto Queries


Checking Spark History Server

  • Open http://(your EMR master node public address):18080/ to view the Spark history server.

Using the TD Spark Driver with PySpark and SparkSQL


$ pyspark --master yarn --jars td-spark-assembly-latest.jar --properties-file td.conf

>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access")
>>> df.show(10)
2016-07-19 16:34:15-0700  info [TDRelation] Fetching www_access within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-19 16:34:16-0700  info [TDRelation] Retrieved 19 PlazmaAPI entries - (TDRelation.scala:85)
|user|           host|                path|             referer|code|               agent|size|method|      time|
|null||/category/electro...|     /category/music| 200|Mozilla/4.0 (comp...|  66|   GET|1412333993|
|null||/item/electronics...| /category/computers| 200|Mozilla/5.0 (iPad...| 135|   GET|1412333977|
|null||/category/electro...|  /category/software| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333961|
|null||/item/electronics...|     /item/games/394| 200|Mozilla/5.0 (comp...|  83|   GET|1412333945|
|null||  /item/software/706|/search/?c=Softwa...| 200|Mozilla/5.0 (comp...|  76|   GET|1412333930|
|null||/item/giftcards/4879|      /item/toys/197| 200|Mozilla/5.0 (Wind...| 137|   GET|1412333914|
|null||/item/computers/4785|                   -| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333898|
|null||     /category/games|     /category/games| 200|Mozilla/5.0 (Maci...|  41|   GET|1412333882|
|null||/item/giftcards/4410|                   -| 200|Mozilla/4.0 (comp...|  72|   GET|1412333866|
|null||/category/electro...|/category/networking| 200|Mozilla/5.0 (comp...|  73|   GET|1412333850|
only showing top 10 rows
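The `Fetching ... within time range:[...)` log lines record the time interval that was scanned. The bounds shown, -9223372036854775808 and 9223372036854775807, are the minimum and maximum signed 64-bit integers. This suggests that no time filter was applied and the whole table was read. The sketch below checks a log line for this case, assuming the log format shown in this session. `parse_time_range` and `is_unbounded` are hypothetical helper names.

```python
import re

INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1

# Matches the half-open "[start,end)" interval in a TDRelation "Fetching" log line.
RANGE_RE = re.compile(r"time range:\[(-?\d+),(-?\d+)\)")


def parse_time_range(log_line):
    """Return (start, end) from a TDRelation 'Fetching' log line, or None if absent."""
    m = RANGE_RE.search(log_line)
    return (int(m.group(1)), int(m.group(2))) if m else None


def is_unbounded(time_range):
    """True when the interval spans the full int64 range, i.e. no time filter."""
    return time_range == (INT64_MIN, INT64_MAX)


line = ("info [TDRelation] Fetching www_access within time range:"
        "[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)")
print(is_unbounded(parse_time_range(line)))  # True
```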

## Submitting a Presto job
>>> df = spark.read.format("com.treasuredata.spark").options(sql="select 1").load("sample_datasets")
2016-07-19 16:56:56-0700  info [TDSparkContext]
Submitted job 515990:
select 1 - (TDSparkContext.scala:70)
>>> df.show()
|    1|

## Reading job results
>>> df = spark.read.format("com.treasuredata.spark").load("job_id:515990")
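If you issue Presto queries or read job results repeatedly, it can help to wrap these reader calls in small functions. This is a sketch: `presto_df` and `job_result_df` are hypothetical helper names. The only driver-specific pieces are the `com.treasuredata.spark` format, the `sql` option and the `job_id:` prefix, all taken from the session above. Everything else is the standard PySpark `DataFrameReader` chain.

```python
TD_FORMAT = "com.treasuredata.spark"


def presto_df(spark, sql, database):
    """Submit a Presto query through td-spark and return the result as a DataFrame.

    `spark` is an active SparkSession started with the td-spark jar and td.conf.
    """
    return spark.read.format(TD_FORMAT).options(sql=sql).load(database)


def job_result_df(spark, job_id):
    """Load the result of an existing TD job (e.g. 515990) as a DataFrame."""
    return spark.read.format(TD_FORMAT).load(f"job_id:{int(job_id)}")
```

In the pyspark shell, `presto_df(spark, "select 1", "sample_datasets")` issues the same request as the Presto example above. `job_result_df(spark, 515990).show()` reads that job's result back.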


# Register a DataFrame as a temporary view (Scala, in spark-shell)
scala> td.df("hb_tiny.rankings").createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]
scala> q1.show
2016-07-20 11:27:11-0700  info [TDRelation] Fetching rankings within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-20 11:27:12-0700  info [TDRelation] Retrieved 2 PlazmaAPI entries - (TDRelation.scala:85)
|            page_url|page_rank|
|xjhmjsuqolfklbvxn...|      251|
|seozvzwkcfgnfuzfd...|      165|
|fdgvmwbrjlmvuoquy...|      132|
|gqghyyardomubrfsv...|      108|
|qtqntqkvqioouwfuj...|      278|
|wrwgqnhxviqnaacnc...|      135|
|  cxdmunpixtrqnvglnt|      146|
| ixgiosdefdnhrzqomnf|      126|
|xybwfjcuhauxiopfi...|      112|
|ecfuzdmqkvqktydvi...|      237|
|dagtwwybivyiuxmkh...|      177|
|emucailxlqlqazqru...|      134|
|nzaxnvjaqxapdjnzb...|      119|
|       ffygkvsklpmup|      332|
|hnapejzsgqrzxdswz...|      171|
|rvbyrwhzgfqvzqkus...|      148|
|knwlhzmcyolhaccqr...|      104|
|nbizrgdziebsaecse...|      665|
|jakofwkgdcxmaaqph...|      187|
|kvhuvcjzcudugtidf...|      120|
only showing top 20 rows
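The same temporary-view query can be run from PySpark. In the sketch below, `rankings_above` is a hypothetical helper. It combines the td-spark reader used earlier in the PySpark session with the standard `DataFrame.createOrReplaceTempView` and `SparkSession.sql` calls. It casts the threshold to `int` so that only a number is interpolated into the SQL text.

```python
TD_FORMAT = "com.treasuredata.spark"


def rankings_above(spark, min_rank=100, table="hb_tiny.rankings"):
    """Return page_url/page_rank rows whose page_rank exceeds min_rank."""
    df = spark.read.format(TD_FORMAT).load(table)
    # Register the TD table under a session-scoped name so SparkSQL can query it.
    df.createOrReplaceTempView("rankings")
    return spark.sql(
        f"select page_url, page_rank from rankings where page_rank > {int(min_rank)}"
    )
```

Calling `rankings_above(spark).show()` in the pyspark shell should list the same kind of rows as the Scala session above.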

