Skip to content
Last updated

Amazon Kinesis Import Integration

Amazon Kinesisは、AWSでストリーミングデータを扱うためのプラットフォームで、ストリーミングデータの読み込みと分析、さらにカスタムストリーミングデータアプリケーションを構築する機能を提供します。

この記事では、AWS Lambdaを使用してAmazon Kinesis StreamからTreasure Dataにデータを取り込む方法について説明します。

前提条件

  • Treasure Dataの基本的な知識
  • Amazon Kinesisの基本的な知識

Master API keyの取得

TD Consoleからmaster keyを取得できます。

AWS Lambda functionの設定

AWS Lambdaは、データ取り込みパイプラインの一部です。AWS Lambdaを使用することで、Amazon Kinesisからのトリガーに応答してコードを実行できます。

Blueprintの選択

  1. kinesis-process-record-pythonを選択します。

Event Sourcesの設定

  1. Event source typeとしてKinesisを選択します。
  2. Kinesis StreamとしてストリームのNameを指定します。

Functionの設定

  1. Name、Descriptionを指定します。
  2. RuntimeとしてPython 3.xを選択します。

Lambda_handler Python Scriptの設定

KinesisのストリーミングイベントはPython function lambda_handler.pyによって処理され、TreasureDataはTreasure Boxesのソリューションの1つとして、Kinesis Firehoseデータストリームイベントをインポートするサンプルスクリプトを提供しています。

Treasure BoxesリンクからPythonスクリプトをコピー&ペーストします。READMEファイルに実行手順の詳細が記載されています。

Functionの確認

設定を確認します。

Create functionを選択します。

Functionのテスト

設定後、Treasure Boxesに記載されているように、以下のイベントサンプルでfunctionをテストできます。

Lambda UIから1回限りのテストを行うには、以下のレコードを使用できます(Actions > Configure test event)。

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-west-2",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "eyJmb28iOiAiYmFyIn0="
    }
  ]
}

recordsのdataの部分は、base64エンコードされた{"foo": bar}に解決されます。

データアップロードの確認

データが正常にインポートされたことを確認するには:

  1. TD Console > Databasesに移動します。

データの取り込みが完了するまで1〜3分かかります。