# Kafka Import Integration And Fluentd Consumer

You can send data from Kafka to Treasure Data using the Fluentd consumer for Kafka. Other than the consumer itself, and depending on your current setup, there may be a few additional requirements.

![](/assets/2020-kafka-import-integration.4fbf2e5d70d6e900644e1ca016b7bb1c74df7f0af9dbb3192095f881859d7d3f.0af5a2b9.png)

This document assumes Ubuntu Precise or a similar Debian-based distribution; on other operating systems, choose the equivalent commands.

This feature is in Beta. For more information, contact [support@treasuredata.com](mailto:support@treasuredata.com).

# Prerequisites

- Basic knowledge of Treasure Data
- Basic knowledge of [Apache Kafka](http://kafka.apache.org/documentation.md#quickstart) setup, configuration, and architecture, including producers and consumers
- Ability to build with Gradle
- Working knowledge of Linux
- Working knowledge of [Fluentd](http://www.fluentd.org/)
- A working Kafka single-node or multi-broker installation

# Install and Configure Fluentd

## Preinstall

Review the pre-setup and installation instructions for [installing Fluentd](https://docs.fluentd.org/installation). Before setting up Fluentd, ensure that you have:

- Installed an NTP server and set proper restrict values in /etc/ntp.conf
- Added the local clock as an NTP backup and configured logging parameters
- Restarted the NTP service daemon
- Configured a sufficient number of file descriptors in /etc/security/limits.conf

## Install Fluentd

For Ubuntu Trusty:

```
$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh
```

Refer to the Treasure Agent [(td-agent)](https://docs.treasuredata.com/pages/viewpage.action?pageId=331108) installation instructions for other distributions.

## Edit td-agent.conf

```
$ sudo nano /etc/td-agent/td-agent.conf
```

You will be using the Fluentd Consumer for Kafka to forward your events (Fluentd receives events on its default forward port, 24224), so your configuration file should look similar to the following:

```
<source>
  type forward
</source>

<match td.*.*>
  type tdlog
  apikey "#{ENV['TD_API_KEY']}"
  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 5s

  <secondary>
    type file
    path /var/log/td-agent/failed_records
  </secondary>
</match>
```

Set your Treasure Data write-only API key as the TD_API_KEY environment variable on your machine. Then restart Fluentd:

```
$ /etc/init.d/td-agent restart
```

# Installing and Configuring Kafka Consumer for Fluentd

## Download the JAR File

Download the pre-compiled JAR file from the GitHub releases page:

[https://github.com/treasure-data/kafka-fluentd-consumer/releases](https://github.com/treasure-data/kafka-fluentd-consumer/releases)

The JAR file name is `kafka-fluentd-consumer-x.y.z-all.jar`.

## Download Configuration Files

You must also download two configuration files from GitHub:

```
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties
```

- fluentd-consumer.properties: sets Kafka consumer and log forwarding parameters (see the sketch after this list)
- log4j.properties: sets logging parameters of the consumer itself
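For reference, here is a minimal sketch of what fluentd-consumer.properties might contain. The key names below mirror the sample file in the repository at the time of writing, but treat the specific values (addresses, topic, tag prefix, and group id) as placeholders for your own environment:

```
# Where the local Fluentd forward input listens (the <source> in td-agent.conf above)
fluentd.connect=localhost:24224

# Tag prefix for forwarded events; the topic name is appended, so topic "test"
# yields tag "td.test_db.test", which matches the <match td.*.*> rule above
fluentd.tag.prefix=td.test_db.

# Comma-separated list of Kafka topics to consume
fluentd.consumer.topics=test

# Number of consumer threads
fluentd.consumer.threads=1

# ZooKeeper ensemble and consumer group used by the Kafka consumer
zookeeper.connect=localhost:2181
group.id=fluentd-consumer
```

With the tag prefix above, the database portion of the tag (`test_db` here, a placeholder) determines the Treasure Data database, and the topic name determines the table.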
Here is an example file listing for kafka-fluentd-consumer:

```
$ ls
kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties log4j.properties
```

## Upgrade Java on Host

Make sure you are running at least Java version 1.7:

```
$ java -version
```

If not, complete the following steps:

```
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default
```

# Run Kafka Fluentd Consumer

## Start ZooKeeper

```
$ bin/zookeeper-server-start.sh config/zookeeper.properties
```

## Create a Test Topic

```
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

## Send Multiple Messages in JSON Format

The Kafka Fluentd Consumer requires messages to be sent in JSON format.

```
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}
```

## Confirm the Messages by Starting a Consumer

```
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```

You should see:

```
{"a": 1}
{"a": 1, "b": 2}
```

## Configure and Run the Kafka Fluentd Consumer

Modify fluentd-consumer.properties with an appropriate configuration. Change the topic name in `fluentd.consumer.topics=test` to your intended Treasure Data table name.

Run the following command to start the consumer. You may need to adjust the command line to account for the paths to the relevant .jar and .properties files:

```
$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties
```

You can go back and send a few more events from your producer, as long as they are in valid JSON format.

## Query Your Data on Treasure Data

Run the following query on TD Console:

```
select * from test
```

Under Query Result, you can confirm whether the run was successful.
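If you prefer the command line, you can run the same check with the td client. This is a sketch that assumes the td toolbelt is installed and that the consumer writes to a database named test_db (a placeholder; substitute your own database name):

```
$ td query -w -d test_db "SELECT COUNT(1) FROM test"
```

A non-zero count indicates that events made it from Kafka through the consumer and Fluentd into Treasure Data.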