You can send data from Kafka to Treasure Data, using the Fluentd consumer for Kafka. Other than the consumer itself, and depending on your current setup, there may be a few additional requirements.

This document assumes a Debian-based Ubuntu system (the install example below targets Trusty); on other operating systems, choose the equivalent commands accordingly.
This feature is in Beta. For more information, contact support@treasuredata.com.
This document also assumes the following:
- Basic knowledge of Treasure Data
- Basic knowledge of Apache Kafka setup, configuration, and architecture including producers and consumers
- Ability to build with Gradle
- Working knowledge of Linux
- Working knowledge of Fluentd
- A working Kafka single-node or multi-broker installation
Review the pre-setup and installation instructions for installing Fluentd. Ensure the following before setting up Fluentd (a command sketch follows this list):
- Installed an NTP server and set proper restrict values in /etc/ntp.conf
- Added the local clock as an NTP backup and also set logging parameters
- Restarted the NTP service daemon
- Configured the correct number of file descriptors in /etc/security/limits.conf
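For reference, a minimal pre-setup sketch on Ubuntu might look like the following; the package name and limit values are illustrative, so tune them for your environment:
$ sudo apt-get install ntp
$ sudo nano /etc/ntp.conf                 # set restrict values; add the local clock (127.127.1.0) as backup and enable logging
$ sudo service ntp restart
$ sudo nano /etc/security/limits.conf     # e.g. add lines such as: root soft nofile 65536 and root hard nofile 65536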
Install the Treasure Agent (td-agent). For Trusty:
$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh
Refer to Treasure Agent (td-agent) installation for other distributions.
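As a quick sanity check (not part of the original steps), you can confirm the agent installed and is running; if these commands differ on your version, checking the package with dpkg also works:
$ td-agent --version
$ sudo /etc/init.d/td-agent status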
Next, open the td-agent configuration file:
$ sudo nano /etc/td-agent/td-agent.conf
You will be using the Fluentd Consumer for Kafka to forward your events (Fluentd receives events on its default port, 24224), so your configuration file should look similar to the following:
<source>
  type forward
</source>

<match td.*.*>
  type tdlog
  apikey "#{ENV['TD_API_KEY']}"
  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 5s

  <secondary>
    type file
    path /var/log/td-agent/failed_records
  </secondary>
</match>

Set your Treasure Data write-only API key as an environment variable on your machine.
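How you export the key depends on how td-agent is launched. One common approach on Debian/Ubuntu, assuming your init script sources /etc/default/td-agent (check yours before relying on this), is:
$ echo 'export TD_API_KEY="YOUR_WRITE_ONLY_API_KEY"' | sudo tee -a /etc/default/td-agent
Here YOUR_WRITE_ONLY_API_KEY is a placeholder for the write-only key from your Treasure Data console.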
Then restart Fluentd:
$ sudo /etc/init.d/td-agent restart
Download the pre-compiled JAR file from the GitHub releases page:
https://github.com/treasure-data/kafka-fluentd-consumer/releases
The JAR file name is kafka-fluentd-consumer-x.y.z-all.jar.
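For example, with curl (the asset URL and version below are illustrative; copy the actual link for the latest release from the releases page):
$ curl -L -O https://github.com/treasure-data/kafka-fluentd-consumer/releases/download/v0.2.4/kafka-fluentd-consumer-0.2.4-all.jar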
You must also download two configuration files from GitHub:
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties
- fluentd-consumer.properties: sets the Kafka consumer and log forwarding parameters
- log4j.properties: sets the logging parameters of the consumer itself
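For orientation, the entries you are most likely to edit in fluentd-consumer.properties look like the following sketch. Only fluentd.consumer.topics is referenced later in this guide; treat the other key names as illustrative and verify them against the file you downloaded:
fluentd.connect=localhost:24224
fluentd.consumer.topics=test
zookeeper.connect=localhost:2181
group.id=fluentd-consumer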
Here is an example file listing for kafka-fluentd-consumer:
$ ls
kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties log4j.properties
Make sure you are running at least Java version 1.7:
$ java -version
If not, complete the following steps:
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default
Next, start ZooKeeper:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Create a test topic:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
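The console producer in the next step assumes a Kafka broker listening on localhost:9092. If yours is not already running, a typical single-node start with the standard Kafka distribution layout is:
$ bin/kafka-server-start.sh config/server.properties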
The Kafka Fluentd Consumer requires messages to be sent in JSON format. Send a few JSON events with the console producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}
Then verify the events with the console consumer:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
You should see:
{"a": 1}
{"a": 1, "b": 2}Modify fluentd-consumer.properties with an appropriate configuration. Change the topic name in the following to your intended Treasure Data table name. fluentd.consumer.topics=test.
Run the command to start the consumer. You may need to adjust this command line to account for the paths to the relevant .jar and .properties files.
$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties
You can go back and send a few more events from your producer, as long as they are in valid JSON format.
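The consumer runs in the foreground. If you want it to keep running after you close your shell, one common pattern (not specific to this tool) is:
$ nohup java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties > consumer.out 2>&1 &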
Run the following query in the TD Console:
select * from test
Under Query Result, you can see whether the run was successful.
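Alternatively, if you have the td command-line client installed, you can issue the same query from a terminal; your_db is a placeholder for the database your td.*.* match tag maps to:
$ td query -w -d your_db 'select * from test'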