Kafka Fluentd Consumer

This article will help you start sending data from Kafka to Treasure Data using the Fluentd Consumer for Kafka. Beyond the consumer itself, and depending on your current setup, there may be a few additional requirements.

This document assumes a Debian-based distribution such as Ubuntu; on other operating systems, substitute the equivalent commands.

Currently this feature is in Beta. For more information, please contact support@treasuredata.com.

Prerequisites
  • Basic knowledge of Treasure Data
  • Basic knowledge of Apache Kafka setup, configuration and architecture, including producers and consumers
  • Ability to build with Gradle
  • Working knowledge of Linux
  • Working knowledge of Fluentd
  • A working Kafka single-node or multi-broker installation

Step 0: Install and configure Fluentd


You can refer to the Fluentd pre-setup and installation instructions. To summarize, ensure the following before setting up Fluentd:

  • you’ve installed an NTP server and set proper restrict values in /etc/ntp.conf;
  • you’ve added the local clock as an NTP backup and also set logging parameters;
  • you’ve restarted the ntp service daemon;
  • you’ve configured a sufficient number of file descriptors in /etc/security/limits.conf.
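For the file descriptor limit, the values commonly recommended for Fluentd look like the following (a sketch; adjust the limits to your environment, and log in again for the change to take effect):

```
# /etc/security/limits.conf — raise the open-file limit for Fluentd
root soft nofile 65536
root hard nofile 65536
*    soft nofile 65536
*    hard nofile 65536
```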

Install Fluentd

For Trusty:

$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

Refer to the td-agent installation article for other distributions.

Edit Fluentd.conf

$ sudo nano /etc/td-agent/td-agent.conf

Since you will be using the Fluentd Consumer for Kafka to forward your events (and Fluentd will receive them on the default port, 24224), your configuration file should look similar to the following:

<source>
  type forward
</source>

<match td.*.*>
  type tdlog
  apikey "#{ENV['TD_API_KEY']}"

  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 5s

  <secondary>
    type file
    path /var/log/td-agent/failed_records
  </secondary>
</match>

Remember to set your Treasure Data write-only API key as an environment variable (TD_API_KEY) on your machine. Once all this is done, restart Fluentd:
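For example, the variable referenced by the apikey line above can be set like this. The key shown is a placeholder; note that if td-agent runs as a daemon, the variable must also be visible to the daemon's environment (e.g. via /etc/default/td-agent):

```shell
# Placeholder write-only API key; substitute your own key from the TD console
export TD_API_KEY="0123/abcdef0123456789abcdef0123456789"

# Verify the variable is set before restarting td-agent
echo "TD_API_KEY is ${TD_API_KEY:+set}"   # prints "TD_API_KEY is set"
```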

$ /etc/init.d/td-agent restart

Step 1: Installing and Configuring Kafka Consumer for Fluentd

Download jar file

Download the pre-compiled jar file from the GitHub releases page.

The jar file is named kafka-fluentd-consumer-x.y.z-all.jar.

Download configuration files

You also need to download two configuration files from GitHub.

$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties
  • fluentd-consumer.properties: For setting Kafka consumer and log-forwarding parameters.
  • log4j.properties: For setting logging parameters of the consumer itself.

Here is an example file listing for kafka-fluentd-consumer:

$ ls
kafka-fluentd-consumer-0.2.4-all.jar    fluentd-consumer.properties    log4j.properties

Upgrade Java on Host

Make sure you are running at least Java version 1.7:

$ java -version

If not, please take the following steps:

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default

Step 2: Run Zookeeper and Kafka, create a topic, send JSON messages, and run Kafka Fluentd Consumer

Start Zookeeper and the Kafka broker

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

Create a test topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Send multiple messages in JSON format

Note that Kafka Fluentd Consumer requires messages to be sent in JSON format.

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}
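Because the consumer only handles JSON records, it can help to validate a message before producing it. One quick check uses python3's built-in json.tool module (assuming python3 is installed on the host; any JSON validator works):

```shell
# The exit status of json.tool tells you whether a record parses as JSON
echo '{"a": 1, "b": 2}' | python3 -m json.tool > /dev/null && echo "valid JSON"
echo 'not json' | python3 -m json.tool > /dev/null 2>&1 || echo "invalid JSON"
```

A validated record can then be piped straight to the console producer, for example: echo '{"a": 1}' | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test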

Confirm the messages by starting a consumer

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

You should see:

{"a": 1}
{"a": 1, "b": 2}

Now, configure and run the Kafka Fluentd Consumer

First, modify fluentd-consumer.properties with an appropriate configuration. Don’t forget to set fluentd.consumer.topics=test (or rather, change the topic name to match your intended Treasure Data table name).
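As a sketch, the entries you will most likely touch look like the following. Only fluentd.consumer.topics is confirmed by this article; the other property names and values are taken from the sample file in the repository and should be verified against your downloaded copy:

```
# Topic(s) to consume; the topic name becomes the Treasure Data table name
fluentd.consumer.topics=test

# Where the local Fluentd instance is listening (in_forward, default port)
fluentd.connect=localhost:24224

# Tag prefix: "td.testdb." + topic "test" yields tag td.testdb.test,
# which matches the <match td.*.*> section in td-agent.conf
fluentd.tag.prefix=td.testdb.

# Standard Kafka consumer settings
zookeeper.connect=localhost:2181
group.id=fluentd-consumer
```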

Finally, run the command to start the consumer. Note that you may need to adjust this command line to account for the paths to the relevant .jar and .properties files.

$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties

You can go back and send a few more events from your producer, making sure they are valid JSON.

Step 3: Query your data on Treasure Data

Run the following query on Treasure Data console:

select * from test

If everything is working, you should see something like this:

(Screenshot: Kafka data shown in the Treasure Data console)

Last modified: Feb 24 2017 09:27:52 UTC

If this article is incorrect or outdated, or omits critical information, please let us know. For all other issues, please see our support channels.