Skip to content
Last updated

Amazon EMR での TD Python Spark ドライバ

Treasure Data は、Spark ドライバで使用される Plazma Public API の新規ユーザーを受け付けなくなりました。代わりに pytd ライブラリ を使用してください。

td-pyspark を使用して、Amazon EMR (Elastic MapReduce) でのデータ操作の結果と Treasure Data のデータをブリッジできます。

Amazon EMR は、ビッグデータ処理と分析のための AWS ツールであり、Spark にアクセスするための使いやすいインターフェースを提供します。PySpark は Spark 用の Python API です。Treasure Data の td-pyspark は、td-spark に基づいて PySpark と Treasure Data を使用する便利な方法を提供する Python ライブラリです。

前提条件

この例の手順に従うには、以下の Treasure Data アイテムが必要です:

  • Treasure Data API キー

  • td-spark 機能が有効

Amazon EMR 環境の設定

キーペア、クラスタを作成し、td-pyspark ライブラリをインストールし、接続コード用のノートブックを設定します。

Amazon Management Service にログインします。Find Services で EMR を入力します。Amazon EMR クラスタノードは Amazon EC2 インスタンスで実行されます。

EC2 キーペアの作成

Amazon でキーペアを作成すると、名前を指定し、拡張子 .pem のファイルが生成されます。生成されたファイルをローカルコンピュータにダウンロードします。

Amazon EC2 キーペアの作成の詳細については、Amazon EC2 Key Pairs を参照してください。

クラスタを作成するときにキーを参照します。すべてのクラスタインスタンスへの SSH 接続に使用される Amazon EC2 キーペアを指定します。

Amazon EMR でのクラスタの作成

設定フィールドに入力します。クラスタ名、クラスタデータのフォルダの場所を指定し、Application として Spark 2.4.3 以降のバージョンを選択します。

インスタンスとキーペアを指定します。

SSH での EMR マスターノードへの接続

ローカルコンピュータからのインバウンドアクセス権限を付与するには、セキュアプロトコルを指定します。

Master のセキュリティグループを選択します:

Amazon で、アクセスしたいマスターノードを見つけます:

プロキシが確立されたら、マスターノードインスタンスにログオンします。ローカルコンピュータで、ssh プロトコルを介して AWS マスターノードインスタンスにアクセスします。

$ ssh -i ~/ <your_aws_keypair_kry.pem> hadoop@ <Master public DNS>

ローカルコンピュータに接続確認が表示されます。

td-pyspark ライブラリのインストール

追加情報と最新のダウンロードについては、Treasure Data Apache Spark Driver Release Notes にアクセスするか、以下のリンクを選択してください。

EMR インスタンス内で、マスターノードにライブラリをダウンロードすることを選択します。

マスターノードインスタンス内で、次のコマンドを実行して pyspark をインストールします:

$ pip install td_pyspark

設定ファイルの作成と TD API キーおよびサイトの指定

マスターノードインスタンスで、td-spark.conf ファイルを作成します。設定ファイルで、TD API キー、TD サイトパラメータ、および Spark 環境を指定します。

形式の例は次のとおりです。実際の値を指定してください:

spark.td.apikey (TD API KEY)

spark.td.site (サイト: us、jp、eu01、ap02)

spark.serializer org.apache.spark.serializer.KryoSerializer

spark.sql.execution.arrow.enabled true

PySpark の起動

PySpark を起動します。コマンドには、次の例に示すように引数を含めます:

% pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf

次のようなものが表示されます:

次に、td_pyspark をロードします。プロンプト記号が >>> に変わることに注意してください:

>>> import td_pyspark

>>> from pyspark import SparkContext

>>> from pyspark.sql import SparkSession

>>> builder = SparkSession.builder.appName("td-pyspark-test")

>>> td = td_pyspark.TDSparkContextBuilder(builder).build()

>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()

>>> df.show()

TDSparkContextBuilder は、td_pyspark の機能にアクセスするためのエントリポイントです。前述のコードサンプルに示されているように、Treasure Data のテーブルをデータフレームとして読み込みます:

df = td.table("tablename").df()

次のような結果が表示されます:

接続が機能しています。

マスターノードインスタンスでの Treasure Data とのインタラクション

Treasure Data に対して select および insert クエリを実行したり、Treasure Data からデータをクエリバックしたりできます。また、データベースとテーブルを作成および削除することもできます。

次のコマンドを使用できます:

テーブルを DataFrame として読み込み

テーブルを読み込むには、td.table (table_name) を使用します:

df = td.table("sample_datasets.www_access").df()

df.show()

Treasure Data で使用するデータベースの変更

コンテキストデータベースを変更するには、td.use (database_name) を使用します:

td.use("sample_datasets")

sample_datasets.www_access へのアクセス

df = td.table("www_access").df()

.df() を呼び出すことで、テーブルデータが Spark の DataFrame として読み込まれます。DataFrame の使用法は PySpark と同じです。PySpark DataFrame ドキュメント も参照してください。

sample_datasets.www_access へのアクセス

df = td.table("www_access").df()

Presto クエリの送信

Spark クラスタが小さい場合、すべてのデータをインメモリ DataFrame として読み込むのは困難な場合があります。この場合、分散 SQL クエリエンジンである Presto を使用して、PySpark で処理するデータ量を減らすことができます。

q = td.presto("select code, * from sample_datasets.www_access")

q.show()

q = td.presto("select code, count(*) from sample_datasets.www_access group by 1")

q.show()

出力例:

データベースの作成または削除

td.create_database_if_not_exists("<db_name>")

td.drop_database_if_exists("<db_name>")

DataFrame の Treasure Data へのアップロード

ローカル DataFrame をテーブルとして保存するには、2つのオプションがあります:

  • 入力 DataFrame のレコードをターゲットテーブルに挿入

  • 入力 DataFrame の内容でターゲットテーブルを作成または置換

td.insert_into(df, "mydb.tbl1")
td.create_or_replace(df, "mydb.tbl2")

TD toolbelt を使用してコマンドラインからデータベースを確認できます。また、TD Console をお持ちの場合は、データベースとクエリを確認できます。データベースとテーブル管理 をお読みください。