# Amazon EMR での TD Python Spark ドライバ Treasure Data は、Spark ドライバで使用される Plazma Public API の新規ユーザーを受け付けなくなりました。代わりに [pytd ライブラリ](https://api-docs.treasuredata.com/en/tools/pytd/quickstart/) を使用してください。 td-pyspark を使用して、Amazon EMR (Elastic MapReduce) でのデータ操作の結果と Treasure Data のデータをブリッジできます。 Amazon EMR は、ビッグデータ処理と分析のための AWS ツールであり、Spark にアクセスするための使いやすいインターフェースを提供します。PySpark は Spark 用の Python API です。Treasure Data の [td-pyspark](https://pypi.org/project/td-pyspark/) は、[td-spark](https://docs.treasuredata.com/display/PD/Treasure+Data+Apache+Spark+Driver+Release+Notes) に基づいて PySpark と Treasure Data を使用する便利な方法を提供する Python ライブラリです。 * [Amazon EMR での TD Python Spark ドライバ (非推奨)](#td-python-spark-driver-with-amazon-emr) * [前提条件](#prerequisites) * [Amazon EMR 環境の設定](#configuring-your-amazon-emr-environment) * [EC2 キーペアの作成](#create-an-ec2-key-pair) * [Amazon EMR でのクラスタの作成](#create-a-cluster-on-amazon-emr) * [SSH での EMR マスターノードへの接続](#connect-to-emr-master-node-with-ssh) * [td-pyspark ライブラリのインストール](#install-the-td-pyspark-libraries) * [設定ファイルの作成と TD API キーおよびサイトの指定](#create-a-configuration-file-and-specify-your-td-api-key-and-site) * [PySpark の起動](#launch-pyspark) * [マスターノードインスタンスでの Treasure Data とのインタラクション](#interacting-with-treasure-data-in-your-master-node-instance) * [テーブルを DataFrame として読み込み](#read-tables-as-dataframes) * [Treasure Data で使用するデータベースの変更](#change-the-database-used-in-treasure-data) * [sample_datasets.www_access へのアクセス](#access-sample_datasets.www_access) * [Presto クエリの送信](#submit-presto-queries) * [出力例:](#you-see) * [データベースの作成または削除](#create-or-drop-a-database) * [DataFrame の Treasure Data へのアップロード](#upload-dataframes-to-treasure-data) ## 前提条件 この例の手順に従うには、以下の Treasure Data アイテムが必要です: * Treasure Data API キー * td-spark 機能が有効 ## Amazon EMR 環境の設定 キーペア、クラスタを作成し、td-pyspark ライブラリをインストールし、接続コード用のノートブックを設定します。 Amazon Management Service にログインします。Find Services で EMR を入力します。Amazon EMR クラスタノードは Amazon EC2 インスタンスで実行されます。 ### EC2 キーペアの作成 Amazon でキーペアを作成すると、名前を指定し、拡張子 .pem のファイルが生成されます。生成されたファイルをローカルコンピュータにダウンロードします。 Amazon EC2 キーペアの作成の詳細については、[Amazon EC2 Key Pairs](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.md) を参照してください。 クラスタを作成するときにキーを参照します。すべてのクラスタインスタンスへの SSH 接続に使用される Amazon EC2 キーペアを指定します。 ## Amazon EMR でのクラスタの作成 設定フィールドに入力します。クラスタ名、クラスタデータのフォルダの場所を指定し、**Application** として Spark 2.4.3 以降のバージョンを選択します。 インスタンスとキーペアを指定します。 ## SSH での EMR マスターノードへの接続 ローカルコンピュータからのインバウンドアクセス権限を付与するには、セキュアプロトコルを指定します。 Master のセキュリティグループを選択します: Amazon で、アクセスしたいマスターノードを見つけます: プロキシが確立されたら、マスターノードインスタンスにログオンします。ローカルコンピュータで、ssh プロトコルを介して AWS マスターノードインスタンスにアクセスします。 `$ ssh -i ~/ hadoop@ ` ローカルコンピュータに接続確認が表示されます。 ## td-pyspark ライブラリのインストール 追加情報と最新のダウンロードについては、[Treasure Data Apache Spark Driver Release Notes](https://docs.treasuredata.com/display/PD/Treasure+Data+Apache+Spark+Driver+Release+Notes) にアクセスするか、以下のリンクを選択してください。 EMR インスタンス内で、マスターノードにライブラリをダウンロードすることを選択します。 * [td-spark-assembly-20.4.0_spark2.4.5.jar](https://td-spark.s3.amazonaws.com/td-spark-assembly-20.4.0_spark2.4.5.jar) マスターノードインスタンス内で、次のコマンドを実行して pyspark をインストールします: `$ pip install td_pyspark` ## 設定ファイルの作成と TD API キーおよびサイトの指定 マスターノードインスタンスで、td-spark.conf ファイルを作成します。設定ファイルで、TD API キー、TD サイトパラメータ、および Spark 環境を指定します。 形式の例は次のとおりです。実際の値を指定してください: spark.td.apikey (TD API KEY) spark.td.site (サイト: us、jp、eu01、ap02) spark.serializer org.apache.spark.serializer.KryoSerializer spark.sql.execution.arrow.enabled true ## PySpark の起動 PySpark を起動します。コマンドには、次の例に示すように引数を含めます: % pyspark --driver-class-path ./td_pyspark-19.7.0/td_pyspark/jars/td-spark-assembly.jar --properties-file ./td-spark.conf 次のようなものが表示されます: 次に、td_pyspark をロードします。プロンプト記号が >>> に変わることに注意してください: ```python >>> import td_pyspark >>> from pyspark import SparkContext >>> from pyspark.sql import SparkSession >>> builder = SparkSession.builder.appName("td-pyspark-test") >>> td = td_pyspark.TDSparkContextBuilder(builder).build() >>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df() >>> df.show() ``` *TDSparkContextBuilder* は、td_pyspark の機能にアクセスするためのエントリポイントです。前述のコードサンプルに示されているように、Treasure Data のテーブルをデータフレームとして読み込みます: `df = td.table("tablename").df()` 次のような結果が表示されます: 接続が機能しています。 # マスターノードインスタンスでの Treasure Data とのインタラクション Treasure Data に対して select および insert クエリを実行したり、Treasure Data からデータをクエリバックしたりできます。また、データベースとテーブルを作成および削除することもできます。 次のコマンドを使用できます: ## テーブルを DataFrame として読み込み テーブルを読み込むには、td.table (table_name) を使用します: df = td.table("sample_datasets.www_access").df() df.show() ## **Treasure Data で使用するデータベースの変更** コンテキストデータベースを変更するには、td.use (database_name) を使用します: td.use("sample_datasets") ## sample_datasets.www_access へのアクセス df = td.table("www_access").df() .df() を呼び出すことで、テーブルデータが Spark の DataFrame として読み込まれます。DataFrame の使用法は PySpark と同じです。[PySpark DataFrame ドキュメント](https://spark.apache.org/docs/latest/api/python/pyspark.sql.md#pyspark.sql.DataFrame) も参照してください。 ### **sample_datasets.www_access へのアクセス** `df = td.table("www_access").df()` ### **Presto クエリの送信** Spark クラスタが小さい場合、すべてのデータをインメモリ DataFrame として読み込むのは困難な場合があります。この場合、分散 SQL クエリエンジンである Presto を使用して、PySpark で処理するデータ量を減らすことができます。 ```python q = td.presto("select code, * from sample_datasets.www_access") q.show() q = td.presto("select code, count(*) from sample_datasets.www_access group by 1") q.show() ``` #### 出力例: ## **データベースの作成または削除** ```python td.create_database_if_not_exists("") td.drop_database_if_exists("") ``` ## **DataFrame の Treasure Data へのアップロード** ローカル DataFrame をテーブルとして保存するには、2つのオプションがあります: * 入力 DataFrame のレコードをターゲットテーブルに挿入 * 入力 DataFrame の内容でターゲットテーブルを作成または置換 ```python td.insert_into(df, "mydb.tbl1") td.create_or_replace(df, "mydb.tbl2") ``` TD toolbelt を使用してコマンドラインからデータベースを確認できます。また、TD Console をお持ちの場合は、データベースとクエリを確認できます。[データベースとテーブル管理](http://docs.treasuredata.com/display/PD/Database+and+Table+Management) をお読みください。