Skip to content
Last updated

Databricks での TD Python Spark ドライバ

Treasure Data は、td-pyspark ドライバで使用される Plazma Public API の新規ユーザーを受け付けなくなりました。代わりに統合用に pytd ライブラリ を使用してください。

td-pyspark を使用して、Databrick でのデータ操作の結果と Treasure Data のデータをブリッジできます。

Databricks は Apache Spark の上に構築されており、Spark にアクセスするための使いやすいインターフェースを提供します。PySpark は Spark 用の Python API です。Treasure Data の td-pyspark は、td-spark に基づいて PySpark と Treasure Data を使用する便利な方法を提供する Python ライブラリです。

前提条件

この例の手順に従うには、以下のアイテムが必要です:

  • Treasure Data API キー

  • td-spark 機能が有効

Databricks 環境の設定

クラスタを作成し、td-pyspark ライブラリをインストールし、接続コード用のノートブックを設定します。

Databricks でのクラスタの作成

  1. Cluster アイコンを選択します。

  2. Create Cluster を選択します。

  3. クラスタ名を指定し、Databricks Runtime Version として Spark 2.4.3 以降のバージョンを選択し、Python Version として 3 を選択します。

td-pyspark ライブラリのインストール

追加情報と最新のダウンロードについては、Treasure Data Apache Spark Driver Release Notes にアクセスするか、以下のリンクを選択してください。

  1. ダウンロードを選択します
  1. PyPI を選択します。

ダウンロードが完了すると、次のように表示されます:

TD API キーとサイトの指定

Spark 設定で、Treasure Data API キーを指定し、環境変数を入力します。

形式の例は次のとおりです。実際の値を指定してください:

spark.td.apikey (TD API KEY)
spark.td.site (サイト: us、jp、eu01、ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true

クラスタを再起動して Databricks で作業を開始

Spark クラスタを再起動します。ノートブックを作成します。次のコードのようなスクリプトを作成します:

%python

from pyspark.sql import *

import td_pyspark

SAMPLE_TABLE = "sample_datasets.www_access"

td = td_pyspark.TDSparkContext(spark)

df = td.table(SAMPLE_TABLE).within("-10y").df()

df.show()

TDSparkContext は、td_pyspark の機能にアクセスするためのエントリポイントです。前述のコードサンプルに示されているように、TDSparkContext を作成するには、SparkSession (spark) を TDSparkContext に渡します:

td = TDSparkContext(spark)

次のような結果が表示されます:

接続が機能しています。

Databricks から Treasure Data とのインタラクション

Databricks では、Treasure Data に対して select および insert クエリを実行したり、Treasure Data からデータをクエリバックしたりできます。また、データベースとテーブルを作成および削除することもできます。

Databricks では次のコマンドを使用できます:

テーブルを DataFrame として読み込み

テーブルを読み込むには、td.table (table_name) を使用します:

df = td.table("sample_datasets.www_access").df()

df.show()

Treasure Data で使用するデータベースの変更

コンテキストデータベースを変更するには、td.use (database_name) を使用します:

td.use("sample_datasets")

# sample_datasets.www_access へのアクセス

df = td.table("www_access").df()

.df() を呼び出すことで、テーブルデータが Spark の DataFrame として読み込まれます。DataFrame の使用法は PySpark と同じです。PySpark DataFrame ドキュメント も参照してください。

sample_datasets.www_access へのアクセス

df = td.table("www_access").df()

Presto クエリの送信

Spark クラスタが小さい場合、すべてのデータをインメモリ DataFrame として読み込むのは困難な場合があります。この場合、分散 SQL クエリエンジンである Presto を使用して、PySpark で処理するデータ量を減らすことができます。

q = td.presto("select code, * from sample_datasets.www_access")

q.show()q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")q.show()

次のように表示されます:

データベースの作成または削除

td.create_database_if_not_exists("<db_name>")

td.drop_database_if_exists("<db_name>")

DataFrame の Treasure Data へのアップロード

ローカル DataFrame をテーブルとして保存するには、2つのオプションがあります:

  • 入力 DataFrame のレコードをターゲットテーブルに挿入

  • 入力 DataFrame の内容でターゲットテーブルを作成または置換

td.insert_into(df, "mydb.tbl1")td.create_or_replace(df, "mydb.tbl2")

Treasure Data での Databricks の確認

td toolbelt を使用してコマンドラインからデータベースを確認できます。また、TD Console をお持ちの場合は、データベースとクエリを確認できます。データベースとテーブル管理 をお読みください。