You can use the Apache Spark Driver for Treasure Data (also known as td-spark) on Amazon Elastic MapReduce (EMR). Although we recommend using the us-east region of Amazon EC2 for the optimal performance, you can use td-spark in other Spark environments as well.

Continue to the following topics:


Refer to TD Spark FAQs for an overview of td-spark.

What Does td-spark Enable?

  • Access Treasure Data from Spark in Scala and Python (PySpark).

  • Pull Treasure Data tables into Spark as a DataFrame (No TD query is issued, providing the shortest latency path between TD stored data and your spark cluster possible).

  • Issue Presto, Hive, or SparkSQL queries and return the result as a Spark DataFrame.

This driver is recommended for use with Spark 2.4.4 or higher

Recommendations Regarding Use

For the fastest data access, and lowest data transfer costs, we recommend that you set-up your spark cluster in the AWS us-east region. Data Transfer costs may become quite high if using other AWS regions or processing environments.

TD Spark Driver on EMR

Create an EMR Spark Cluster

  1. Create an EMR cluster with Spark support.

    The us-east region is recommended to maximize data transfer performance from S3.

  2. Check the master node address of the new EMR.

    Read table data as Spark DataFrame

If you created EMR with default security group (ElasticMapReduce-master), you need to permit inbound access from your environment. See Amazon EMR-Managed Security Groups.


Log-in to the EMR Cluster

Connect to EMR Master node with SSH

# Use 8157 for SOCKS5 proxy port so that you can access EMR Spark job history page (port 18080), Zeppelin note book (port 8890), etc.
$ ssh -i (your AWS key pair file. .pem) -D8157
     __|  __|_  )
       _|  (     /   Amazon Linux AMI
4 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R

Set Up TD Spark Integration

Download td-spark jar file:

[hadoop@ip-x-x-x-x]$ wget

Create a td.conf file in the master node:

# Describe your TD API key here (your TD API key) (your site name: us, jp, ap02, eu01, etc.)
# (recommended) this use KryoSerializer for faster performance
spark.serializer org.apache.spark.serializer.KryoSerializer

Using spark-shell on EMR

[hadoop@ip-x-x-x-x]$ spark-shell --master yarn --jars td-spark-assembly-latest.jar --properties-file td.conf
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
scala> import com.treasuredata.spark._
scala> val td =
scala> val d = td.table("sample_datasets.www_access").df
|user|           host|                path|             referer|code|               agent|size|method|      time|
|null||    /category/health|   /category/cameras| 200|Mozilla/5.0 (Wind...|  77|   GET|1412373596|
|null||      /category/toys|   /item/office/4216| 200|Mozilla/5.0 (comp...| 115|   GET|1412373585|
|null||  /category/software|                   -| 200|Mozilla/5.0 (comp...| 116|   GET|1412373574|
only showing top 3 rows

Using Zeppelin Notebook on EMR

Configure Zeppelin for td-spark

  1. Create SSH Tunnel to EMR Cluster.

    $ ssh -i (your AWS key pair file. .pem) -D8157

    (For Chrome users) Install Proxy Switchy Sharp Chrome ExtensionTurn on proxy-switch for EMR when accessing your EMR master

  2. Open http://(your EMR master node public address):8890/

  3. Open the Interpreters page to configure td-spark.

  4. Edit your profile details and select Save.

Access Dataset in TD as DataFrame

You can read table data as Spark DataFrame.

Running Presto Queries

Checking Spark History Server

You can check your event history in the History Server using your EMR master node public address.

http://(your EMR master node public address):18080/

TD Spark Driver Use with PySpark and SparkSQL


$ ./bin/pyspark  --driver-class-path ~/work/git/td-spark/td-spark/target/td-spark-assembly-0.1-SNAPSHOT.jar --properties-file ../td-dev.conf

>>> df ="com.treasuredata.spark").load("sample_datasets.www_access")
2016-07-19 16:34:15-0700  info [TDRelation] Fetching www_access within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-19 16:34:16-0700  info [TDRelation] Retrieved 19 PlazmaAPI entries - (TDRelation.scala:85)
|user|           host|                path|             referer|code|               agent|size|method|      time|
|null||/category/electro...|     /category/music| 200|Mozilla/4.0 (comp...|  66|   GET|1412333993|
|null||/item/electronics...| /category/computers| 200|Mozilla/5.0 (iPad...| 135|   GET|1412333977|
|null||/category/electro...|  /category/software| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333961|
|null||/item/electronics...|     /item/games/394| 200|Mozilla/5.0 (comp...|  83|   GET|1412333945|
|null||  /item/software/706|/search/?c=Softwa...| 200|Mozilla/5.0 (comp...|  76|   GET|1412333930|
|null||/item/giftcards/4879|      /item/toys/197| 200|Mozilla/5.0 (Wind...| 137|   GET|1412333914|
|null||/item/computers/4785|                   -| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333898|
|null||     /category/games|     /category/games| 200|Mozilla/5.0 (Maci...|  41|   GET|1412333882|
|null||/item/giftcards/4410|                   -| 200|Mozilla/4.0 (comp...|  72|   GET|1412333866|
|null||/category/electro...|/category/networking| 200|Mozilla/5.0 (comp...|  73|   GET|1412333850|
only showing top 10 rows

## Submitting presto job
>>> df ="com.treasuredata.spark").options(sql="select 1").load("sample_datasets")
2016-07-19 16:56:56-0700  info [TDSparkContext]
Submitted job 515990:
select 1 - (TDSparkContext.scala:70)
|    1|

## Reading job results
>>> df ="com.treasuredata.spark").load("job_id:515990")


# Register DataFrame as a temporary table
scala> td.df("hb_tiny.rankings").createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]

2016-07-20 11:27:11-0700  info [TDRelation] Fetching rankings within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-20 11:27:12-0700  info [TDRelation] Retrieved 2 PlazmaAPI entries - (TDRelation.scala:85)
|            page_url|page_rank|
|xjhmjsuqolfklbvxn...|      251|
|seozvzwkcfgnfuzfd...|      165|
|fdgvmwbrjlmvuoquy...|      132|
|gqghyyardomubrfsv...|      108|
|qtqntqkvqioouwfuj...|      278|
|wrwgqnhxviqnaacnc...|      135|
|  cxdmunpixtrqnvglnt|      146|
| ixgiosdefdnhrzqomnf|      126|
|xybwfjcuhauxiopfi...|      112|
|ecfuzdmqkvqktydvi...|      237|
|dagtwwybivyiuxmkh...|      177|
|emucailxlqlqazqru...|      134|
|nzaxnvjaqxapdjnzb...|      119|
|       ffygkvsklpmup|      332|
|hnapejzsgqrzxdswz...|      171|
|rvbyrwhzgfqvzqkus...|      148|
|knwlhzmcyolhaccqr...|      104|
|nbizrgdziebsaecse...|      665|
|jakofwkgdcxmaaqph...|      187|
|kvhuvcjzcudugtidf...|      120|
only showing top 20 rows
  • No labels