# Amazon Elastic MapReduce

You can use the Apache Spark Driver for Treasure Data (also known as td-spark) on Amazon Elastic MapReduce (EMR). Although we recommend using the **us-east** region of Amazon EC2 for optimal performance, you can use td-spark in other Spark environments as well.

## Overview

Refer to [TD Spark FAQs](/products/customer-data-platform/data-workbench/queries/trino/query_faqs) for an overview of td-spark.

## What Does td-spark Enable?

- Access Treasure Data from Spark in Scala and Python (PySpark).
- Pull Treasure Data tables into Spark as DataFrames (no TD query is issued, providing the shortest possible latency path between data stored in TD and your Spark cluster).
- Issue Presto, Hive, or SparkSQL queries and return the result as a Spark DataFrame.


This driver is recommended for use with Spark 2.4.4 or higher.
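
For orientation, here is what these capabilities look like in a spark-shell session. This is a minimal sketch based on the API shown later on this page (`spark.td` and the `com.treasuredata.spark` data source); the query text is illustrative.

```scala
import com.treasuredata.spark._

// spark is the SparkSession provided by spark-shell.
val td = spark.td

// Pull a TD table into Spark as a DataFrame; no TD query is issued.
val accessLog = td.table("sample_datasets.www_access").df

// Issue a query through TD and read the result back as a DataFrame.
val result = spark.read
  .format("com.treasuredata.spark")
  .option("sql", "select count(*) from www_access")
  .load("sample_datasets")
```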

## Recommendations Regarding Use

For the fastest data access and the lowest data transfer costs, we recommend that you set up your Spark cluster in the AWS us-east region. Data transfer costs can become quite high when using other AWS regions or processing environments.

## TD Spark Driver on EMR

### Create an EMR Spark Cluster

1. Create an EMR cluster with Spark support.
The **us-east** region is recommended to maximize data transfer performance from S3.
2. Check the master node address of the new EMR cluster.


![](/assets/image2020-11-18_9-9-26.c6538e30038a6b2dd513a89175ca7d459cba989c4e8b5491b087998ef2f4e30e.d33fa2c4.png)

![](/assets/image2020-11-18_9-11-11.ce52ded660f570f751fb8d590bf08c5bc1575a6656533e0c4587e5a23ce1ec39.d33fa2c4.png)

If you created the EMR cluster with the default security group (ElasticMapReduce-master), you need to permit inbound access from your environment. See [Amazon EMR-Managed Security Groups](http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-man-sec-groups.html).

### Reference

- [Create An EMR Cluster with Spark](http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-spark-launch.html)


## Log In to the EMR Cluster

[Connect to the EMR Master Node Using SSH](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-connect-master-node-ssh.html)


```bash
# Use port 8157 as the SOCKS5 proxy port so that you can access the EMR Spark job history page (port 18080), the Zeppelin notebook (port 8890), etc.
$ ssh -i (your AWS key pair file. .pem) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
     __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/
4 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR
```

## Set Up TD Spark Integration

Download td-spark jar file:


```bash
[hadoop@ip-x-x-x-x]$ wget https://s3.amazonaws.com/td-spark/td-spark-assembly_2.11-0.4.0.jar
```

Create a **td.conf** file on the master node:


```bash
# Describe your TD API key here
spark.td.apikey (your TD API key)
spark.td.site (your site name: us, jp, ap02, eu01, etc.)
# (recommended) use KryoSerializer for faster performance
spark.serializer org.apache.spark.serializer.KryoSerializer
```
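
If you build the SparkSession yourself (for example, in a packaged application rather than spark-shell), the same settings can be passed programmatically. A sketch; `TD_API_KEY` is an assumed environment variable holding your TD API key.

```scala
import org.apache.spark.sql.SparkSession

// Equivalent to the td.conf settings above.
// TD_API_KEY is an assumed environment variable; avoid hard-coding API keys.
val spark = SparkSession.builder()
  .config("spark.td.apikey", sys.env("TD_API_KEY"))
  .config("spark.td.site", "us")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
```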

## Using spark-shell on EMR


```bash
[hadoop@ip-x-x-x-x]$ spark-shell --master yarn --jars td-spark-assembly_2.11-0.4.0.jar --properties-file td.conf
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
scala> import com.treasuredata.spark._
scala> val td = spark.td
scala> val d = td.table("sample_datasets.www_access").df
scala> d.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|136.162.131.221|    /category/health|   /category/cameras| 200|Mozilla/5.0 (Wind...|  77|   GET|1412373596|
|null| 172.33.129.134|      /category/toys|   /item/office/4216| 200|Mozilla/5.0 (comp...| 115|   GET|1412373585|
|null| 220.192.77.135|  /category/software|                   -| 200|Mozilla/5.0 (comp...| 116|   GET|1412373574|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 3 rows
```
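
The returned value is an ordinary Spark DataFrame, so all standard transformations apply. For example, continuing the session above (output omitted; the `$` column syntax works because spark-shell imports `spark.implicits._` automatically):

```scala
// Count requests per HTTP status code.
scala> d.groupBy("code").count().show()

// Inspect a few GET request paths.
scala> d.filter($"method" === "GET").select("path", "size").show(3)
```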

## Using Zeppelin Notebook on EMR

### Configure Zeppelin for td-spark

1. Create an SSH tunnel to the EMR cluster:



```bash
$ ssh -i (your AWS key pair file. .pem) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
```

(For Chrome users) Install the Proxy SwitchySharp Chrome extension and turn on the proxy switch for EMR when accessing your EMR master node.
2. Open `http://(your EMR master node public address):8890/`
3. Open the **Interpreters** page to configure td-spark.

![](/assets/image2020-11-18_9-15-28.a4f96f2069e1df24d7c95be0d35e0d01c8821747176e796e484e3c15a686c98f.d33fa2c4.png)
4. Edit your profile details and select **Save**.

![](/assets/image2020-11-18_9-16-54.cb15bd5327e74c96e696f3d16f94c0eca3c92fc777e25e082d78cf668caca70b.d33fa2c4.png)

### Access Dataset in TD as DataFrame

You can read table data as a Spark DataFrame.

![](/assets/image2020-11-18_9-17-56.82c18313f4714d4d7a00e721ac7cd60cd21bf1ec4ab882db2e1be59b85254977.d33fa2c4.png)
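
A Zeppelin paragraph for this looks roughly as follows (a sketch mirroring the spark-shell example above; `%spark` selects Zeppelin's Spark interpreter):

```scala
%spark
// Read a TD table as a Spark DataFrame inside a Zeppelin paragraph.
import com.treasuredata.spark._
val td = spark.td
td.table("sample_datasets.www_access").df.show(3)
```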

### Running Presto Queries

![](/assets/image2020-11-18_9-18-48.0e69e43eeae788cd1b81f341a8ac3874bfcd86cb55306d80b9166ad729cf9b5a.d33fa2c4.png)
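
From Scala, a Presto query can be submitted through the same `com.treasuredata.spark` data source used in the PySpark example later on this page. A sketch with an illustrative query:

```scala
%spark
// Submit a Presto query to TD and receive the result as a DataFrame.
val top = spark.read
  .format("com.treasuredata.spark")
  .option("sql", "select code, count(*) as cnt from www_access group by 1")
  .load("sample_datasets")
top.show()
```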

### Checking Spark History Server

You can check Spark application event history in the **History Server** using your EMR master node public address.

`http://(your EMR master node public address):18080/`

![](/assets/image2020-11-18_9-21-52.42a636aa3768897c576b4224cfb24be36c330e2135b7c1f7a7b9c5558e55b395.d33fa2c4.png)

## Using the TD Spark Driver with PySpark and SparkSQL

### PySpark


```bash
[hadoop@ip-x-x-x-x]$ pyspark --driver-class-path td-spark-assembly_2.11-0.4.0.jar --properties-file td.conf

>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access")
>>> df.show(10)
2016-07-19 16:34:15-0700  info [TDRelation] Fetching www_access within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-19 16:34:16-0700  info [TDRelation] Retrieved 19 PlazmaAPI entries - (TDRelation.scala:85)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 148.165.90.106|/category/electro...|     /category/music| 200|Mozilla/4.0 (comp...|  66|   GET|1412333993|
|null|  144.105.77.66|/item/electronics...| /category/computers| 200|Mozilla/5.0 (iPad...| 135|   GET|1412333977|
|null| 108.54.178.116|/category/electro...|  /category/software| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333961|
|null|104.129.105.202|/item/electronics...|     /item/games/394| 200|Mozilla/5.0 (comp...|  83|   GET|1412333945|
|null|   208.48.26.63|  /item/software/706|/search/?c=Softwa...| 200|Mozilla/5.0 (comp...|  76|   GET|1412333930|
|null|  108.78.209.95|/item/giftcards/4879|      /item/toys/197| 200|Mozilla/5.0 (Wind...| 137|   GET|1412333914|
|null| 108.198.97.206|/item/computers/4785|                   -| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333898|
|null| 172.195.185.46|     /category/games|     /category/games| 200|Mozilla/5.0 (Maci...|  41|   GET|1412333882|
|null|   88.24.72.177|/item/giftcards/4410|                   -| 200|Mozilla/4.0 (comp...|  72|   GET|1412333866|
|null|  24.129.141.79|/category/electro...|/category/networking| 200|Mozilla/5.0 (comp...|  73|   GET|1412333850|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 10 rows

## Submitting a Presto job
>>> df = spark.read.format("com.treasuredata.spark").options(sql="select 1").load("sample_datasets")
2016-07-19 16:56:56-0700  info [TDSparkContext]
Submitted job 515990:
select 1 - (TDSparkContext.scala:70)
>>> df.show(10)
+-----+
|_col0|
+-----+
|    1|
+-----+

## Reading job results
>>> df = spark.read.format("com.treasuredata.spark").load("job_id:515990")
```

### SparkSQL


```scala
// Register the DataFrame as a temporary table
scala> td.df("hb_tiny.rankings").createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]

scala> q1.show
2016-07-20 11:27:11-0700  info [TDRelation] Fetching rankings within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-20 11:27:12-0700  info [TDRelation] Retrieved 2 PlazmaAPI entries - (TDRelation.scala:85)
+--------------------+---------+
|            page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...|      251|
|seozvzwkcfgnfuzfd...|      165|
|fdgvmwbrjlmvuoquy...|      132|
|gqghyyardomubrfsv...|      108|
|qtqntqkvqioouwfuj...|      278|
|wrwgqnhxviqnaacnc...|      135|
|  cxdmunpixtrqnvglnt|      146|
| ixgiosdefdnhrzqomnf|      126|
|xybwfjcuhauxiopfi...|      112|
|ecfuzdmqkvqktydvi...|      237|
|dagtwwybivyiuxmkh...|      177|
|emucailxlqlqazqru...|      134|
|nzaxnvjaqxapdjnzb...|      119|
|       ffygkvsklpmup|      332|
|hnapejzsgqrzxdswz...|      171|
|rvbyrwhzgfqvzqkus...|      148|
|knwlhzmcyolhaccqr...|      104|
|nbizrgdziebsaecse...|      665|
|jakofwkgdcxmaaqph...|      187|
|kvhuvcjzcudugtidf...|      120|
+--------------------+---------+
only showing top 20 rows
```
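
Any number of TD tables can be registered this way and combined in a single SparkSQL query. A sketch, assuming a second table `hb_tiny.uservisits` with a `dest_url` column (both names are illustrative):

```scala
// Register a second TD table and join it with rankings.
// hb_tiny.uservisits and its dest_url column are assumed for illustration.
scala> td.df("hb_tiny.uservisits").createOrReplaceTempView("uservisits")

scala> spark.sql(
     |   "select r.page_url, r.page_rank, count(*) as visits " +
     |   "from rankings r join uservisits u on r.page_url = u.dest_url " +
     |   "group by r.page_url, r.page_rank").show(5)
```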