Treasure Data は、Spark ドライバで使用される Plazma Public API の新規ユーザーを受け付けなくなりました。代わりに pytd ライブラリ を使用してください。
td-pyspark を使用して、Amazon EMR (Elastic MapReduce) でのデータ操作の結果と Treasure Data のデータをブリッジできます。
Amazon EMR は、ビッグデータ処理と分析のための AWS ツールであり、Spark にアクセスするための使いやすいインターフェースを提供します。PySpark は Spark 用の Python API です。Treasure Data の td-pyspark は、td-spark に基づいて PySpark と Treasure Data を使用する便利な方法を提供する Python ライブラリです。
- Amazon EMR での TD Python Spark ドライバ (非推奨)
- 前提条件
- Amazon EMR 環境の設定
- EC2 キーペアの作成
- Amazon EMR でのクラスタの作成
- SSH での EMR マスターノードへの接続
- td-pyspark ライブラリのインストール
- 設定ファイルの作成と TD API キーおよびサイトの指定
- PySpark の起動
- マスターノードインスタンスでの Treasure Data とのインタラクション
- テーブルを DataFrame として読み込み
- Treasure Data で使用するデータベースの変更
- sample_datasets.www_access へのアクセス
- Presto クエリの送信
- 出力例:
- データベースの作成または削除
- DataFrame の Treasure Data へのアップロード
この例の手順に従うには、以下の Treasure Data アイテムが必要です:
Treasure Data API キー
td-spark 機能が有効
キーペア、クラスタを作成し、td-pyspark ライブラリをインストールし、接続コード用のノートブックを設定します。
Amazon Management Service にログインします。Find Services で EMR を入力します。Amazon EMR クラスタノードは Amazon EC2 インスタンスで実行されます。
Amazon でキーペアを作成すると、名前を指定し、拡張子 .pem のファイルが生成されます。生成されたファイルをローカルコンピュータにダウンロードします。
Amazon EC2 キーペアの作成の詳細については、Amazon EC2 Key Pairs を参照してください。
クラスタを作成するときにキーを参照します。すべてのクラスタインスタンスへの SSH 接続に使用される Amazon EC2 キーペアを指定します。
設定フィールドに入力します。クラスタ名、クラスタデータのフォルダの場所を指定し、Application として Spark 2.4.3 以降のバージョンを選択します。
インスタンスとキーペアを指定します。
ローカルコンピュータからのインバウンドアクセス権限を付与するには、セキュアプロトコルを指定します。
Master のセキュリティグループを選択します:
Amazon で、アクセスしたいマスターノードを見つけます:
プロキシが確立されたら、マスターノードインスタンスにログオンします。ローカルコンピュータで、ssh プロトコルを介して AWS マスターノードインスタンスにアクセスします。
$ ssh -i ~/ <your_aws_keypair_kry.pem> hadoop@ <Master public DNS>
ローカルコンピュータに接続確認が表示されます。
追加情報と最新のダウンロードについては、Treasure Data Apache Spark Driver Release Notes にアクセスするか、以下のリンクを選択してください。
EMR インスタンス内で、マスターノードにライブラリをダウンロードすることを選択します。
マスターノードインスタンス内で、次のコマンドを実行して pyspark をインストールします:
$ pip install td_pyspark
マスターノードインスタンスで、td-spark.conf ファイルを作成します。設定ファイルで、TD API キー、TD サイトパラメータ、および Spark 環境を指定します。
形式の例は次のとおりです。実際の値を指定してください:
spark.td.apikey (TD API KEY)
spark.td.site (サイト: us、jp、eu01、ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true
PySpark を起動します。コマンドには、次の例に示すように引数を含めます:
% pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf
次のようなものが表示されます:
次に、td_pyspark をロードします。プロンプト記号が >>> に変わることに注意してください:
>>> import td_pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("td-pyspark-test")
>>> td = td_pyspark.TDSparkContextBuilder(builder).build()
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()TDSparkContextBuilder は、td_pyspark の機能にアクセスするためのエントリポイントです。前述のコードサンプルに示されているように、Treasure Data のテーブルをデータフレームとして読み込みます:
df = td.table("tablename").df()
次のような結果が表示されます:
接続が機能しています。
Treasure Data に対して select および insert クエリを実行したり、Treasure Data からデータをクエリバックしたりできます。また、データベースとテーブルを作成および削除することもできます。
次のコマンドを使用できます:
テーブルを読み込むには、td.table (table_name) を使用します:
df = td.table("sample_datasets.www_access").df()
df.show()
コンテキストデータベースを変更するには、td.use (database_name) を使用します:
td.use("sample_datasets")
df = td.table("www_access").df()
.df() を呼び出すことで、テーブルデータが Spark の DataFrame として読み込まれます。DataFrame の使用法は PySpark と同じです。PySpark DataFrame ドキュメント も参照してください。
df = td.table("www_access").df()
Spark クラスタが小さい場合、すべてのデータをインメモリ DataFrame として読み込むのは困難な場合があります。この場合、分散 SQL クエリエンジンである Presto を使用して、PySpark で処理するデータ量を減らすことができます。
q = td.presto("select code, * from sample_datasets.www_access")
q.show()
q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")
q.show()td.create_database_if_not_exists("<db_name>")
td.drop_database_if_exists("<db_name>")ローカル DataFrame をテーブルとして保存するには、2つのオプションがあります:
入力 DataFrame のレコードをターゲットテーブルに挿入
入力 DataFrame の内容でターゲットテーブルを作成または置換
td.insert_into(df, "mydb.tbl1")
td.create_or_replace(df, "mydb.tbl2")TD toolbelt を使用してコマンドラインからデータベースを確認できます。また、TD Console をお持ちの場合は、データベースとクエリを確認できます。データベースとテーブル管理 をお読みください。