Skip to content
Last updated

Amazon Elastic MapReduce

Amazon Elastic MapReduce (EMR)でApache Spark Driver for Treasure Data(td-sparkとも呼ばれます)を使用できます。最適なパフォーマンスを得るためにAmazon EC2のus-eastリージョンの使用をお勧めしますが、他のSpark環境でもtd-sparkを使用できます。

概要

td-sparkの概要については、TD Spark FAQsを参照してください。

td-sparkで何ができるか?

  • ScalaとPython (PySpark)のSparkからTreasure Dataにアクセスします。
  • Treasure DataテーブルをSparkにDataFrameとして取り込みます(TDクエリは発行されず、TD保存データとSparkクラスター間の最短のレイテンシパスを提供します)。
  • Presto、Hive、またはSparkSQLクエリを発行し、結果をSpark DataFrameとして返します。

このドライバは、Spark 2.4.4以降での使用が推奨されます

使用に関する推奨事項

最速のデータアクセスと最低のデータ転送コストのために、AWSのus-eastリージョンでSparkクラスターをセットアップすることをお勧めします。他のAWSリージョンまたは処理環境を使用すると、データ転送コストが非常に高くなる可能性があります。

EMR上のTD Spark Driver

EMR Sparkクラスターの作成

  1. Sparkサポート付きのEMRクラスターを作成します。

    S3からのデータ転送パフォーマンスを最大化するために、us-eastリージョンが推奨されます。

  2. 新しいEMRのマスターノードアドレスを確認します。

    テーブルデータをSpark DataFrameとして読み取ります

デフォルトのセキュリティグループ(ElasticMapReduce-master)でEMRを作成した場合は、環境からのインバウンドアクセスを許可する必要があります。Amazon EMR-Managed Security Groupsを参照してください。

リファレンス

EMRクラスターへのログイン

Connect to EMR Master node with SSH

# SOCKS5プロキシポート8157を使用して、EMR Sparkジョブ履歴ページ(ポート18080)、Zeppelinノートブック(ポート8890)などにアクセスできるようにします。
$ ssh -i (your AWS key pair file. .pem) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
     __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2016.09-release-notes/
4 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

TD Spark Integrationのセットアップ

td-spark jarファイルをダウンロードします:

[hadoop@ip-x-x-x-x]$ wget https://s3.amazonaws.com/td-spark/td-spark-assembly_2.11-0.4.0.jar

マスターノードにtd.confファイルを作成します:

# ここにTD APIキーを記述します
spark.td.apikey (your TD API key)
spark.td.site (your site name: us, jp, ap02, eu01, etc.)
# (推奨) より高速なパフォーマンスのためにKryoSerializerを使用
spark.serializer org.apache.spark.serializer.KryoSerializer

EMRでspark-shellを使用

[hadoop@ip-x-x-x-x]$ spark-shell --master yarn --jars td-spark-assembly-latest.jar --properties-file td.conf
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
scala> import com.treasuredata.spark._
scala> val td = spark.td
scala> val d = td.table("sample_datasets.www_access").df
scala> d.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|136.162.131.221|    /category/health|   /category/cameras| 200|Mozilla/5.0 (Wind...|  77|   GET|1412373596|
|null| 172.33.129.134|      /category/toys|   /item/office/4216| 200|Mozilla/5.0 (comp...| 115|   GET|1412373585|
|null| 220.192.77.135|  /category/software|                   -| 200|Mozilla/5.0 (comp...| 116|   GET|1412373574|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 3 rows

EMRでZeppelin Notebookを使用

td-spark用にZeppelinを設定

  1. EMRクラスターへのSSHトンネルを作成します。
$ ssh -i (your AWS key pair file. .pem) -D8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com

(Chromeユーザー向け) Proxy Switchy Sharp Chrome Extensionをインストールします。EMRマスターにアクセスする際にEMRのproxy-switchをオンにします 2. http://(your EMR master node public address):8890/を開きます 3. td-sparkを設定するためにInterpretersページを開きます。

4. プロファイルの詳細を編集し、Saveを選択します。

TD内のデータセットにDataFrameとしてアクセス

テーブルデータをSpark DataFrameとして読み取ることができます。

Prestoクエリの実行

Spark History Serverの確認

EMRマスターノードのパブリックアドレスを使用して、History Serverでイベント履歴を確認できます。

http://(your EMR master node public address):18080/

PySparkとSparkSQLでのTD Spark Driverの使用

PySpark

$ ./bin/pyspark  --driver-class-path ~/work/git/td-spark/td-spark/target/td-spark-assembly-0.1-SNAPSHOT.jar --properties-file ../td-dev.conf

>>> df = spark.read.format("com.treasuredata.spark").load("sample_datasets.www_access")
>>> df.show(10)
2016-07-19 16:34:15-0700  info [TDRelation] Fetching www_access within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-19 16:34:16-0700  info [TDRelation] Retrieved 19 PlazmaAPI entries - (TDRelation.scala:85)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 148.165.90.106|/category/electro...|     /category/music| 200|Mozilla/4.0 (comp...|  66|   GET|1412333993|
|null|  144.105.77.66|/item/electronics...| /category/computers| 200|Mozilla/5.0 (iPad...| 135|   GET|1412333977|
|null| 108.54.178.116|/category/electro...|  /category/software| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333961|
|null|104.129.105.202|/item/electronics...|     /item/games/394| 200|Mozilla/5.0 (comp...|  83|   GET|1412333945|
|null|   208.48.26.63|  /item/software/706|/search/?c=Softwa...| 200|Mozilla/5.0 (comp...|  76|   GET|1412333930|
|null|  108.78.209.95|/item/giftcards/4879|      /item/toys/197| 200|Mozilla/5.0 (Wind...| 137|   GET|1412333914|
|null| 108.198.97.206|/item/computers/4785|                   -| 200|Mozilla/5.0 (Wind...|  69|   GET|1412333898|
|null| 172.195.185.46|     /category/games|     /category/games| 200|Mozilla/5.0 (Maci...|  41|   GET|1412333882|
|null|   88.24.72.177|/item/giftcards/4410|                   -| 200|Mozilla/4.0 (comp...|  72|   GET|1412333866|
|null|  24.129.141.79|/category/electro...|/category/networking| 200|Mozilla/5.0 (comp...|  73|   GET|1412333850|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 10 rows

## Prestoジョブの送信
>>> df = spark.read.format("com.treasuredata.spark").options(sql="select 1").load("sample_datasets")
2016-07-19 16:56:56-0700  info [TDSparkContext]
Submitted job 515990:
select 1 - (TDSparkContext.scala:70)
>>> df.show(10)
+-----+
|_col0|
+-----+
|    1|
+-----+

## ジョブ結果の読み取り
>>> df = sqlContext.read.format("com.treasuredata.spark").load("job_id:515990")

SparkSQL

# DataFrameを一時テーブルとして登録
scala> td.df("hb_tiny.rankings").createOrReplaceTempView("rankings")

scala> val q1 = spark.sql("select page_url, page_rank from rankings where page_rank > 100")
q1: org.apache.spark.sql.DataFrame = [page_url: string, page_rank: bigint]

scala> q1.show
2016-07-20 11:27:11-0700  info [TDRelation] Fetching rankings within time range:[-9223372036854775808,9223372036854775807) - (TDRelation.scala:82)
2016-07-20 11:27:12-0700  info [TDRelation] Retrieved 2 PlazmaAPI entries - (TDRelation.scala:85)
+--------------------+---------+
|            page_url|page_rank|
+--------------------+---------+
|xjhmjsuqolfklbvxn...|      251|
|seozvzwkcfgnfuzfd...|      165|
|fdgvmwbrjlmvuoquy...|      132|
|gqghyyardomubrfsv...|      108|
|qtqntqkvqioouwfuj...|      278|
|wrwgqnhxviqnaacnc...|      135|
|  cxdmunpixtrqnvglnt|      146|
| ixgiosdefdnhrzqomnf|      126|
|xybwfjcuhauxiopfi...|      112|
|ecfuzdmqkvqktydvi...|      237|
|dagtwwybivyiuxmkh...|      177|
|emucailxlqlqazqru...|      134|
|nzaxnvjaqxapdjnzb...|      119|
|       ffygkvsklpmup|      332|
|hnapejzsgqrzxdswz...|      171|
|rvbyrwhzgfqvzqkus...|      148|
|knwlhzmcyolhaccqr...|      104|
|nbizrgdziebsaecse...|      665|
|jakofwkgdcxmaaqph...|      187|
|kvhuvcjzcudugtidf...|      120|
+--------------------+---------+
only showing top 20 rows