pytd.spark.download_td_spark

pytd.spark.download_td_spark(spark_binary_version='2.11', version='latest', destination=None)[source]

Download a td-spark jar file from S3.

Parameters

  • spark_binary_version (str, default: '2.11') – Apache Spark binary version.

  • version (str, default: 'latest') – td-spark version.

  • destination (str, optional) – Path where the downloaded jar file will be stored.
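Example

A minimal usage sketch. The destination path below is illustrative; if destination is omitted, the library's default location is used.

```python
>>> from pytd.spark import download_td_spark
>>> # Use the defaults: latest td-spark built against Spark's Scala 2.11 binary
>>> download_td_spark()
>>> # Pin the versions and store the jar at an explicit (illustrative) path
>>> download_td_spark(spark_binary_version="2.11", version="latest",
...                   destination="/tmp/td-spark-assembly.jar")
```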

pytd.spark.fetch_td_spark_context

pytd.spark.fetch_td_spark_context(apikey=None, endpoint=None, td_spark_path=None, download_if_missing=True, spark_configs=None)[source]

Build TDSparkContext via td-pyspark.

Parameters

  • apikey (str, optional) – Treasure Data API key. If not given, the value of the TD_API_KEY environment variable is used by default.

  • endpoint (str, optional) – Treasure Data API server. If not given, https://api.treasuredata.com is used by default. See Sites and Endpoints for the list of available endpoints.

  • td_spark_path (str, optional) – Path to td-spark-assembly_x.xx-x.x.x.jar. If not given, the path returned by TDSparkContextBuilder.default_jar_path() is used by default.

  • download_if_missing (bool, default: True) – Download td-spark if it does not exist at the time of initialization.

  • spark_configs (dict, optional) – Additional Spark configurations to be set via SparkConf’s set method.

Returns

Connection of td-spark

Return type

td_pyspark.TDSparkContext
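Example

A minimal sketch of building a context and querying a sample table. It assumes TD_API_KEY is set in the environment; the spark_configs entry is an arbitrary illustrative setting.

```python
>>> import pytd.spark
>>> td = pytd.spark.fetch_td_spark_context(
...     endpoint="https://api.treasuredata.com",
...     spark_configs={"spark.ui.enabled": "false"},
... )
>>> td.df("sample_datasets.www_access").show(3)
```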

td_pyspark.TDSparkContext

pytd.spark.fetch_td_spark_context() returns a td_pyspark.TDSparkContext. The following documentation describes its methods. For more information, see the sample usage on Google Colab.

class td_pyspark.TDSparkContext(spark, td=None)

Treasure Data Spark Context

__init__(spark, td=None)

Parameters

  • spark (pyspark.sql.SparkSession) – SparkSession already connected to Spark.

  • td (TDSparkContext, optional) – Treasure Data Spark Context.

df(table)

Load a Treasure Data table into a Spark DataFrame

Parameters

table (str) – Table name of Treasure Data.

Returns

Loaded table data.

Return type

pyspark.sql.DataFrame
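Example

A short sketch, assuming a TDSparkContext named td has already been built as in the other examples on this page:

```python
>>> df = td.df("sample_datasets.www_access")
>>> df.printSchema()
```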

presto(sql, database=None)

Submit Presto Query

Parameters

  • sql (str) – SQL statement to be executed.

  • database (str, optional) – Target database name.

Returns

SQL result

Return type

pyspark.sql.DataFrame

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> sql = "select code, count(*) cnt from sample_datasets.www_access group by 1"
>>> q = td.presto(sql)
>>> q.show()
2019-06-13 20:09:13.245Z  info [TDPrestoJDBCRDD]  - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+

execute_presto(sql, database=None)

Run non-query statements (e.g., INSERT INTO, CREATE TABLE)

Parameters

  • sql (str) – SQL statement to be executed.

  • database (str, optional) – Target database name.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")

table(table)

Fetch a Treasure Data table

Parameters

table (str) – Table name

Returns

TD table data. Call the df() method to convert it into a pyspark.sql.DataFrame.

Return type

TDTable

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access")
<td_pyspark.td_pyspark.TDTable object at 0x10eedf240>

db(name)

Fetch a Treasure Data database

Parameters

name (str) – Database name

Returns

TD database data. Call the df() method to convert it into a pyspark.sql.DataFrame.

Return type

TDDatabase

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.db("sample_datasets")
<td_pyspark.td_pyspark.TDDatabase object at 0x10eedfa58>

set_log_level(log_level)

Set log level for Spark

Parameters

log_level (str) – Log level for Spark process. {“ALL”, “DEBUG”, “ERROR”, “FATAL”, “INFO”, “OFF”, “TRACE”, “WARN”}

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.set_log_level("DEBUG")
2019-09-06T18:19:12.398-0700  info [TDSparkContext] Setting the log level of com.treasuredata.spark to DEBUG - (TDSparkContext.scala:62)

use(name)

Change the current database.

Parameters

name (str) – Name of the database to switch to.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.use("mydb")
2019-09-06T18:19:49.469-0700  info [TDSparkContext] Use mydb - (TDSparkContext.scala:150)

with_apikey(apikey)

Set an additional API key

Parameters

apikey (str) – API key for Treasure Data.

Example

>>> from td_pyspark import TDSparkContext
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> td2 = td.with_apikey("key2")

write(df, table_name, mode="error")

Write a DataFrame as a Treasure Data table

Parameters

  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to Treasure Data.

  • table_name (str) – Target table name to be inserted into.

  • mode –

    Save mode, same as Spark. {"error", "overwrite", "append", "ignore"}

    • error: raise an exception.

    • overwrite: drop the existing table, recreate it, and insert data.

    • append: insert data; create the table if it does not exist.

    • ignore: do nothing.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.write(df, "mydb.table1", "error")

insert_into(df, table_name)

Insert a DataFrame into an existing Treasure Data table

Parameters

  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to Treasure Data.

  • table_name (str) – Target table name to be inserted into.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.insert_into(df, "mydb.table1")
2019-09-09T10:57:37.558-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Append) - (TDWriter.scala:66)
2019-09-09T10:57:38.187-0700  info [TDWriter] [txx:8184891a] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:42.897-0700  info [TDWriter] [txx:8184891a] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)

create_or_replace(df, table_name)

Create or replace a Treasure Data table with a DataFrame.

Parameters

  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to Treasure Data.

  • table_name (str) – Target table name to be ingested.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.create_or_replace(df, "mydb.table1")
2019-09-09T10:57:56.381-0700  warn [DefaultSource] Dropping mydb.table1 (Overwrite mode) - (DefaultSource.scala:94)
2019-09-09T10:57:56.923-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Overwrite) - (TDWriter.scala:66)
2019-09-09T10:57:57.106-0700  info [TDWriter] [txx:a69bce97] Starting a new transaction for updating aki.tds_test - (TDWriter.scala:95)
2019-09-09T10:57:59.179-0700  info [TDWriter] [txx:a69bce97] Finished uploading 1 partit

create_table_if_not_exists(table_name)

Create a table if it does not exist

Parameters

table_name (str) – Target table name to be created.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_table_if_not_exists("mydb.table1")
2019-09-09T13:43:41.142-0700  warn [TDTable] Creating table aki.tds_test if not exists 

drop_table_if_exists(table_name)

Drop a table if it exists

Parameters

table_name (str) – Target table name to be dropped.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_table_if_exists("mydb.table1")

create_database_if_not_exists(db_name)

Create a database if it does not exist

Parameters

db_name (str) – Target database name to be created.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_database_if_not_exists("mydb")

drop_database_if_exists(db_name)

Drop a database if it exists

Parameters

db_name (str) – Target database name to be dropped.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_database_if_exists("mydb")

create_udp_l(table_name, long_column_name)

Create a User-Defined Partition table partitioned by a Long type column.

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by Long type column.

Parameters

  • table_name (str) – Target table name to be created as a UDP table.

  • long_column_name (str) – Partition column of Long (bigint) type.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_l("mydb.departments", "dept_id")
2019-09-09T10:43:20.913-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."departments" (
  time bigint,
  "dept_id" bigint
)
with (
  bucketed_on = array['dept_id'],
  bucket_count = 512
)

create_udp_s(table_name, string_column_name)

Create a User-Defined Partition Table partitioned by string type column

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by a string type column.

Parameters

  • table_name (str) – Target table name to be created as a UDP table.

  • string_column_name (str) – Partition column of string type.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_s("mydb.user_list", "id")
2019-09-09T10:45:27.802-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."user_list" (
  time bigint,
  "id" varchar
)
with (
  bucketed_on = array['id'],
  bucket_count = 512
)
