Data Ingestion from Amazon Kinesis

Amazon Kinesis is a platform for streaming data on AWS. It offers services that make it easy to load and analyze streaming data, and it also lets you build custom streaming data applications for specialized needs.

This article explains how to ingest data from an Amazon Kinesis stream into Treasure Data by using AWS Lambda.

Prerequisites

  • Basic knowledge of Treasure Data. The Quickstart Guide is a good place to start.
  • Basic knowledge of Amazon Kinesis

Step 1: Retrieve Master API key

First, please retrieve Treasure Data’s Master API key. The Master API key can be retrieved from the console.
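
To sanity-check the key before wiring anything up, you can list your databases through Treasure Data’s REST API. Below is a minimal sketch, assuming the standard api.treasuredata.com endpoint and the TD1 authorization scheme; it uses the same Python 2.7 as the Lambda runtime.

import urllib2, json

td_master_key = '<YOUR_MASTER_API_KEY>'

# List databases; an HTTP 200 with a JSON body confirms the key is valid
req = urllib2.Request('https://api.treasuredata.com/v3/database/list')
req.add_header('Authorization', 'TD1 %s' % td_master_key)
print(json.load(urllib2.urlopen(req)))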

Step 2: Set up AWS Lambda function

In this article, we’ll use AWS Lambda as part of the data ingestion pipeline. AWS Lambda lets you execute code in response to triggers from Amazon Kinesis. Let’s create the Lambda function.

Step 2.1: Select blueprint

First, please select the kinesis-process-record-python blueprint as the basis of our configuration.


Step 2.2: Configure event sources

Then, please select Kinesis as the Event source type, and specify the name of your stream as the Kinesis stream.
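
If you’d rather create this mapping programmatically (once the function from the following steps exists), here is a minimal sketch with boto3; the account ID, stream ARN, and function name are placeholders you’d replace with your own.

import boto3

client = boto3.client('lambda')

# Attach the Kinesis stream to the Lambda function as an event source
client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:<ACCOUNT_ID>:stream/<YOUR_STREAM_NAME>',
    FunctionName='<YOUR_FUNCTION_NAME>',
    StartingPosition='TRIM_HORIZON',  # read from the oldest available record
    BatchSize=100,
)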


Step 2.3: Configure function

Next, let’s set up the actual function. Please specify the Name and Description, and choose Python 2.7 as the Runtime.


Then, please copy and paste the function below, and fill in the td_database, td_table, and td_master_key variables.

import urllib2, base64, time, json
import dateutil.parser, dateutil.tz, datetime

td_database = '<YOUR_DATABASE_NAME>'
td_table = '<YOUR_TABLE_NAME>'
td_master_key = '<YOUR_MASTER_API_KEY>'
td_endpoint = 'https://in.treasuredata.com/js/v3/event'

def upload_td(records):
    # https://docs.treasuredata.com/articles/javascript-sdk#appendix-api-endpoint
    headers = {
        'Content-Type': 'application/json',
        'X-TD-Data-Type': 'k',
        'X-TD-Write-Key': td_master_key,
    }
    data = json.dumps({ '%s.%s' % (td_database, td_table): records })
    req = urllib2.Request(td_endpoint, data=data, headers=headers)
    response = urllib2.urlopen(req, timeout=180)
    response.read()
    print("Success: %s records" % len(records))

def lambda_handler(event, context):
    records = []
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        # We also assume payload comes as JSON form
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        if 'time' not in payload:
            # No time field: stamp the record with the current time
            payload['time'] = int(time.time())
        elif isinstance(payload['time'], basestring):
            # Time came as a string: parse it and convert to a unix timestamp
            utc = dateutil.tz.tzutc()
            parsed = dateutil.parser.parse(payload['time']).replace(tzinfo=utc)
            epoch = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=utc)
            payload['time'] = int((parsed - epoch).total_seconds())
        records.append(payload)
    upload_td(records)
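
For reference, given the placeholder names above, upload_td posts a JSON body of the following shape to the in.treasuredata.com endpoint (the time values here are illustrative unix timestamps):

{
  "<YOUR_DATABASE_NAME>.<YOUR_TABLE_NAME>": [
    {"field1": 2, "field2": "b", "time": 1460073600},
    {"field1": 2, "field2": "b", "time": 1460073600}
  ]
}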

Step 2.4: Review the function

Finally, please review your configuration and hit the Create function button.


Step 2.5: Test the function

Once it’s configured, you can test the function. You can use the records below for a one-time test from the Lambda UI (Actions -> Configure test event).

{
  "Records": [
    {
      "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
      "eventVersion": "1.0",
      "kinesis": {
        "partitionKey": "partitionKey-3",
        "data": "eyJmaWVsZDEiOiAyLCAiZmllbGQyIjogImIifQ==",
        "kinesisSchemaVersion": "1.0",
        "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
      },
      "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
      "eventName": "aws:kinesis:record",
      "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
      "eventSource": "aws:kinesis",
      "awsRegion": "us-east-1"
    },
    {
      "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
      "eventVersion": "1.0",
      "kinesis": {
        "partitionKey": "partitionKey-3",
        "data": "eyJmaWVsZDEiOiAyLCAiZmllbGQyIjogImIifQ==",
        "kinesisSchemaVersion": "1.0",
        "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
      },
      "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
      "eventName": "aws:kinesis:record",
      "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
      "eventSource": "aws:kinesis",
      "awsRegion": "us-east-1"
    }
  ]
}

Each record carries a data field that is the base64 encoding of the JSON body {"field1": 2, "field2": "b"}.
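
You can reproduce that data field yourself, and push the same payload onto a real stream for an end-to-end test. A sketch, assuming boto3 and a placeholder stream name:

import base64
import boto3

payload = '{"field1": 2, "field2": "b"}'

# Reproduce the test event's data field; Kinesis base64-encodes
# record data before handing it to Lambda
print(base64.b64encode(payload))  # eyJmaWVsZDEiOiAyLCAiZmllbGQyIjogImIifQ==

# End-to-end test: put the raw JSON onto the stream
kinesis = boto3.client('kinesis')
kinesis.put_record(
    StreamName='<YOUR_STREAM_NAME>',
    Data=payload,
    PartitionKey='partitionKey-3',
)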

Step 3: Confirm data upload

Finally, please visit the Databases page in the web console to check whether the data was imported successfully. It takes 1-3 minutes for the data to be ingested.
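
If you prefer to check from code, the sketch below uses the td-client Python library (pip install td-client); the database and table names are the placeholders from Step 2.3, and the count should match the number of records you sent.

import tdclient

td_master_key = '<YOUR_MASTER_API_KEY>'

with tdclient.Client(apikey=td_master_key) as client:
    # Count the ingested rows; the Hive job may take a minute to finish
    job = client.query('<YOUR_DATABASE_NAME>', 'SELECT COUNT(1) FROM <YOUR_TABLE_NAME>', type='hive')
    job.wait()
    for row in job.result():
        print(row)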

Last modified: Apr 08 2016 01:25:43 UTC
