You can use the Apache Spark Driver for Treasure Data (also known as td-spark) on Amazon Elastic MapReduce (EMR). Although we recommend using the us-east region of Amazon EC2 for optimal performance, you can use td-spark in other Spark environments as well.
Refer to TD Spark FAQs for an overview of td-spark.
- Access Treasure Data from Spark in Scala and Python (PySpark).
- Pull Treasure Data tables into Spark as DataFrames (no TD query is issued, providing the shortest possible latency path between TD-stored data and your Spark cluster).
- Issue Presto, Hive, or SparkSQL queries and receive the results as Spark DataFrames, as sketched below.
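For example, from spark-shell (a minimal sketch; the table name and query are placeholders, and the "sql" reader option mirrors the PySpark session shown later on this page):

import com.treasuredata.spark._

val td = spark.td

// Pull a TD table directly into Spark as a DataFrame (no TD query is issued)
val accessLog = td.table("sample_datasets.www_access").df

// Submit a query through the DataFrame reader and receive the result as a DataFrame
val result = spark.read
  .format("com.treasuredata.spark")
  .option("sql", "select count(1) cnt from www_access")
  .load("sample_datasets")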
This driver is recommended for use with Spark 2.4.4 or higher.
For the fastest data access and the lowest data transfer costs, we recommend that you set up your Spark cluster in the AWS us-east region. Data transfer costs can become quite high when you use other AWS regions or processing environments.
- Create an EMR cluster with Spark support. The us-east region is recommended to maximize data transfer performance from S3. (See the AWS CLI sketch after this list.)
- Check the master node address of the new EMR cluster.
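One way to create such a cluster is with the AWS CLI (a hedged sketch; the cluster name, release label, instance settings, and key pair name are assumptions to adjust for your account, and you should pick a release label that bundles Spark 2.4.4 or higher):

$ aws emr create-cluster \
    --name "td-spark-cluster" \
    --release-label emr-5.29.0 \
    --applications Name=Spark \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --use-default-roles \
    --ec2-attributes KeyName=your-key-pair \
    --region us-east-1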
Read table data as a Spark DataFrame


If you created the EMR cluster with the default security group (ElasticMapReduce-master), you need to permit inbound access from your environment. See Amazon EMR-Managed Security Groups.
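For example, to allow inbound SSH from your own IP address with the AWS CLI (a sketch; the CIDR below is a documentation placeholder, so substitute your actual address):

$ aws ec2 authorize-security-group-ingress \
    --group-name ElasticMapReduce-master \
    --protocol tcp \
    --port 22 \
    --cidr 203.0.113.10/32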
Connect to the EMR master node with SSH:
# Use 8157 for the SOCKS5 proxy port so that you can access the EMR Spark job history page (port 18080), the Zeppelin notebook (port 8890), etc.
$ ssh -i (your AWS key pair .pem file) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
(The Amazon Linux / EMR login banner is displayed.)

Download the td-spark jar file:
[hadoop@ip-x-x-x-x]$ wget https://s3.amazonaws.com/td-spark/td-spark-assembly_2.11-0.4.0.jar

Create a td.conf file on the master node:
# Describe your TD API key here
spark.td.apikey (your TD API key)
spark.td.site (your site name: us, jp, ap02, eu01, etc.)
# (recommended) use KryoSerializer for better performance
spark.serializer org.apache.spark.serializer.KryoSerializer

Launch spark-shell with the jar and the properties file:

[hadoop@ip-x-x-x-x]$ spark-shell --master yarn --jars td-spark-assembly_2.11-0.4.0.jar --properties-file td.conf
(spark-shell starts and displays the Spark welcome banner; this example session used Spark 2.1.0.)
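Once the shell is up, you can verify that the properties file was picked up using the standard Spark conf API (an optional quick check):

scala> spark.conf.get("spark.serializer")   // should return org.apache.spark.serializer.KryoSerializer if td.conf was applied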
scala> import com.treasuredata.spark._
scala> val td = spark.td
scala> val d = td.table("sample_datasets.www_access").df
scala> d.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|136.162.131.221| /category/health| /category/cameras| 200|Mozilla/5.0 (Wind...| 77| GET|1412373596|
|null| 172.33.129.134| /category/toys| /item/office/4216| 200|Mozilla/5.0 (comp...| 115| GET|1412373585|
|null| 220.192.77.135| /category/software| -| 200|Mozilla/5.0 (comp...| 116| GET|1412373574|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 3 rows

Using td-spark from a Zeppelin notebook:

1. Create an SSH tunnel to the EMR cluster:
$ ssh -i (your AWS key pair .pem file) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
(For Chrome users) Install the Proxy SwitchySharp Chrome extension and turn on the proxy switch for EMR when accessing your EMR master node.
2. Open http://(your EMR master node public address):8890/
3. Open the Interpreters page to configure td-spark.
4. Edit your profile details and select Save.

You can read table data as a Spark DataFrame.
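For example, a note paragraph using the %spark interpreter might look like this (a sketch; it assumes the interpreter from the previous step is configured with the td-spark jar and the td.conf properties):

%spark
import com.treasuredata.spark._
val td = spark.td
td.table("sample_datasets.www_access").df.show(3)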


You can check your event history in the History Server using your EMR master node public address.
http://(your EMR master node public address):18080/
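Because the History Server port is usually not open to the public internet, you can route the request through the SOCKS tunnel opened earlier (a sketch; the hostname is a placeholder for your master node address):

$ curl --socks5-hostname localhost:8157 http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:18080/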

You can also use td-spark from PySpark. Launch pyspark with the driver jar and the properties file:

$ pyspark --driver-class-path td-spark-assembly_2.11-0.4.0.jar --properties-file td.conf
>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access")
>>> df.show(10)
2016-07-19 16:34:15-0700 info [TDRelation] Fetching www_access within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-19 16:34:16-0700 info [TDRelation] Retrieved 19 PlazmaAPI entries - (TDRelation.scala:85)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 148.165.90.106|/category/electro...| /category/music| 200|Mozilla/4.0 (comp...| 66| GET|1412333993|
|null| 144.105.77.66|/item/electronics...| /category/computers| 200|Mozilla/5.0 (iPad...| 135| GET|1412333977|
|null| 108.54.178.116|/category/electro...| /category/software| 200|Mozilla/5.0 (Wind...| 69| GET|1412333961|
|null|104.129.105.202|/item/electronics...| /item/games/394| 200|Mozilla/5.0 (comp...| 83| GET|1412333945|
|null| 208.48.26.63| /item/software/706|/search/?c=Softwa...| 200|Mozilla/5.0 (comp...| 76| GET|1412333930|
|null| 108.78.209.95|/item/giftcards/4879| /item/toys/197| 200|Mozilla/5.0 (Wind...| 137| GET|1412333914|
|null| 108.198.97.206|/item/computers/4785| -| 200|Mozilla/5.0 (Wind...| 69| GET|1412333898|
|null| 172.195.185.46| /category/games| /category/games| 200|Mozilla/5.0 (Maci...| 41| GET|1412333882|
|null| 88.24.72.177|/item/giftcards/4410| -| 200|Mozilla/4.0 (comp...| 72| GET|1412333866|
|null| 24.129.141.79|/category/electro...|/category/networking| 200|Mozilla/5.0 (comp...| 73| GET|1412333850|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 10 rows
## Submitting a Presto job
>>> df = spark.read.format("com.treasuredata.spark").options(sql="select 1").load("sample_datasets")
2016-07-19 16:56:56-0700 info [TDSparkContext]
Submitted job 515990:
select 1 - (TDSparkContext.scala:70)
>>> df.show(10)
+-----+
|_col0|
+-----+
| 1|
+-----+
## Reading job results
>>> df = spark.read.format("com.treasuredata.spark").load("job_id:515990")

Register a DataFrame as a temporary table to query it with SparkSQL:
scala> td.df("hb_tiny.rankings").createOrReplaceTempView("rankings")
scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]
scala> q1.show
2016-07-20 11:27:11-0700 info [TDRelation] Fetching rankings within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-20 11:27:12-0700 info [TDRelation] Retrieved 2 PlazmaAPI entries - (TDRelation.scala:85)
+--------------------+---------+
| page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...| 251|
|seozvzwkcfgnfuzfd...| 165|
|fdgvmwbrjlmvuoquy...| 132|
|gqghyyardomubrfsv...| 108|
|qtqntqkvqioouwfuj...| 278|
|wrwgqnhxviqnaacnc...| 135|
| cxdmunpixtrqnvglnt| 146|
| ixgiosdefdnhrzqomnf| 126|
|xybwfjcuhauxiopfi...| 112|
|ecfuzdmqkvqktydvi...| 237|
|dagtwwybivyiuxmkh...| 177|
|emucailxlqlqazqru...| 134|
|nzaxnvjaqxapdjnzb...| 119|
| ffygkvsklpmup| 332|
|hnapejzsgqrzxdswz...| 171|
|rvbyrwhzgfqvzqkus...| 148|
|knwlhzmcyolhaccqr...| 104|
|nbizrgdziebsaecse...| 665|
|jakofwkgdcxmaaqph...| 187|
|kvhuvcjzcudugtidf...| 120|
+--------------------+---------+
only showing top 20 rows
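You can continue composing SparkSQL over the registered view. For example, to count the matching pages (a sketch reusing the rankings view registered above):

scala> spark.sql("select count(*) as high_rank_pages from rankings where page_rank > 100").show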