# TD Python Spark Driver With Amazon EMR

Treasure Data will no longer accept new users for the Plazma Public API that is used for the Spark Driver. Use the [pytd library](https://api-docs.treasuredata.com/en/tools/pytd/quickstart/) instead.

You can use td-pyspark to bridge the results of data manipulations in Amazon EMR (Elastic MapReduce) with your data in Treasure Data. Amazon EMR is an AWS tool for big data processing and analysis, providing an easy-to-use interface for accessing Spark. PySpark is a Python API for Spark. Treasure Data's [td-pyspark](https://pypi.org/project/td-pyspark/) is a Python library that provides a handy way to use PySpark and Treasure Data, based on [td-spark](https://docs.treasuredata.com/display/PD/Treasure+Data+Apache+Spark+Driver+Release+Notes).

* [TD Python Spark Driver with Amazon EMR (Deprecated)](#td-python-spark-driver-with-amazon-emr)
* [Prerequisites](#prerequisites)
* [Configuring your Amazon EMR Environment](#configuring-your-amazon-emr-environment)
  * [Create an EC2 Key Pair](#create-an-ec2-key-pair)
  * [Create a Cluster on Amazon EMR](#create-a-cluster-on-amazon-emr)
  * [Connect to EMR Master Node with SSH](#connect-to-emr-master-node-with-ssh)
  * [Install the td-pyspark Libraries](#install-the-td-pyspark-libraries)
  * [Create a Configuration File and Specify your TD API Key and Site](#create-a-configuration-file-and-specify-your-td-api-key-and-site)
  * [Launch PySpark](#launch-pyspark)
* [Interacting with Treasure Data in your Master Node Instance](#interacting-with-treasure-data-in-your-master-node-instance)
  * [Read Tables as DataFrames](#read-tables-as-dataframes)
  * [Change the Database Used in Treasure Data](#change-the-database-used-in-treasure-data)
  * [Access sample_datasets.www_access](#access-sample_datasets.www_access)
  * [Submit Presto Queries](#submit-presto-queries)
  * [Create or Drop a Database](#create-or-drop-a-database)
  * [Upload DataFrames to Treasure Data](#upload-dataframes-to-treasure-data)

## Prerequisites

To follow the steps in this example, you must have the following Treasure Data items:

* Treasure Data API key
* td-spark feature enabled

## Configuring your Amazon EMR Environment

You create a key pair, create a cluster, install the td-pyspark libraries, and configure a notebook for your connection code.

Log in to the AWS Management Console. Under **Find Services**, enter EMR. Amazon EMR cluster nodes run on Amazon EC2 instances.

### Create an EC2 Key Pair

When you create the key pair in Amazon, you provide a name, and a file with the extension .pem is generated. You download the generated file to your local computer. For more information about creating an Amazon EC2 key pair, see [Amazon EC2 Key Pairs](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.md).

You refer to the key pair when you create a cluster: you specify the Amazon EC2 key pair that is used for SSH connections to all cluster instances.

## Create a Cluster on Amazon EMR

Complete the configuration fields. Provide a cluster name and a folder location for the cluster data, and select Spark 2.4.3 or later as the **Application**. Specify the instance and key pair.
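The console steps above are the documented path. If you prefer to script cluster creation instead, the AWS CLI provides an equivalent; the following is only a rough sketch, and the cluster name, log bucket, key pair name, instance type and count, and EMR release label (chosen here to bundle Spark 2.4.x) are all placeholder values you would replace:

```
$ aws emr create-cluster \
    --name "td-pyspark-cluster" \
    --release-label emr-5.30.0 \
    --applications Name=Spark \
    --log-uri s3://your-bucket/emr-logs/ \
    --ec2-attributes KeyName=your-ec2-key-pair \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --use-default-roles
```

Whichever way you create the cluster, make sure the key pair you specify is the one whose .pem file you downloaded, because you use it for the SSH connection in the next step.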
## Connect to EMR Master Node with SSH

To grant inbound access permission from your local computer, you specify a secure protocol. Select the security group for the Master node. In the Amazon EMR console, find the Master node that you want to access.

With the proxy established, you log onto the Master node instance. On your local computer, access the AWS Master node instance through the SSH protocol:

`$ ssh -i ~/ hadoop@ `

You see a connection confirmation on your local computer.

## Install the td-pyspark Libraries

Access the [Treasure Data Apache Spark Driver Release Notes](https://docs.treasuredata.com/display/PD/Treasure+Data+Apache+Spark+Driver+Release+Notes) for additional information and the most current download, or select the link below. Inside your EMR instance, select the link to download the library to your Master node.

* [td-spark-assembly-20.4.0_spark2.4.5.jar](https://td-spark.s3.amazonaws.com/td-spark-assembly-20.4.0_spark2.4.5.jar)

Still within the Master node instance, run the following command to install td-pyspark:

`$ pip install td_pyspark`

## Create a Configuration File and Specify your TD API Key and Site

In the Master node instance, create a td-spark.conf file. In the configuration file, specify your TD API key, your TD site, and the Spark environment settings. An example of the format is as follows; you provide the actual values:

```
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true
```

## Launch PySpark

Launch PySpark. In your command, include the arguments as shown in the following example:

```
% pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf
```

When the PySpark shell starts, the prompt symbol changes to >>>. Then load td_pyspark, as follows:

```python
>>> import td_pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("td-pyspark-test")
>>> td = td_pyspark.TDSparkContextBuilder(builder).build()
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
```

*TDSparkContextBuilder* is an entry point to access td_pyspark's functionality. As shown in the preceding code sample, you read tables in Treasure Data as DataFrames:

`df = td.table("tablename").df()`

You see the first rows of the table, which confirms that your connection is working.

# Interacting with Treasure Data in your Master Node Instance

You can run select and insert queries against Treasure Data, or query data back from Treasure Data. You can also create and delete databases and tables. You can use the following commands.

## Read Tables as DataFrames

To read a table, use td.table(table_name):

```python
df = td.table("sample_datasets.www_access").df()
df.show()
```

## Change the Database Used in Treasure Data

To change the context database, use td.use(database_name):

```python
td.use("sample_datasets")

# Accesses sample_datasets.www_access
df = td.table("www_access").df()
```

By calling .df(), your table data is read as Spark's DataFrame. The usage of the DataFrame is the same as in PySpark. See also the [PySpark DataFrame documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.md#pyspark.sql.DataFrame).

### Access sample_datasets.www_access

`df = td.table("www_access").df()`
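Because td.table(...).df() returns an ordinary Spark DataFrame, you can chain the usual PySpark transformations onto it. The snippet below is a small illustrative sketch, not part of the original guide: it reuses the within() time-range filter from the Launch PySpark example to limit how much of sample_datasets.www_access is scanned, then aggregates with standard DataFrame operations.

```python
from pyspark.sql import functions as F

# Restrict the scan to two days starting 2014-10-04, as in the earlier example,
# then count requests per HTTP status code with ordinary PySpark operations.
df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
df.groupBy("code").agg(F.count("*").alias("requests")).orderBy("code").show()
```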
### Submit Presto Queries

If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data processed with PySpark:

```python
q = td.presto("select code, * from sample_datasets.www_access")
q.show()

q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")
q.show()
```

You see the query results.

## Create or Drop a Database

```python
td.create_database_if_not_exists("mydb")
td.drop_database_if_exists("mydb")
```

## Upload DataFrames to Treasure Data

To save your local DataFrames as a table, you have two options:

* Insert the records in the input DataFrame into the target table
* Create or replace the target table with the content of the input DataFrame

```python
td.insert_into(df, "mydb.tbl1")
td.create_or_replace(df, "mydb.tbl2")
```

You can use TD Toolbelt to check your database from the command line. Alternatively, if you have TD Console, you can check your databases and queries there. Read about [Database and Table Management](http://docs.treasuredata.com/display/PD/Database+and+Table+Management).
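For example, assuming TD Toolbelt is installed and authenticated with your API key, commands such as the following list your databases and the tables in the placeholder mydb database used above:

```
$ td db:list
$ td table:list mydb
```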