Kafka用のFluentd consumerを使用して、KafkaからTreasure Dataにデータを送信できます。consumer自体に加えて、現在のセットアップによっては、いくつかの追加要件がある場合があります。

このドキュメントは、Ubuntu Precise Debianを想定しています。それ以外の場合は、オペレーティングシステムに応じてコマンドを選択する必要があります。
この機能はベータ版です。詳細については、support@treasuredata.comまでお問い合わせください。
- Treasure Dataの基本知識
- Apache Kafkaのセットアップ、設定、およびプロデューサーとコンシューマーを含むアーキテクチャに関する基本知識
- Gradleでビルドする能力
- Linuxの実用的な知識
- Fluentdの実用的な知識
- 動作しているKafkaシングルノードまたはマルチブローカーのインストール
Fluentdのインストールの事前セットアップおよびインストール手順を確認してください。fluentdをセットアップする前に、以下を確認してください。
- ntpサーバーをインストールし、/etc/ntp.confに適切なrestrict値を設定
- ローカルクロックをNTPバックアップとして追加し、ログパラメータも設定
- NTPサービスデーモンを再起動
- /etc/security/limits.confで正しいファイルディスクリプタ数を設定
Trustyの場合:
$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh他のディストリビューションについては、Treasure Agent (td-agent)のインストールを参照してください。
$ sudo nano /etc/td-agent/td-agent.confKafka用のFluentd Consumerを使用してイベントを転送します(fluentdはデフォルトポート24224からイベントを受信します)。そのため、設定ファイルは次のようになります。
source
type forward
</source>
<match td.*.*>
type tdlog
apikey "#{ENV['TD_API_KEY']}"
auto_create_table
buffer_type file
buffer_path /var/log/td-agent/buffer/td
flush_interval 5s
secondary
type file
path /var/log/td-agent/failed_records
</secondary>
</match>Treasure Data Write-Only APIキーをマシンの環境変数として設定してください。その後、Fluentdを再起動することをお勧めします。
$ /etc/init.d/td-agent restartGitHubのリリースページから、プリコンパイルされたJARファイルをダウンロードします。
https://github.com/treasure-data/kafka-fluentd-consumer/releases
JARファイル名はkafka-fluentd-consumer-x.y.z-all.jarです。
GitHubから2つの設定ファイルもダウンロードする必要があります。
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties- fluentd-consumer.properties: Kafkaコンシューマーとログ転送パラメータを設定するために使用
- log4j.properties: コンシューマー自体のログパラメータを設定するために使用
以下は、kafka-fluentd-consumerのファイルリストの例です。
$ ls
kafka-fluentd-consumer-0.2.4-all.jar fluentd-consumer.properties log4j.properties少なくともJavaバージョン1.7を実行していることを確認してください。
$ java -versionそうでない場合は、次の手順を実行してください。
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default$ bin/zookeeper-server-start.sh config/zookeeper.properties$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKafka Fluentd Consumerでは、メッセージをJSON形式で送信する必要があります。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning次のように表示されるはずです。
{"a": 1}
{"a": 1, "b": 2}適切な設定でfluentd-consumer.propertiesを変更してください。以下のトピック名を、意図したTreasure Dataのテーブル名に変更してください。fluentd.consumer.topics=test。
次のコマンドを実行してコンシューマーを起動します。関連する.jarファイルと.propertiesファイルへのパスを調整する必要がある場合があります。
$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.1-all.jar fluentd-consumer.propertiesプロデューサーから追加のイベントを送信できます。ただし、それらは有効なJSON形式である必要があります。
TD Consoleで次のクエリを実行します。
select * from test「Query Result」で実行が成功したかどうかを確認できます。