Amazon Kinesisは、AWSでストリーミングデータを扱うためのプラットフォームで、ストリーミングデータの読み込みと分析、さらにカスタムストリーミングデータアプリケーションを構築する機能を提供します。
この記事では、AWS Lambdaを使用してAmazon Kinesis StreamからTreasure Dataにデータを取り込む方法について説明します。
- Treasure Dataの基本的な知識
- Amazon Kinesisの基本的な知識
TD Consoleからmaster keyを取得できます。
AWS Lambdaは、データ取り込みパイプラインの一部です。AWS Lambdaを使用することで、Amazon Kinesisからのトリガーに応答してコードを実行できます。
- kinesis-process-record-pythonを選択します。

- Event source typeとしてKinesisを選択します。
- Kinesis StreamとしてストリームのNameを指定します。

- Name、Descriptionを指定します。
- RuntimeとしてPython 3.xを選択します。

KinesisのストリーミングイベントはPython function lambda_handler.pyによって処理され、TreasureDataはTreasure Boxesのソリューションの1つとして、Kinesis Firehoseデータストリームイベントをインポートするサンプルスクリプトを提供しています。
Treasure BoxesリンクからPythonスクリプトをコピー&ペーストします。READMEファイルに実行手順の詳細が記載されています。
設定を確認します。
Create functionを選択します。

設定後、Treasure Boxesに記載されているように、以下のイベントサンプルでfunctionをテストできます。
Lambda UIから1回限りのテストを行うには、以下のレコードを使用できます(Actions > Configure test event)。
{
"invocationId": "invocationIdExample",
"deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
"region": "us-west-2",
"records": [
{
"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "eyJmb28iOiAiYmFyIn0="
}
]
}recordsのdataの部分は、base64エンコードされた{"foo": bar}に解決されます。
データが正常にインポートされたことを確認するには:
- TD Console > Databasesに移動します。
データの取り込みが完了するまで1〜3分かかります。