# Kafka インポート連携とFluentd Consumer Kafka用のFluentd consumerを使用して、KafkaからTreasure Dataにデータを送信できます。consumer自体に加えて、現在のセットアップによっては、いくつかの追加要件がある場合があります。 ![](/assets/2020-kafka-import-integration.4fbf2e5d70d6e900644e1ca016b7bb1c74df7f0af9dbb3192095f881859d7d3f.0af5a2b9.png) このドキュメントは、Ubuntu Precise Debianを想定しています。それ以外の場合は、オペレーティングシステムに応じてコマンドを選択する必要があります。 この機能はベータ版です。詳細については、[support@treasuredata.com](mailto:support@treasuredata.com)までお問い合わせください。 # 前提条件 - Treasure Dataの基本知識 - [Apache Kafka](http://kafka.apache.org/documentation.md#quickstart)のセットアップ、設定、およびプロデューサーとコンシューマーを含むアーキテクチャに関する基本知識 - Gradleでビルドする能力 - Linuxの実用的な知識 - [Fluentd](http://www.fluentd.org/)の実用的な知識 - 動作しているKafkaシングルノードまたはマルチブローカーのインストール # Fluentdのインストールと設定 ## プレインストール [Fluentdのインストール](https://docs.fluentd.org/installation)の事前セットアップおよびインストール手順を確認してください。fluentdをセットアップする前に、以下を確認してください。 - ntpサーバーをインストールし、/etc/ntp.confに適切なrestrict値を設定 - ローカルクロックをNTPバックアップとして追加し、ログパラメータも設定 - NTPサービスデーモンを再起動 - /etc/security/limits.confで正しいファイルディスクリプタ数を設定 ## Fluentdのインストール Trustyの場合: ``` $ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh ``` 他のディストリビューションについては、Treasure Agent [(td-agent)](https://docs.treasuredata.com/pages/viewpage.action?pageId=331108)のインストールを参照してください。 ## Fluentd.confの編集 ``` $ sudo nano /etc/td-agent/td-agent.conf ``` Kafka用のFluentd Consumerを使用してイベントを転送します(fluentdはデフォルトポート24224からイベントを受信します)。そのため、設定ファイルは次のようになります。 ``` source type forward type tdlog apikey "#{ENV['TD_API_KEY']}" auto_create_table buffer_type file buffer_path /var/log/td-agent/buffer/td flush_interval 5s secondary type file path /var/log/td-agent/failed_records ``` Treasure Data Write-Only APIキーをマシンの環境変数として設定してください。その後、Fluentdを再起動することをお勧めします。 ``` $ /etc/init.d/td-agent restart ``` # Fluentd用Kafka Consumerのインストールと設定 ## JARファイルのダウンロード GitHubのリリースページから、プリコンパイルされたJARファイルをダウンロードします。 [https://github.com/treasure-data/kafka-fluentd-consumer/releases](https://github.com/treasure-data/kafka-fluentd-consumer/releases) JARファイル名は`kafka-fluentd-consumer-x.y.z-all.jar`です。 ## 設定ファイルのダウンロード GitHubから2つの設定ファイルもダウンロードする必要があります。 ``` $ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties $ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties ``` - fluentd-consumer.properties: Kafkaコンシューマーとログ転送パラメータを設定するために使用 - log4j.properties: コンシューマー自体のログパラメータを設定するために使用 以下は、kafka-fluentd-consumerのファイルリストの例です。 ``` $ ls kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties log4j.properties ``` ## ホストでのJavaのアップグレード 少なくともJavaバージョン1.7を実行していることを確認してください。 ``` $ java -version ``` そうでない場合は、次の手順を実行してください。 ``` $ sudo add-apt-repository ppa:webupd8team/java $ sudo apt-get update $ sudo apt-get install oracle-java7-installer $ sudo apt-get install oracle-java7-set-default ``` # Kafka Fluentd Consumerの実行 ## Zookeeperの起動 ``` $ bin/zookeeper-server-start.sh config/zookeeper.properties ``` ## テストトピックの作成 ``` $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ``` ## JSON形式での複数メッセージの送信 Kafka Fluentd Consumerでは、メッセージをJSON形式で送信する必要があります。 ``` $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test {"a": 1} {"a": 1, "b": 2} ``` ## コンシューマーを起動してメッセージを確認 ``` $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning ``` 次のように表示されるはずです。 ``` {"a": 1} {"a": 1, "b": 2} ``` ## Kafka Fluentd Consumerの設定と実行 適切な設定でfluentd-consumer.propertiesを変更してください。以下のトピック名を、意図したTreasure Dataのテーブル名に変更してください。fluentd.consumer.topics=test。 次のコマンドを実行してコンシューマーを起動します。関連する.jarファイルと.propertiesファイルへのパスを調整する必要がある場合があります。 ``` $ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.1-all.jar fluentd-consumer.properties ``` プロデューサーから追加のイベントを送信できます。ただし、それらは有効なJSON形式である必要があります。 ## Treasure Dataでデータをクエリ TD Consoleで次のクエリを実行します。 ``` select * from test ``` 「Query Result」で実行が成功したかどうかを確認できます。