
TD Spark Driver (td-spark) FAQs

Q: Where can I download td-spark jars?

A standalone jar file (td-spark-assembly.jar) that can be used with existing Spark releases is available from here.

Q: How can I start using td-spark?

The td-spark driver is in a BETA stage and access from the driver is restricted. Please contact product@treasure-data.com to enable the td-spark driver.

Q: What can td-spark do?

  • Accessing Treasure Data from Spark in Java, Scala, and Python (PySpark).
  • Reading Treasure Data tables as Spark DataFrames.
  • Issuing Presto/Hive queries and reading the results as Spark DataFrames.
  • Issuing queries with Spark SQL.

Q: What can't td-spark do?

  • Launching a Spark cluster.

    • td-spark is not a hosted service. To use Spark, you need to create your own Spark cluster, which can be hosted on-premises or in a cloud service such as Amazon EMR. To learn how to create your own Spark cluster on Amazon EMR, see Apache Spark on EMR.
  • Saving DataFrames to Treasure Data.

    • Currently td-spark doesn't support direct upload of DataFrames to Treasure Data. To save a DataFrame to Treasure Data, you have two options:
      • If you can save DataFrames to Amazon S3 (see the sketch below), the Data Connector for Amazon S3 can be used to upload the data from S3 to Treasure Data.
      • You can also use Embulk with the embulk-output-td plugin to upload data stored on local disks to Treasure Data.
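
For the first option, here is a minimal Scala sketch of exporting a DataFrame to S3 with Spark's standard DataFrameWriter API, assuming your cluster already has S3 (s3a) credentials configured; the bucket and path are placeholders:

// df is any Spark DataFrame you want to export; the Data Connector for
// Amazon S3 can then import the written files into Treasure Data.
df.write
  .option("header", "true")
  .csv("s3a://your-bucket/td-export/www_access/")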

Q: Which version of Spark is supported?

The td-spark driver works with Spark 2.1.0 or higher. It also works with Spark 2.0, but does not support older versions of Spark (e.g., Spark 1.6).

Q: How does td-spark read the data?

For given database and table names, td-spark receives special URLs for accessing your data set, which expire after 1 hour. These URLs are used for sending the data from Amazon S3, so if you are using Spark outside the Amazon EC2 us-east region (e.g., on your local machine or in another cloud service), reading the table data will be significantly slower because outbound transfer from S3 is inefficient. To maximize performance, we highly recommend launching Spark clusters inside the us-east region of Amazon EC2.

Here are some other tips to reduce the data transfer size:

  • Always apply time filtering conditions.
    • For example, td.df("(database).(table)", from="2017-01-01", to="2017-01-02") will read only a 1-day range of data. The data outside this time range will not be accessed.
  • Read only necessary columns.

For example, you can use the select function to restrict the columns to be accessed:

scala> val df = td.df("sample_datasets.www_access", from="2014-10-03 09:12:00 UTC", to="2014-10-03 09:13:00 UTC")
2017-09-05 13:55:41-0700  info [LifeCycleManager] [session:7524125c] Life cycle is starting ... - (LifeCycleManager.scala:118)
2017-09-05 13:55:41-0700  info [LifeCycleManager] [session:7524125c] ======= STARTED ======= - (LifeCycleManager.scala:122)
df: org.apache.spark.sql.DataFrame = [user: bigint, host: string ... 7 more fields]

scala> val result = df.select("time", "host", "path", "code")
result: org.apache.spark.sql.DataFrame = [time: bigint, host: string ... 2 more fields]

scala> result.show
2017-09-05 13:56:01-0700  info [TDRelation] Fetching www_access within time range:[1412327520,1412327580) - (TDRelation.scala:113)
2017-09-05 13:56:02-0700  info [TDRelation] Retrieved 1 PlazmaAPI entries - (TDRelation.scala:116)
+----------+---------------+--------------------+----+
|      time|           host|                path|code|
+----------+---------------+--------------------+----+
|1412327571| 152.150.61.167|  /item/cameras/2043| 200|
|1412327554| 176.216.78.209|/item/electronics...| 200|
|1412327538|168.180.177.180|/category/electro...| 200|
|1412327522| 124.189.68.209|  /category/software| 200|
+----------+---------------+--------------------+----+

As with Presto, accessing only the necessary columns is always good practice when using columnar storage like Treasure Data.

Q: How can I configure td-spark?

Create a td-spark.conf file and add your own properties. At a minimum, you need to set the spark.td.apikey property for accessing Treasure Data:

td-spark.conf

# Set your TD API key here
spark.td.apikey=(your TD API key)
# Using KryoSerializer is recommended for faster performance
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.driver.memory=2g   # default=1g
spark.executor.memory=2g # default=1g

See also the list of available properties in Spark.

To read these configurations, add the --properties-file (path to your td-spark.conf file) option when launching Spark:

./bin/spark-shell --jars td-spark-assembly-0.3.jar  --properties-file td-spark.conf
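
Once the shell starts, you can verify that the properties file was picked up; a minimal sketch using Spark's standard SparkConf API (the property names are the ones set in td-spark.conf above):

scala> sc.getConf.get("spark.serializer")      // org.apache.spark.serializer.KryoSerializer
scala> sc.getConf.contains("spark.td.apikey")  // true if the API key was loaded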

Q: How can I start Spark shell?

The Spark shell is a convenient interface for quickly getting started with Spark and the td-spark driver:

  1. Download td-spark-assembly.jar file.
  2. Create a td-spark.conf file as described above.
  3. Download a Spark distribution (Choose Spark 2.1.0 or higher, Pre-built for Apache Hadoop 2.7 and later) and extract the archive.

Then enter the extracted folder and run the spark-shell command as follows:

spark-2.1.0-bin-hadoop2.7$ ./bin/spark-shell --jars td-spark-assembly-0.3.jar  --properties-file td-spark.conf
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/05 12:55:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/05 12:55:40 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.10.0.5:4040
Spark context available as 'sc' (master = local[*], app id = local-1504641337677).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Q: How can I start PySpark shell?

To use PySpark (Spark for Python), follow the same steps as for the Spark shell installation, then run the pyspark command as follows:

spark-2.1.0-bin-hadoop2.7$ ./bin/pyspark --driver-class-path td-spark-assembly-0.3.jar --properties-file td-spark.conf
Python 2.7.10 (default, Feb  7 2017, 00:08:15)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/05 13:36:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/05 13:36:17 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.10 (default, Feb  7 2017 00:08:15)
SparkSession available as 'spark'.
>>>

Q: How can I read tables?

Solution: Use the td.df command from com.treasuredata.spark

First, import com.treasuredata.spark._. Then use the spark.td.df("(database).(table)", from="yyyy-MM-dd HH:mm:ss zzz", to="yyyy-MM-dd HH:mm:ss zzz") command:

Scala

scala> import com.treasuredata.spark._; val td = spark.td;
import com.treasuredata.spark._

scala> val df = td.df("sample_datasets.www_access", from="2014-10-03 09:12:00 UTC", to="2014-10-03 09:13:00 UTC")
df: org.apache.spark.sql.DataFrame = [user: bigint, host: string ... 7 more fields]

scala> df.show
2017-09-05 11:23:09-0700  info [TDRelation] Fetching www_access within time range:[1412327520,1412327580) - (TDRelation.scala:113)
2017-09-05 11:23:10-0700  info [TDRelation] Retrieved 1 PlazmaAPI entries - (TDRelation.scala:116)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 152.150.61.167|  /item/cameras/2043|/search/?c=Camera...| 200|Mozilla/5.0 (Wind...|  98|   GET|1412327571|
|null| 176.216.78.209|/item/electronics...|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...| 127|   GET|1412327554|
|null|168.180.177.180|/category/electro...|                   -| 200|Mozilla/5.0 (comp...|  81|   GET|1412327538|
|null| 124.189.68.209|  /category/software|    /item/books/4977| 200|Mozilla/4.0 (comp...|  71|   GET|1412327522|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+

Python

In PySpark, use the spark.read.format("com.treasuredata.spark").load("(database).(table)") function to read tables. Applying a filter condition on the time range is highly recommended so that you do not load large volumes of data at once:

# Set the default time zone to UTC
>>> zone = sc._jvm.java.util.TimeZone
>>> zone.setDefault(sc._jvm.java.util.TimeZone.getTimeZone("UTC"))

>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access").filter("time >= unix_timestamp('2014-10-03 09:12:00') and time < unix_timestamp('2014-10-03 09:13:00')")
>>> df.show(10)
2017-09-05T18:48:39.916Z  info [TDRelation] Fetching www_access within time range:[1412327520,1412327580) - (TDRelation.scala:113)
2017-09-05T18:48:40.282Z  info [TDRelation] Retrieved 1 PlazmaAPI entries - (TDRelation.scala:116)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 152.150.61.167|  /item/cameras/2043|/search/?c=Camera...| 200|Mozilla/5.0 (Wind...|  98|   GET|1412327571|
|null| 176.216.78.209|/item/electronics...|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...| 127|   GET|1412327554|
|null|168.180.177.180|/category/electro...|                   -| 200|Mozilla/5.0 (comp...|  81|   GET|1412327538|
|null| 124.189.68.209|  /category/software|    /item/books/4977| 200|Mozilla/4.0 (comp...|  71|   GET|1412327522|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+

Q: How can I specify time ranges when reading a DataFrame?

Setting a time range to read is quite important for reducing the amount of data to be transferred. When reading a table as a DataFrame, you can specify the time range with two parameters, from (inclusive) and to (exclusive): td.df("(database).(table)", from = "...", to = "...").

The following date-time formats are supported for the from and to arguments:

  • yyyy-MM-dd HH:mm:ss zzz (e.g., 2017-01-01 00:00:00 UTC, 2017-01-01 00:00:00 America/Los_Angeles, 2017-01-01 00:00:00Z, etc.)
  • yyyy-MM-dd (e.g., 2016-12-01; the UTC time zone will be used)
  • unix time (e.g., 1412327520. Unix time is the number of seconds since 1970-01-01 00:00:00Z)
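
For example, here is a minimal sketch of the other two forms (the unix-time values are passed as strings here, which is an assumption; they correspond to the one-minute window used in the examples below):

// Date-only form: 2014-10-03 00:00:00 UTC (inclusive) to 2014-10-04 00:00:00 UTC (exclusive).
val daily = td.df("sample_datasets.www_access", from="2014-10-03", to="2014-10-04")

// Unix-time form: seconds since 1970-01-01 00:00:00Z.
val byUnixTime = td.df("sample_datasets.www_access", from="1412327520", to="1412327580")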

Scala

scala> import com.treasuredata.spark._; val td = spark.td;
import com.treasuredata.spark._

scala> val df = td.df("sample_datasets.www_access", from="2014-10-03 09:12:00 UTC", to="2014-10-03 09:13:00 UTC")
df: org.apache.spark.sql.DataFrame = [user: bigint, host: string ... 7 more fields]

scala> df.show
2017-09-05 11:23:09-0700  info [TDRelation] Fetching www_access within time range:[1412327520,1412327580) - (TDRelation.scala:113)
2017-09-05 11:23:10-0700  info [TDRelation] Retrieved 1 PlazmaAPI entries - (TDRelation.scala:116)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 152.150.61.167|  /item/cameras/2043|/search/?c=Camera...| 200|Mozilla/5.0 (Wind...|  98|   GET|1412327571|
|null| 176.216.78.209|/item/electronics...|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...| 127|   GET|1412327554|
|null|168.180.177.180|/category/electro...|                   -| 200|Mozilla/5.0 (comp...|  81|   GET|1412327538|
|null| 124.189.68.209|  /category/software|    /item/books/4977| 200|Mozilla/4.0 (comp...|  71|   GET|1412327522|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+

Python

# Set the default time zone to UTC
>>> zone = sc._jvm.java.util.TimeZone
>>> zone.setDefault(sc._jvm.java.util.TimeZone.getTimeZone("UTC"))

>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access").filter("time >= unix_timestamp('2014-10-03 09:12:00') and time < unix_timestamp('2014-10-03 09:13:00')")
>>> df.show(10)
2017-09-05T18:48:39.916Z  info [TDRelation] Fetching www_access within time range:[1412327520,1412327580) - (TDRelation.scala:113)
2017-09-05T18:48:40.282Z  info [TDRelation] Retrieved 1 PlazmaAPI entries - (TDRelation.scala:116)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 152.150.61.167|  /item/cameras/2043|/search/?c=Camera...| 200|Mozilla/5.0 (Wind...|  98|   GET|1412327571|
|null| 176.216.78.209|/item/electronics...|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...| 127|   GET|1412327554|
|null|168.180.177.180|/category/electro...|                   -| 200|Mozilla/5.0 (comp...|  81|   GET|1412327538|
|null| 124.189.68.209|  /category/software|    /item/books/4977| 200|Mozilla/4.0 (comp...|  71|   GET|1412327522|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+

Q: How can I submit Presto/Hive queries from Spark?

Scala

Use the td.presto (or td.hive) command:

scala> val df = td.presto("select * from www_access limit 10")
df: org.apache.spark.sql.DataFrame = [user: string, host: string, path: string, referer: string, code: bigint, agent: string, size: bigint, method: string, time: bigint]

scala> df.show
2016-07-11 16:47:20-0700  info [TDJobRelation]
Submitted presto job 496102:
select * from www_access limit 10
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|  76.45.175.151|   /item/sports/4642|/search/?c=Sports...| 200|Mozilla/5.0 (Maci...| 137|   GET|1412380793|
|null|184.162.105.153|   /category/finance|                   -| 200|Mozilla/4.0 (comp...|  68|   GET|1412380784|
|null|  144.30.45.112|/item/electronics...| /item/software/4777| 200|Mozilla/5.0 (Maci...| 136|   GET|1412380775|
|null|  68.42.225.106|/category/networking|/category/electro...| 200|Mozilla/4.0 (comp...|  98|   GET|1412380766|
|null| 104.66.194.210|     /category/books|                   -| 200|Mozilla/4.0 (comp...|  43|   GET|1412380757|
|null|    64.99.74.69|  /item/finance/3775|/category/electro...| 200|Mozilla/5.0 (Wind...|  86|   GET|1412380748|
|null| 136.135.51.168|/item/networking/540|                   -| 200|Mozilla/5.0 (Wind...|  89|   GET|1412380739|
|null|   52.99.134.55|   /item/health/1326|/category/electro...| 200|Mozilla/5.0 (Maci...|  51|   GET|1412380730|
|null|  136.51.116.68|/category/finance...|                   -| 200|Mozilla/5.0 (comp...|  99|   GET|1412380721|
|null|136.141.218.177| /item/computers/959|                   -| 200|Mozilla/5.0 (Wind...| 124|   GET|1412380712|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
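
td.hive follows the same pattern; a minimal sketch (the query is illustrative, and Hive jobs typically take longer to return than Presto):

// Submit the query through the Hive engine instead of Presto.
val hiveDf = td.hive("select count(1) from www_access")
hiveDf.show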

Python

In PySpark, you need to use the sql option to submit a Presto query:

>>> df = spark.read.format("com.treasuredata.spark").options(sql="select * from www_access limit 10").load("sample_datasets")
2017-09-06 11:28:48-0700 debug [package] Loading com.treasuredata.spark package  - (package.scala:13)
2017-09-06T11:28:49.016-0700  info [LifeCycleManager] [session:11c9de2] Life cycle is starting ... - (LifeCycleManager.scala:118)
2017-09-06T11:28:49.017-0700  info [LifeCycleManager] [session:11c9de2] ======= STARTED ======= - (LifeCycleManager.scala:122)
2017-09-06T11:28:50.479-0700  info [TDSparkContext]  - (TDSparkContext.scala:78)
Submitted job 171955106:
select * from www_access limit 10
>>> df.show(10)
2017-09-06T11:29:00.097-0700  info [TDJobRelation] Reading job 171955106 result - (TDJobRelation.scala:132)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|  116.93.24.135|     /item/toys/3282|/search/?c=Jewelr...| 200|Mozilla/5.0 (comp...| 111|   GET|1412359189|
|null| 40.186.149.189|    /category/health|    /item/books/3216| 200|Mozilla/5.0 (Wind...|  48|   GET|1412359176|
|null|  228.195.75.58|    /item/games/2968|                   -| 200|Mozilla/4.0 (comp...|  86|   GET|1412359162|
|null| 100.165.58.161|/item/computers/4496|/category/electro...| 200|Mozilla/4.0 (comp...|  89|   GET|1412359149|
|null| 72.210.153.145|/category/softwar...|   /category/jewelry| 200|Mozilla/5.0 (Wind...| 131|   GET|1412359135|
|null|  204.81.147.97|/category/electro...|    /item/games/3149| 200|Mozilla/5.0 (Wind...|  57|   GET|1412359121|
|null|104.108.157.135|/search/?c=Electr...|                   -| 200|Mozilla/4.0 (comp...|  75|  POST|1412359108|
|null|136.111.175.179|     /category/games|                   -| 200|Mozilla/5.0 (comp...| 125|   GET|1412359094|
|null|156.216.152.193|     /category/games|                   -| 200|Mozilla/4.0 (comp...|  77|   GET|1412359081|
|null|   40.189.20.95|/item/electronics...|/category/electro...| 200|Mozilla/5.0 (Wind...|  73|   GET|1412359067|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+

To use Hive, specify the option engine="hive" as well.

Q: How can I use Spark SQL?

Although there is often little practical benefit in using Spark SQL, since Presto covers most cases, you can still use Spark SQL by registering a DataFrame with the createOrReplaceTempView function:

// Register the DataFrame as a temporary table
scala> import com.treasuredata.spark._; val td = spark.td;
2017-09-05 12:55:43-0700 debug [package] Loading com.treasuredata.spark package  - (package.scala:13)
import com.treasuredata.spark._
td: com.treasuredata.spark.TDSparkContext = TDSparkContext(local-1504641337677,spark.td.plazma_api.host -> api-plazma.treasuredata.com, spark.td.api.host -> api.treasuredata.com)

scala> td.df("sample_datasets.nasdaq").createOrReplaceTempView("nasdaq")
2017-09-05 12:56:06-0700  info [LifeCycleManager] [session:1a48a582] Life cycle is starting ... - (LifeCycleManager.scala:118)
2017-09-05 12:56:06-0700  info [LifeCycleManager] [session:1a48a582] ======= STARTED ======= - (LifeCycleManager.scala:122)

scala> val q = spark.sql("select * from nasdaq limit 10")
q: org.apache.spark.sql.DataFrame = [symbol: string, open: double ... 5 more fields]

scala> q.show
2017-09-05 12:56:50-0700  info [TDRelation] Fetching nasdaq within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:113)
2017-09-05 12:56:51-0700  info [TDRelation] Retrieved 50 PlazmaAPI entries - (TDRelation.scala:116)
+------+----+------+-------+-------+-------+---------+
|symbol|open|volume|   high|    low|  close|     time|
+------+----+------+-------+-------+-------+---------+
|  CHCO| 0.0|  3200| 9.8997| 9.8997| 9.8997|662659200|
|  BOBE| 0.0|129500|10.6875|10.3125|10.5938|662659200|
|  CINF| 0.0|  5295| 8.1086| 8.0128| 8.0368|662659200|
|  BSET| 0.0|  9801|17.7422|17.0752|17.7422|662659200|
|  BOOM| 0.0|  1000| 1.0313| 1.0313| 1.0313|662659200|
|  BMTC| 0.0| 23200| 1.9688| 1.8438| 1.8438|662659200|
|  CKEC| 0.0|  2200|   45.4| 41.995|   45.4|662659200|
|  CNMD| 0.0| 25599| 3.4867| 3.3383| 3.4125|662659200|
|  CHFC| 0.0|  1501| 8.5682| 7.9382| 8.5682|662659200|
|  COHR| 0.0| 66600|    4.5|  4.125|    4.5|662659200|
+------+----+------+-------+-------+-------+---------+   

Q: How can I read TD job results?

Use td.jobResult(jobId):

// Submit a query
scala> val job = td.submitJob("select 10")
2017-09-05 13:02:06-0700  info [TDSparkContext]  - (TDSparkContext.scala:78)
Submitted job 171612415:
select 10
job: com.treasuredata.spark.TDJobRelation = TDJobRelation(TDSparkContext(local-1504641337677,spark.td.plazma_api.host -> api-plazma.treasuredata.com, spark.td.api.host -> api.treasuredata.com),171612415)

scala> val result = td.jobResult(job.jobId)
result: org.apache.spark.sql.DataFrame = [_col0: bigint]

scala> result.show
2017-09-05 13:02:58-0700  info [TDJobRelation] Reading job 171612415 result - (TDJobRelation.scala:132)
+-----+
|_col0|
+-----+
|   10|
+-----+

In PySpark, you can use the load("job_id:xxxxxx") function to read a job result:

df = sqlContext.read.format("com.treasuredata.spark").load("job_id:171612415")
