You can send data from Kafka to Treasure Data using the Fluentd consumer for Kafka. Other than the consumer itself, and depending on your current setup, there may be a few additional requirements.

This document assumes a Debian-based distribution such as Ubuntu; on other operating systems, choose the equivalent commands.

This feature is in Beta. For more information, contact support@treasuredata.com.

Prerequisites

  • Basic knowledge of Treasure Data

  • Basic knowledge of Apache Kafka setup, configuration, and architecture including producers and consumers

  • Ability to build with Gradle

  • Working knowledge of Linux

  • Working knowledge of Fluentd

  • A working Kafka single-node or multi-broker installation

Install and configure Fluentd

Preinstall

Review the pre-setup and installation instructions for installing Fluentd. Before setting up Fluentd, ensure that you have:

  • Installed an NTP server and set proper restrict values in /etc/ntp.conf

  • Added the local clock as an NTP backup and also set logging parameters

  • Restarted the NTP service daemon

  • Configured a sufficient number of file descriptors in /etc/security/limits.conf (see the sketch after this list)
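
As a minimal sketch, the Fluentd pre-installation guide suggests raising the file descriptor limit to 65536. The values below follow that suggestion and may need tuning for your workload:

# /etc/security/limits.conf
root soft nofile 65536
root hard nofile 65536
* soft nofile 65536
* hard nofile 65536

Reboot, or log out and back in, for the new limits to take effect.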


Install Fluentd

For Trusty:

$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh

Refer to Treasure Agent (td-agent) installation for other distributions.
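
To confirm the agent installed correctly, you can start it and check its status via the init script used throughout this guide (assuming the standard td-agent package layout):

$ sudo /etc/init.d/td-agent start
$ sudo /etc/init.d/td-agent status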

Edit td-agent.conf

$ sudo nano /etc/td-agent/td-agent.conf

You will be using the Fluentd Consumer for Kafka to forward your events (Fluentd receives events on its default forward port, 24224), so your configuration file should look similar to the following:

<source>
  @type forward
</source>

<match td.*.*>
  @type tdlog
  apikey "#{ENV['TD_API_KEY']}"

  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 5s

  <secondary>
    @type file
    path /var/log/td-agent/failed_records
  </secondary>
</match>
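
With this configuration, an event tagged td.<database>.<table> is uploaded to the corresponding database and table. As a quick sanity check of the match rule, you can hand-send a record with fluent-cat, which ships with td-agent (the binary path assumes the default td-agent layout; mydb is a placeholder database name):

$ echo '{"greeting": "hello"}' | /opt/td-agent/embedded/bin/fluent-cat td.mydb.test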

Set your Treasure Data write-only API key as an environment variable on your machine.
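
For example, a minimal sketch (the key value is a placeholder; if td-agent runs as a service, the variable must also be visible in the service's environment, e.g. via /etc/default/td-agent):

$ export TD_API_KEY="your_write_only_api_key"

Then restart Fluentd so it picks up the key: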

$ /etc/init.d/td-agent restart

Install and Configure the Fluentd Consumer for Kafka

Download the JAR File

Download the pre-compiled JAR file from the GitHub releases page:

https://github.com/treasure-data/kafka-fluentd-consumer/releases

The JAR file is named kafka-fluentd-consumer-x.y.z-all.jar.

Download Configuration Files

You must also download two configuration files from GitHub:

$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties

  • fluentd-consumer.properties: sets Kafka consumer and log forwarding parameters

  • log4j.properties: sets logging parameters for the consumer itself

Your working directory should now contain files similar to the following:

$ ls
kafka-fluentd-consumer-0.2.4-all.jar    fluentd-consumer.properties    log4j.properties

Upgrade Java on Host

Make sure you are running at least Java version 1.7:

$ java -version

If your Java version is older, install Java 7 and set it as the default:

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default

Run Kafka Fluentd Consumer

Run the following Kafka commands from your Kafka installation directory.

Start ZooKeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties
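
If the Kafka broker itself is not already running, start it in another terminal (a standard Kafka command; the config path assumes the default Kafka distribution layout):

$ bin/kafka-server-start.sh config/server.properties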

Create a Test Topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
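
To verify that the topic exists (a standard Kafka command for ZooKeeper-based versions):

$ bin/kafka-topics.sh --list --zookeeper localhost:2181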

Send Multiple Messages in JSON Format

The Kafka Fluentd Consumer requires messages to be sent in JSON format.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}

Confirm the Messages by Starting a Consumer

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

You should see:

{"a": 1}
{"a": 1, "b": 2}

Configure and Run the Kafka Fluentd Consumer

Modify fluentd-consumer.properties with an appropriate configuration. Set the topic list to the Kafka topic you want to forward; the topic name becomes your Treasure Data table name:

fluentd.consumer.topics=test
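
A few other properties in the sample file are worth reviewing. The snippet below is a sketch based on the kafka-fluentd-consumer sample configuration; verify the property names and defaults against your downloaded copy:

# Fluentd forward input to send events to
fluentd.connect=localhost:24224
# Topics to consume; each topic name becomes a Treasure Data table name
fluentd.consumer.topics=test
# Tag prefix; with the td.*.* match above, use td.<your database>.
fluentd.tag.prefix=td.your_database.
# Kafka consumer settings
zookeeper.connect=localhost:2181
group.id=fluentd-consumer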

Run the following command to start the consumer. Adjust the command line to match the paths and version of your .jar and .properties files.

$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties

You can go back and send more events from your producer; as long as they are valid JSON, they are forwarded to Treasure Data.
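
For example, in the producer terminal from the earlier step (the field names here are arbitrary illustrations):

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"user": "alice", "action": "login"}
{"user": "bob", "action": "logout"}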

Query your Data on Treasure Data

Run the following query in the TD Console:

select * from test

Under Query Result, you can see whether the run was successful and inspect the records forwarded from Kafka.
