# Databricks での TD Python Spark ドライバ Treasure Data は、td-pyspark ドライバで使用される Plazma Public API の新規ユーザーを受け付けなくなりました。代わりに統合用に [pytd ライブラリ](https://api-docs.treasuredata.com/en/tools/pytd/quickstart/) を使用してください。 td-pyspark を使用して、Databrick でのデータ操作の結果と Treasure Data のデータをブリッジできます。 Databricks は Apache Spark の上に構築されており、Spark にアクセスするための使いやすいインターフェースを提供します。PySpark は Spark 用の Python API です。Treasure Data の [td-pyspark](https://pypi.org/project/td-pyspark/) は、td-spark に基づいて PySpark と Treasure Data を使用する便利な方法を提供する Python ライブラリです。 ## 前提条件 この例の手順に従うには、以下のアイテムが必要です: * Treasure Data API キー * td-spark 機能が有効 ## Databricks 環境の設定 クラスタを作成し、td-pyspark ライブラリをインストールし、接続コード用のノートブックを設定します。 ### Databricks でのクラスタの作成 1. Cluster アイコンを選択します。 2. **Create Cluster** を選択します。 3. クラスタ名を指定し、**Databricks Runtime Version** として Spark 2.4.3 以降のバージョンを選択し、**Python Version** として 3 を選択します。 ![](/assets/image1.3ce73e62942f1880b3459942644bd983d9878889e8521488b81e64119261fe0b.60bcc915.png) ### td-pyspark ライブラリのインストール 追加情報と最新のダウンロードについては、Treasure Data Apache Spark Driver Release Notes にアクセスするか、以下のリンクを選択してください。 1. ダウンロードを選択します * [td-spark-assembly-latest_spark2.4.7.jar](https://td-spark.s3.amazonaws.com/td-spark-assembly-latest_spark2.4.7.jar) (Spark 2.4.7、Scala 2.11) * [td-spark-assembly-latest_spark3.0.1.jar](https://td-spark.s3.amazonaws.com/td-spark-assembly-latest_spark3.0.1.jar) (Spark 3.0.1、Scala 2.12) 1. PyPI を選択します。 ![](/assets/image2.647afd229c47d4405e59ecb8623bff5fdc5649570e12e69390a7f1d718f19db6.60bcc915.png) ダウンロードが完了すると、次のように表示されます: ![](/assets/image3.020caa5ed781bcc7f9d2285ddaf581fb05443fcc4c63972de9e250fcc5702e6a.60bcc915.png) ### TD API キーとサイトの指定 Spark 設定で、Treasure Data API キーを指定し、環境変数を入力します。 ![](/assets/image4.113acee2e5ea1af141dfeff9f21be666c63c136063f9b110f8723e1e4951cd38.60bcc915.png) 形式の例は次のとおりです。実際の値を指定してください: ```python spark.td.apikey (TD API KEY) spark.td.site (サイト: us、jp、eu01、ap02) spark.serializer org.apache.spark.serializer.KryoSerializer spark.sql.execution.arrow.enabled true ``` ### クラスタを再起動して Databricks で作業を開始 Spark クラスタを再起動します。ノートブックを作成します。次のコードのようなスクリプトを作成します: ```python %python from pyspark.sql import * import td_pyspark SAMPLE_TABLE = "sample_datasets.www_access" td = td_pyspark.TDSparkContext(spark) df = td.table(SAMPLE_TABLE).within("-10y").df() df.show() ``` *TDSparkContext* は、td_pyspark の機能にアクセスするためのエントリポイントです。前述のコードサンプルに示されているように、TDSparkContext を作成するには、SparkSession (spark) を TDSparkContext に渡します: ```python td = TDSparkContext(spark) ``` 次のような結果が表示されます: ![](/assets/image5.db3371232b119ee3a0a177fa29e4e07deb8524e4da329f53e8d9085321ca3442.60bcc915.png) 接続が機能しています。 ## Databricks から Treasure Data とのインタラクション Databricks では、Treasure Data に対して select および insert クエリを実行したり、Treasure Data からデータをクエリバックしたりできます。また、データベースとテーブルを作成および削除することもできます。 Databricks では次のコマンドを使用できます: #### **テーブルを DataFrame として読み込み** テーブルを読み込むには、td.table (table_name) を使用します: ```pyton df = td.table("sample_datasets.www_access").df() df.show() ``` #### **Treasure Data で使用するデータベースの変更** コンテキストデータベースを変更するには、td.use (database_name) を使用します: ```python td.use("sample_datasets") # sample_datasets.www_access へのアクセス df = td.table("www_access").df() ``` .df() を呼び出すことで、テーブルデータが Spark の DataFrame として読み込まれます。DataFrame の使用法は PySpark と同じです。[PySpark DataFrame ドキュメント](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.md) も参照してください。 #### **sample_datasets.www_access へのアクセス** ```python df = td.table("www_access").df() ``` #### **Presto クエリの送信** Spark クラスタが小さい場合、すべてのデータをインメモリ DataFrame として読み込むのは困難な場合があります。この場合、分散 SQL クエリエンジンである Presto を使用して、PySpark で処理するデータ量を減らすことができます。 ```python q = td.presto("select code, * from sample_datasets.www_access") q.show()q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")q.show() ``` 次のように表示されます: ![](/assets/image6.c82e5076cc37c73e25ba59a89c8747e01a573350bd4081208515046f1217216f.60bcc915.png) ![](/assets/image7.858584a0970d59cdcc6ce9ee5f034a853256537e5b205785ce9f8cf40e76538b.60bcc915.png) #### **データベースの作成または削除** ```python td.create_database_if_not_exists("") td.drop_database_if_exists("") ``` #### **DataFrame の Treasure Data へのアップロード** ローカル DataFrame をテーブルとして保存するには、2つのオプションがあります: * 入力 DataFrame のレコードをターゲットテーブルに挿入 * 入力 DataFrame の内容でターゲットテーブルを作成または置換 ```python td.insert_into(df, "mydb.tbl1")td.create_or_replace(df, "mydb.tbl2") ``` ## Treasure Data での Databricks の確認 td toolbelt を使用してコマンドラインからデータベースを確認できます。また、TD Console をお持ちの場合は、データベースとクエリを確認できます。[データベースとテーブル管理](/products/customer-data-platform/data-workbench/databases/data-management) をお読みください。