With the td-spark driver you can access data sets in Treasure Data using Apache Spark.

To enable the Treasure Data Spark Driver (td-spark) to connect to Treasure Data, contact support.

AWS region: set spark.td.site according to your account's region, and launch your Spark cluster in the corresponding AWS region.

For the fastest data access and the lowest data transfer costs, we recommend setting up your Spark cluster in the appropriate AWS region. Data transfer costs can become quite high if you use a different AWS region or a processing environment that is not local to the service.

AWS regions to use for the best performance:

  • US region customers: AWS us-east-1 region.

  • Tokyo region customers: AWS ap-northeast-1 region.

  • Korea region customers: AWS ap-northeast-2 region.

  • EU region customers: AWS eu-central-1 region.


td-spark.conf

Use the td-spark configuration file to set up your Spark applications (specifically, pass it to spark-submit or spark-shell with the --properties-file td-spark.conf option).

Setting spark.td.site is required for Tokyo, Korea, and EU region customers.

Set spark.td.site to us, jp, ap02, or eu01 according to your account region:

# region to use. [us, jp, ap02, or eu01]
spark.td.site us
spark.td.apikey (your TD API key)
spark.serializer org.apache.spark.serializer.KryoSerializer
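
The same settings can also be supplied programmatically when building the SparkSession instead of using the properties file. The following Scala sketch is illustrative only; the application name and the choice of reading the API key from a TD_API_KEY environment variable are assumptions, not requirements:

// Minimal sketch: passing the td-spark settings directly to the SparkSession builder.
// Reading the API key from the TD_API_KEY environment variable is an illustrative choice.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("td-spark-example")
  .config("spark.td.site", "us")  // us, jp, ap02, or eu01, matching your account region
  .config("spark.td.apikey", sys.env("TD_API_KEY"))
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()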

Download

The driver is available for download from these links. If your corporate policy allows it, you can select the links to download the drivers directly.

Release Notes

v20.10.0

Downloads


Changes

  • Upgrade to Spark 2.4.7, Spark 3.0.1

  • Internal library version upgrade:

    • Upgrade jackson to 2.10.5

    • Upgrade json4s to 3.6.6

    • Upgrade fluency to 2.4.1

    • Upgrade presto-jdbc to version 338 to fix a performance issue when using JDK 11

    • Upgrade Airframe to 20.10.0

    • Upgrade to Scala 2.11.12, Scala 2.12.12

    • Upgrade td-client-java to 0.9.3

Bug Fixes

  • Fixed a bug that caused DataFrame uploads to fail when the time column's type is not Long.

  • Fixed a bug when reading Map type values inside a column.

  • Fixed the partition reader to respect the spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes configuration parameters. This reduces the number of Spark tasks by packing multiple partition reads into a single task. See also the Spark SQL Performance Tuning Guide.
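
For reference, a minimal Scala sketch of tuning these two Spark SQL parameters at runtime; the values shown are Spark's usual defaults and are illustrative, not recommendations:

// Illustrative values only; tune them for your workload.
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  // 128 MB per partition read
spark.conf.set("spark.sql.files.openCostInBytes", "4194304")      // 4 MB estimated file open cost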

v20.6.2

Downloads

Changes

Fixed HTTP response handling when receiving 5xx errors from APIs.

v20.6.1

Downloads

Changes

This release supports Spark 2.4.6 and Spark 3.0.0 (official release).

v20.6.0

Downloads

Changes

  • Support swapping table contents

Bug Fixes

  • Bump to msgpack-java 0.8.20 with JDK8 compatibility

  • Fixed NPE in reading specific Array column values

  • Handle 504 responses properly

v20.4.0

Downloads

Changes

  • Spark 2.4.5 support

  • Support ap02 for spark.td.site configuration

v20.2.0

Downloads

Changes

  • Spark 3.0.0-preview2 support

v19.11.1

Downloads

Bug Fixes

  • Fixed a bug when uploading a DataFrame whose time column contains null or non-unixtime values.

  • Fixed an error when installing td_pyspark using Python 2

v19.11.0

Downloads

  • td-spark-assembly-19.11.0_spark2.4.4.jar (Spark 2.4.4, Scala 2.11)

  • td-spark-assembly-19.11.0_spark3.0.0-preview.jar (Spark 3.0.0-preview, Scala 2.12)

  • Commands for running spark-shell with Docker:

    • Spark 2.4.4: docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-shell:19.11.0_spark2.4.4

    • Spark 3.0.0-preview: docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-shell:19.11.0_spark3.0.0-preview

    • PySpark 2.4.4: docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-pyspark:19.11.0_spark2.4.4

    • PySpark 3.0.0.dev0: docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-pyspark:19.11.0_spark3.0.0-preview

Major Changes

  • Support Spark 2.4.4 (Scala 2.11) and Spark 3.0.0-preview (Scala 2.12, pyspark 3.0.0.dev0)

  • Support using multiple TD accounts with val td2 = td.withApiKey("...") (Scala), td2 = td.with_apikey("...") (Python).
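
For example, a minimal Scala sketch of switching accounts within one session; it assumes td is the td-spark context already available in the session, and the API key and table name are placeholders:

// Sketch: querying with a second TD account. `td` is assumed to be the td-spark
// context of the current session; the API key and table name are placeholders.
val td2 = td.withApiKey("2/xxxxxxxxxxxxxxxx")
val df  = td2.presto("select count(*) from sample_datasets.www_access")
df.show()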

Bug Fixes

  • Fixed the table preview of array column values inserted from td-spark

Internal Changes

v19.7.0

Downloads

Major Changes

Bug Fixes

  • Fixed scala-parser-combinator error when using td.presto(sql).

  • Bump to Fluency 2.3.2 with configuration fix

  • Add retry around drop table/database

v1.2.0

Downloads

  • Scala 2.11: td-spark-assembly_2.11-1.2.0.jar (Spark 2.4.3, Scala 2.11)

  • From this version, we will collect basic usage metrics to provide better support for td-spark users:

    • td-spark function usage and performance statistics

    • exception reports

Other Updates

Features

  • Support Spark 2.4.3

  • td-spark Docker images now accept the TD_SPARK_CONF environment variable to pass a td-spark configuration file (e.g., spark.td.apikey, spark.td.site, etc.).

Bug Fixes

  • Fixed an error when uploading a DataFrame with uppercase column names.

  • Fixed a bug that created corrupted partitions when uploading a DataFrame to UDP tables.

  • Fixed td-spark Docker images to support changing the spark.td.site configuration.

v1.1.0

Downloads

  • Scala 2.11: td-spark-assembly_2.11-1.1.0.jar (recommended because Spark 2.4.x is built for Scala 2.11)

  • Reduced the td-spark assembly jar file size from 100MB to 22MB.

  • Add new methods (a combined sketch follows this list)

    • td.presto(sql)

      • Improved the query performance by using the api-presto gateway.

      • If you need to run a query that has large results, use td.prestoJob(sql).df.

    • td.executePresto(sql)

      • Run non-query Presto statements, e.g., CREATE TABLE, DROP TABLE, etc.

    • td.prestoJob(sql)

      • Run Presto query as a regular TD job

    • td.hiveJob(sql)

      • Run Hive query as a regular TD job

    • New Database/Table methods:

      • td.table(...).exists

      • td.table(...).dropIfExists

      • td.table(...).createIfNotExists

      • td.database(...).exists

      • td.database(...).dropIfExists

      • td.database(...).createIfNotExists

    • Add methods for creating new UDP tables:

      • td.createLongPartitionedTable(table_name, long type column name)

      • td.createStringPartitionedTable(table_name, string type column name)

      • Support df.createOrReplaceTD(...) for UDP tables

  • Add spark.td.site configuration for multiple regions.

    • spark.td.site=us (default)

      • For US customers. Using us-east region provides the best performance

    • spark.td.site=jp

      • For Tokyo region customers. Using ap-northeast region provides the best performance.

    • spark.td.site=eu01

      • For EU region customers. Using eu-central region provides the best performance.

  • Enabled predicate pushdown for UDP table queries

    • Queries that specify conditions for UDP keys can be accelerated.

  • Bug fixes:

    • Fixed a bug when using createOrReplaceTempView in multiple notebooks at Databricks cloud.

    • Fixed a NoSuchMethodError when using the td.presto command
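
A combined Scala sketch of the methods introduced in this release; it assumes td is the td-spark context of the current session and that td.presto returns a Spark DataFrame, and all database, table, and column names are placeholders:

// Sketch of the v1.1.0 API additions; all names are placeholders.
// Fast query through the api-presto gateway
val summary = td.presto("select code, count(*) cnt from sample_datasets.www_access group by 1")
summary.show()

// Non-query Presto statement
td.executePresto("CREATE TABLE IF NOT EXISTS mydb.access_summary (code bigint, cnt bigint)")

// Run a query as a regular TD job when the result set is large
val largeDf = td.prestoJob("select * from sample_datasets.www_access").df

// Database/table utilities
td.database("mydb").createIfNotExists
td.table("mydb.scratch").dropIfExists

// Create a UDP table partitioned by a long column
td.createLongPartitionedTable("mydb.events_by_user", "user_id")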

v1.0.0

The first major release, with Save Mode and PySpark support.

  • This version supports only Spark 2.4.x or higher.

    • We no longer support Spark 2.3.x now that Spark 2.4.x is widely available. Users who still need Spark 2.3.x should use td-spark 0.4.2.

Download

  • Scala 2.11: td-spark-assembly_2.11-1.0.0.jar

  • Scala 2.12: td-spark-assembly_2.12-1.0.0.jar

  • Demo

Changes

  • Fixed data type conversion errors when reading Presto/Hive query results.

  • Fixed TD table schema column mapping when saving Long and Float DataFrame columns.

  • Removed the warning message when using slf4j and td-client-java.

v0.5.2 (For Spark 2.4.x)

A hotfix release for supporting PySpark

Dec. 18, 2018

Downloads

  • Scala 2.11: td-spark-assembly_2.11-0.5.2.jar

  • Scala 2.12: td-spark-assembly_2.12-0.5.2.jar

v0.4.2 (For Spark 2.3.x)

A hotfix release for supporting PySpark

Dec 18, 2018

Download:

  • Scala 2.11: td-spark-assembly_2.11-0.4.2.jar

v0.5.1 (For Spark 2.4.x)

Dec 18, 2018

  • Fixes NoSuchMethodError when using AWS EMR

  • Optimized the record reader memory footprint

  • Download:

    • Scala 2.11: td-spark-assembly_2.11-0.5.1.jar

    • Scala 2.12: td-spark-assembly_2.12-0.5.1.jar


v0.4.1 (For Spark 2.3.x)

Dec 18, 2018

  • Fixes NoSuchMethodError when using AWS EMR

    • Download: td-spark-assembly_2.11-0.4.1.jar

v0.5.0

Dec 17, 2018

Upgrade to Spark 2.4.0.

  • Note: Spark 2.3.2 and 2.4.0 are binary incompatible. Spark 2.3.2 users must use td-spark 0.4.0; to use Spark 2.4.0, use td-spark 0.5.0.

  • Support Scala 2.12

  • Jar file download links:

    • Scala 2.11: td-spark-assembly_2.11-0.5.0.jar

    • Scala 2.12: td-spark-assembly_2.12-0.5.0.jar

v0.4.0

Dec 17, 2018

Upgrade to Spark 2.3.2

  • Download: td-spark-assembly_2.11-0.4.0.jar

  • Support uploading DataFrames to TD (a combined sketch follows this list):

    • df.write.td("(database).(table)")

    • df.createOrReplaceTD("(database).(table)")

    • df.insertIntoTD("(database).(table)")

    • df.write.mode("...").format("com.treasuredata.spark").option("table", "(database).(table)").save

  • Support more flexible time range selector. Examples:

    • val t = td.table("(database).(table)")

    • duration string

      • t.within("-1d")

    • unixtime:

      • t.withinUnixTimeRange(from = 1412320845, until = 1412321000)

      • t.fromUnixTime(...)

      • t.untilUnixTime(...)

    • time strings

      • t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00")

      • t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo"))

      • t.fromTime("2014-10-03 09:12:00")

      • t.untilTime("2014-10-03 09:13:00")

  • Improved the table scan stability

    • Allow using more tasks for scanning tables

    • Optimized table partition reads

    • Add request retry on network failures

    • Use local disks for large table download
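
A minimal Scala sketch combining the upload methods and time range selectors above; it assumes spark and the td-spark context td are already available in the session, and the database and table names are placeholders:

// Sketch: uploading a DataFrame to TD and selecting time ranges; names are placeholders.
import spark.implicits._

val df = Seq((1, "apple"), (2, "banana")).toDF("id", "name")

// Equivalent shorthand: df.createOrReplaceTD("mydb.fruits")
df.write
  .mode("overwrite")
  .format("com.treasuredata.spark")
  .option("table", "mydb.fruits")
  .save()

// Time range selection on a table handle
val t = td.table("mydb.access_log")
val lastDay = t.within("-1d")  // duration string
val window  = t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00")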

v0.3.4

June 11, 2018

Upgrade to Spark 2.3.0

  • Support the expiration_sec option, e.g., td.table("...").withExpirationSec(7200)

  • Support time window string like td.table("...").within("-7d")

  • Support td.table("...").fromUnixTime(...)/untilUnixTime(...).

v0.3.2

Oct 31, 2017

  • Show td-spark version.

v0.3.1

Oct 5, 2017

  • Fixes a bug when reading tables containing null values.

v0.3

July 14, 2017

  • Support a full time-range format, e.g., td.df("(database).(table)", from = "2017-07-14 01:00:00 PDT", to = "2017-07-15 02:00:00 PDT")

  • Optimized DataFrame reader by using memory-efficient UnsafeRow class

  • Improved the performance and memory consumption when reading a single table schema

  • Fixed a bug that showed null values when reading columns that have alias names

v0.2

Jan 25, 2017

  • Upgraded to Spark 2.1.0

  • Rename the Guava library package inside the assembly JAR

  • Upgrade to Airframe 0.9, wvlet-log 1.1

v0.1

Nov 17, 2016

  • Private alpha release

  • Added basic functionality for using TD data sets inside Spark

    • Reading mpc1 files

    • Listing databases and table schemas

    • Run Presto/Hive queries

    • Support for SparkSQL

    • PySpark support
