You can use td-pyspark to bridge the results of data manipulations in Amazon EMR (Elastic MapReduce) with your data in Treasure Data.
Amazon EMR is an AWS tool for big data processing and analysis, providing an easy-to-use interface for accessing Spark. PySpark is a Python API for Spark. Treasure Data's td-pyspark is a Python library that provides a handy way to use PySpark and Treasure Data based on td-spark.
To follow the steps in this example, you must have the following Treasure Data items:
Treasure Data API key
td-spark feature enabled
You create a key pair and a cluster, install the td-pyspark libraries, and configure a notebook for your connection code.
Log in to the AWS Management Console. Under Find Services, enter EMR. Amazon EMR cluster nodes run on Amazon EC2 instances.
When you create the key pair in Amazon, you provide a name, and a file with the extension of .pem is generated. You download the generated file to your local computer.
For more information about creating an Amazon EC2 key pair, see Amazon EC2 Key Pairs.
You refer to the key pair when you create a cluster. You specify the Amazon EC2 key pair that is used for SSH connections to all cluster instances.
Complete the configuration fields. Provide a cluster name, a folder location for the cluster data, and select version Spark 2.4.3 or later as the Application.
Specify the instance and key pair.
To grant inbound access permission from your local computer, you specify a secure protocol.
Select the security group for Master:
In Amazon EMR, find the Master node that you want to access:
With the proxy established, you log on to the Master node instance. On your local computer, access the AWS Master node instance through SSH.
$ ssh -i ~/<your_aws_keypair_key.pem> hadoop@<Master public DNS>
You see a connection confirmation on your local computer.
Access the Treasure Data Apache Spark Driver Release Notes for additional information and the most current download or select the link below.
Inside your EMR instance, download the library to your Master node.
Still within the Master node instance, run the following command to install td-pyspark:
$ pip install td_pyspark
In the Master node instance, create a td-spark.conf file. In the configuration file, specify your TD API Key, TD site parameters, and Spark environment.
An example of the format is as follows. You provide the actual values:
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true
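If you prefer to generate the file from a script, the format above can be sketched in Python as follows. This is a minimal sketch: the function names render_td_spark_conf and write_td_spark_conf and the site check are illustrative assumptions, not part of td-pyspark.

```python
import os

# Site codes listed in the example configuration above.
KNOWN_SITES = ("us", "jp", "eu01", "ap02")


def render_td_spark_conf(apikey, site="us"):
    """Return the contents of a td-spark.conf file as a string."""
    if site not in KNOWN_SITES:
        raise ValueError(f"unknown TD site: {site!r}")
    lines = [
        f"spark.td.apikey {apikey}",
        f"spark.td.site {site}",
        "spark.serializer org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.enabled true",
    ]
    return "\n".join(lines) + "\n"


def write_td_spark_conf(path, apikey, site="us"):
    """Write the configuration to `path`, readable only by the current user."""
    with open(path, "w") as conf_file:
        conf_file.write(render_td_spark_conf(apikey, site))
    os.chmod(path, 0o600)  # the file contains a secret API key
```

For example, write_td_spark_conf("td-spark.conf", os.environ["TD_API_KEY"], "us") would create the file in the current directory, assuming you have exported your key in a (hypothetical) TD_API_KEY environment variable.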
Launch PySpark. In your command, include the arguments as shown in the following example:
% pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf
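If you launch PySpark from a script rather than by typing the command, the two arguments can be assembled as shown in this sketch. The helper name pyspark_command is hypothetical; the paths are the ones from the example above and depend on the td_pyspark version you downloaded.

```python
import shlex


def pyspark_command(jar_path, conf_path):
    """Build the argument list for launching PySpark with td-spark."""
    return [
        "pyspark",
        "--driver-class-path", jar_path,  # td-spark assembly jar
        "--properties-file", conf_path,   # td-spark.conf created earlier
    ]


cmd = pyspark_command(
    "./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar",
    "./td-spark.conf",
)
# Print the command so that it can be reviewed or pasted into a shell.
print(shlex.join(cmd))
```

Keeping the arguments in a list avoids quoting problems if either path contains spaces.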
You see something similar to the following:
Then load td_pyspark, as follows. Note the prompt symbol changes to >>>:
>>> import td_pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("td-pyspark-test")
>>> td = td_pyspark.TDSparkContextBuilder(builder).build()
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
TDSparkContextBuilder is an entry point to access td_pyspark's functionalities. As shown in the preceding code sample, you read tables in Treasure Data as data frames:
df = td.table("tablename").df()
You see a result similar to the following:
Your connection is working.
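The connection check can also be wrapped in a small reusable function. The following is a minimal sketch that uses only the table(), within(), and df() calls shown earlier; the helper name read_table and its parameters are illustrative assumptions.

```python
def read_table(td, table_name, time_range=None):
    """Read a Treasure Data table as a Spark DataFrame.

    `td` is the object returned by TDSparkContextBuilder(builder).build().
    `time_range` is an optional string in the format passed to within(),
    such as "+2d/2014-10-04".
    """
    table = td.table(table_name)
    if time_range is not None:
        # Restrict the time range before the data is loaded.
        table = table.within(time_range)
    return table.df()
```

In the PySpark shell, df = read_table(td, "sample_datasets.www_access", "+2d/2014-10-04") followed by df.show() should produce the same result as the example above.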
You can run select and insert queries to Treasure Data or query back data from Treasure Data. You can also create and delete databases and tables.
You can use the following commands:
To read a table, use
df = td.table("sample_datasets.www_access").df()
df.show()
To change the context database, use
td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()
By calling .df(), your table data is read as a Spark DataFrame. You use the DataFrame the same way as in PySpark. See also the PySpark DataFrame documentation.
df = td.table("www_access").df()
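Because the result is an ordinary Spark DataFrame, the standard PySpark methods apply. As a minimal sketch, the following function counts rows per value of the code column; the function name count_by_code is hypothetical, and it assumes the DataFrame has a code column, as sample_datasets.www_access does.

```python
def count_by_code(df):
    """Count rows per `code` value with standard PySpark DataFrame methods."""
    return df.groupBy("code").count().orderBy("code")
```

Calling count_by_code(df).show() in the PySpark shell prints one row per distinct code together with its count. The aggregation runs in Spark, so the table data is loaded into the cluster first.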
If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data that PySpark has to process.
q = td.presto("select code, * from sample_datasets.www_access")
q.show()
q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")
q.show()
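To reuse the aggregation pattern for other tables and columns, the query string can be built by a helper. This is a sketch under stated assumptions: the function names and the identifier check are illustrative and not part of td-pyspark; only td.presto() comes from the examples above.

```python
import re

# Accepts plain names such as "code" or "sample_datasets.www_access".
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")


def count_by_column_sql(table, column):
    """Build a Presto query that counts rows per value of `column`."""
    for name in (table, column):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unexpected identifier: {name!r}")
    return f"select {column}, count(*) from {table} group by 1"


def count_by_column(td, table, column):
    """Run the aggregation in Presto and return only the summarized rows."""
    return td.presto(count_by_column_sql(table, column))
```

With this approach, only the summarized rows reach the Spark cluster, which is the point of pushing the work to Presto when the cluster is small.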
To save your local DataFrames as a table, you have two options:
Insert the records in the input DataFrame to the target table
Create or replace the target table with the content of the input DataFrame
td.insert_into(df, "mydb.tbl1")
td.create_or_replace(df, "mydb.tbl2")
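The choice between the two options can be made explicit in a small wrapper. The following is a minimal sketch; the function name save_dataframe and its replace flag are hypothetical, and it relies only on the insert_into() and create_or_replace() calls shown above.

```python
def save_dataframe(td, df, table_name, replace=False):
    """Save `df` to a Treasure Data table.

    replace=False inserts the records into the target table with insert_into().
    replace=True creates or replaces the target table with create_or_replace().
    """
    if replace:
        td.create_or_replace(df, table_name)
    else:
        td.insert_into(df, table_name)
```

Defaulting to replace=False keeps the non-destructive option as the one you get when no flag is passed.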
You can use TD toolbelt to check your database from a command line. Alternatively, if you have TD Console, you can check your databases and queries. Read about Database and Table Management.