Skip to content
Last updated

Kafka インポート連携とFluentd Consumer

Kafka用のFluentd consumerを使用して、KafkaからTreasure Dataにデータを送信できます。consumer自体に加えて、現在のセットアップによっては、いくつかの追加要件がある場合があります。

このドキュメントは、Ubuntu Precise Debianを想定しています。それ以外の場合は、オペレーティングシステムに応じてコマンドを選択する必要があります。

この機能はベータ版です。詳細については、support@treasuredata.comまでお問い合わせください。

前提条件

  • Treasure Dataの基本知識
  • Apache Kafkaのセットアップ、設定、およびプロデューサーとコンシューマーを含むアーキテクチャに関する基本知識
  • Gradleでビルドする能力
  • Linuxの実用的な知識
  • Fluentdの実用的な知識
  • 動作しているKafkaシングルノードまたはマルチブローカーのインストール

Fluentdのインストールと設定

プレインストール

Fluentdのインストールの事前セットアップおよびインストール手順を確認してください。fluentdをセットアップする前に、以下を確認してください。

  • ntpサーバーをインストールし、/etc/ntp.confに適切なrestrict値を設定
  • ローカルクロックをNTPバックアップとして追加し、ログパラメータも設定
  • NTPサービスデーモンを再起動
  • /etc/security/limits.confで正しいファイルディスクリプタ数を設定

Fluentdのインストール

Trustyの場合:

$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh

他のディストリビューションについては、Treasure Agent (td-agent)のインストールを参照してください。

Fluentd.confの編集

$ sudo nano /etc/td-agent/td-agent.conf

Kafka用のFluentd Consumerを使用してイベントを転送します(fluentdはデフォルトポート24224からイベントを受信します)。そのため、設定ファイルは次のようになります。

source
  type forward
</source>

<match td.*.*>
  type tdlog
  apikey "#{ENV['TD_API_KEY']}"

  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 5s

  secondary
    type file
    path /var/log/td-agent/failed_records
  </secondary>
</match>

Treasure Data Write-Only APIキーをマシンの環境変数として設定してください。その後、Fluentdを再起動することをお勧めします。

$ /etc/init.d/td-agent restart

Fluentd用Kafka Consumerのインストールと設定

JARファイルのダウンロード

GitHubのリリースページから、プリコンパイルされたJARファイルをダウンロードします。

https://github.com/treasure-data/kafka-fluentd-consumer/releases

JARファイル名はkafka-fluentd-consumer-x.y.z-all.jarです。

設定ファイルのダウンロード

GitHubから2つの設定ファイルもダウンロードする必要があります。

$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties
  • fluentd-consumer.properties: Kafkaコンシューマーとログ転送パラメータを設定するために使用
  • log4j.properties: コンシューマー自体のログパラメータを設定するために使用

以下は、kafka-fluentd-consumerのファイルリストの例です。

$ ls
kafka-fluentd-consumer-0.2.4-all.jar    fluentd-consumer.properties    log4j.properties

ホストでのJavaのアップグレード

少なくともJavaバージョン1.7を実行していることを確認してください。

$ java -version

そうでない場合は、次の手順を実行してください。

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default

Kafka Fluentd Consumerの実行

Zookeeperの起動

$ bin/zookeeper-server-start.sh config/zookeeper.properties

テストトピックの作成

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

JSON形式での複数メッセージの送信

Kafka Fluentd Consumerでは、メッセージをJSON形式で送信する必要があります。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}

コンシューマーを起動してメッセージを確認

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

次のように表示されるはずです。

{"a": 1}
{"a": 1, "b": 2}

Kafka Fluentd Consumerの設定と実行

適切な設定でfluentd-consumer.propertiesを変更してください。以下のトピック名を、意図したTreasure Dataのテーブル名に変更してください。fluentd.consumer.topics=test。

次のコマンドを実行してコンシューマーを起動します。関連する.jarファイルと.propertiesファイルへのパスを調整する必要がある場合があります。

$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.1-all.jar fluentd-consumer.properties

プロデューサーから追加のイベントを送信できます。ただし、それらは有効なJSON形式である必要があります。

Treasure Dataでデータをクエリ

TD Consoleで次のクエリを実行します。

select * from test

「Query Result」で実行が成功したかどうかを確認できます。