# Apache Kafka And Confluent Cloud Integration

## Why is Kafka with Treasure Data CDP Important?

Kafka acts as a real-time data streaming platform, allowing the IT department to efficiently ingest, process, and manage vast amounts of data from various sources. TD CDP also offers real-time capabilities on its own. Combining Kafka and TD CDP, however, lets marketers design and execute customer journeys based on real-time customer behavior while reusing an existing central data streaming platform. In addition, the IT team can ensure seamless data integration and synchronization across different systems, applications, and databases. This simplifies data management and enables IT to maintain a single source of truth for customer data.

# How Kafka and TD are Integrated

![](/assets/image2023-7-29_8-15-42.acd9b8da093d311ef0e8933ec8282a07738217c246e7b8c3bf4ec12c1d103a2d.baa96da0.png)

Confluent provides Confluent Cloud (a managed service) and Confluent Platform (an on-premises distribution) to run Apache Kafka. Both officially support the HTTP Sink Connector, which integrates Kafka with an API over HTTPS: in a sink configuration, the connector consumes data from Kafka and delivers it to a downstream REST API. Even if you don't use Confluent services, Apache Kafka supports the same approach with Aiven's open-source HTTP Sink Connector. With either connector, you can push records to Treasure Data via the Ingestion API in near real time.

# How HTTP Sink Connector and Ingestion API are Integrated

The HTTP Sink Connector consumes records from Kafka topic(s) and converts each record value from Avro, JSON, and so on, into JSON with *request.body.format=json* before sending it in the request body to the configured *http.api.url*, which can optionally reference the record key and/or topic name. The Ingestion API accepts the converted JSON records via a POST request. The connector batches records up to the configured *batch.max.size* before sending the batched request to the API; the individual JSON records in a batch are joined with the *batch.separator*.

Here are some common parameters for the HTTP Sink Connector configuration file to send a POST request to the Ingestion API. For more information, review [Importing Table Records Using the Data Ingestion API](/products/customer-data-platform/integration-hub/streaming/importing-table-records-using-the-data-ingestion-api).

Records are transformed for the Ingestion API by using the following parameters for a *single* JSON record per request.

- **http.api.url**: https://us01.records.in.treasuredata.com/DB/table
- **request.method**: POST
- **http.authorization.type**: none
- **http.headers.content.type**: application/vnd.treasuredata.v1.single+json
- **http.headers.additional**: "Authorization: TD1 TD WRITE API KEY"
- **batch.max.size**: 1
- **request.body.format**: json
- **batch.json.as.array**: false

In a high message throughput environment, you might want to ingest multiple records in one request. In this scenario, change the parameters to use the Ingestion API's multiple-record format:

- **http.api.url**: https://us01.records.in.treasuredata.com/DB/table
- **request.method**: POST
- **http.authorization.type**: none
- **http.headers.content.type**: application/vnd.treasuredata.v1+json
- **http.headers.additional**: "Authorization: TD1 TD WRITE API KEY"
- **batch.max.size**: 2 (up to 500)
- **request.body.format**: json
- **batch.json.as.array**: true
- **batch.prefix**: {"records":
- **batch.suffix**: }
- **batch.separator**: ,
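To make the two formats concrete, the following sketch shows the request bodies that result from the settings above. The records and column names are hypothetical, and the connector assembles these bodies for you; this is only to illustrate what arrives at the Ingestion API.

```python
import json

# Hypothetical records as they might appear in a Kafka topic.
records = [
    {"time": 1690000000, "user_id": "u-001", "event": "page_view"},
    {"time": 1690000001, "user_id": "u-002", "event": "purchase"},
]

# Single-record request (batch.max.size=1, batch.json.as.array=false),
# sent with Content-Type: application/vnd.treasuredata.v1.single+json.
# The body is simply the record itself.
single_body = json.dumps(records[0])
print(single_body)
# {"time": 1690000000, "user_id": "u-001", "event": "page_view"}

# Batched request (batch.json.as.array=true, batch.prefix='{"records":',
# batch.suffix='}', batch.separator=','),
# sent with Content-Type: application/vnd.treasuredata.v1+json.
# The batched records end up wrapped in a "records" array.
batch_body = '{"records":' + "[" + ",".join(json.dumps(r) for r in records) + "]" + "}"
print(batch_body)
# {"records":[{...},{...}]}
```

In both cases, the *http.headers.content.type* value tells the Ingestion API which of these two body shapes to expect.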
Review the following tutorials for step-by-step guidance on integrating Kafka with Treasure Data CDP.

# Confluent Cloud with Treasure Data CDP Tutorial

Confluent Cloud is a managed Kafka service based on Apache Kafka. The HTTP Sink Connector lets you retrieve data from Kafka topics and send it to Treasure Data as an external HTTP endpoint. This tutorial describes the steps to set up the HTTP Sink Connector with Confluent Cloud. The following steps might differ slightly from the most recent Confluent Cloud user interface.

## Prerequisites

- A paid Confluent Cloud account and a Kafka cluster
- A basic understanding of Confluent Cloud
- Topics in your Kafka cluster that contain messages
  - If you don't have any production data, generate dummy test data by following [this page](https://developer.confluent.io/tutorials/kafka-connect-datagen/confluent.md).

## Configure the HTTP Sink Connector

1. Navigate to the **Connectors** section.
2. Select **New Connector** to create a new connector.
3. Select **HTTP Sink** from the available connectors. You might find two HTTP Sink connectors: HTTP Sink and HTTP Sink Connector. When you use Confluent Cloud, choose **HTTP Sink**.
4. Select **Credentials and Access** permissions based on your requirements.

## Authentication

On the connector's **Authentication** page, provide the required settings:

- **HTTP URL**: for example, `https://us01.records.in.treasuredata.com/support/kafka_sample`
  - The format is `https://us01.records.in.treasuredata.com/DatabaseName/TableName`.
  - For more information, review [Importing Table Records Using the Data Ingestion API](/products/customer-data-platform/integration-hub/streaming/importing-table-records-using-the-data-ingestion-api).
- **SSL Protocol**: TLSv1.2
- **Enable Host verification**: false
- **Endpoint Authentication Type**: NONE

If you use a master API key, you can add the Confluent cluster's static IP addresses to your IP whitelist.

![](/assets/image2023-8-7_14-59-8.f4d331fc77c4daaab640937f11ed69f92d1b13f26473dcb8285f43257054a387.baa96da0.png)
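Before continuing to the connector's configuration, you can optionally confirm that the endpoint URL and your API key work by sending one test record directly to the Ingestion API. A minimal sketch, assuming a hypothetical database, table, and columns, a placeholder write-only API key, and the single-record content type described earlier:

```python
import json
import requests

# Hypothetical database/table; replace with the DatabaseName/TableName used
# in the HTTP URL above, and use your own TD write-only API key.
url = "https://us01.records.in.treasuredata.com/my_database/my_table"
headers = {
    "Content-Type": "application/vnd.treasuredata.v1.single+json",
    "Authorization": "TD1 YOUR_TD_WRITE_API_KEY",
}
record = {"user_id": "u-001", "event": "connector_smoke_test"}

resp = requests.post(url, headers=headers, data=json.dumps(record))
print(resp.status_code, resp.text)  # a 2xx status means the record was accepted
```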
## Configuration

The following settings are for sending single records to the Treasure Data Ingestion API. The parameters for multiple records are also included below. Determine which use case is appropriate before proceeding.

### Parameters for Single Record

- **Input Kafka record value format**:
  - Choose the appropriate format based on your current topic setting.
- **HTTP Request Method**: POST
- **HTTP Headers**: Authorization:TD1 XXXX/xxxxx|Content-Type:application/vnd.treasuredata.v1.single+json
  - Enter `TD1`, a space, and then your Treasure Data API key, followed by `|Content-Type:application/vnd.treasuredata.v1.single+json`.
- **HTTP Headers Separator**: |
- **Behavior for null valued records**: ignore
- **Behavior on errors**: fail
  - Adjust based on your requirements.
- **Report errors as**: error_string
- **Request Body Format**: json
- **Batch json as array**: false
- **Batch max size**: 1

### Parameters for Multiple Records

- **Input Kafka record value format**:
  - Choose the appropriate format based on your current topic setting.
- **HTTP Request Method**: POST
- **HTTP Headers**: Authorization:TD1 XXXX/xxxxx|Content-Type:application/vnd.treasuredata.v1+json
  - Enter `TD1`, a space, and then your Treasure Data API key, followed by `|Content-Type:application/vnd.treasuredata.v1+json`.
- **HTTP Headers Separator**: |
- **Behavior for null valued records**: ignore
- **Behavior on errors**: fail
  - Adjust based on your requirements.
- **Report errors as**: error_string
- **Request Body Format**: json
- **Batch json as array**: true
- **Batch max size**: 500
- **Batch prefix**: {"records":
- **Batch suffix**: }

![](/assets/image2023-8-7_15-15-15.d04cb24b67385114d0a42635e85e385910b1aab0c01a356ee8fe7b5cbafdcba3.baa96da0.png)

Select **Continue**.

## Verify Data

After the configuration is complete, the connector starts automatically. On the topic page, review the records in the topic whose name starts with `success-`; successful deliveries are reported there.

![](/assets/image2023-8-7_15-22-17.3bb1f180cb0f702370135ccbd5677c06b2cedf2cf8c0b0d9ea2d0080aa024b0a.baa96da0.png)

## Check the Data Availability on Treasure Data

Confirm that the records appear in the target database and table in Treasure Data.

![](/assets/image2023-8-7_15-25-1.0977c0dec67aedb46750cf5807323b75dd2670dd23726466b6cb40a4cf49d2e1.baa96da0.png)

# Confluent Platform with Treasure Data CDP Tutorial

Apache Kafka is an open-source distributed event streaming platform, while Confluent Platform is a commercial distribution of Kafka that extends it with additional features and tools to simplify deployment and management for enterprises. Use the platform appropriate for your company environment. Both platforms support the HTTP Sink Connector, which allows you to retrieve data from Kafka topics and send it to Treasure Data as an external HTTP endpoint. This tutorial describes the steps to set up the HTTP Sink Connector with Confluent Platform and Apache Kafka.

## Prerequisites

- A Confluent Platform paid license or an Apache Kafka cluster
  - Confluent Platform requires the HTTP Sink Connector supported by Confluent ([link](https://docs.confluent.io/kafka-connectors/http/current/overview.html)).
  - Apache Kafka requires Aiven's HTTP Sink Connector, maintained by the open-source community ([link](https://github.com/Aiven-Open/http-connector-for-apache-kafka)).
  - Treasure Data doesn't provide commercial support for these connectors.
- A basic understanding of Apache Kafka

## Install Kafka Connect Framework

Ensure you have Kafka Connect installed. This framework connects external systems to Kafka. If you are using Confluent Platform, Kafka Connect comes pre-installed. Otherwise, you need to download and install it separately.

- Apache Kafka Connect: [https://kafka.apache.org/documentation/#connect](https://kafka.apache.org/documentation/#connect)
- Confluent Kafka Connect: [https://docs.confluent.io/platform/current/connect/index.html](https://docs.confluent.io/platform/current/connect/index.html)
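As a quick sanity check, you can confirm that the Kafka Connect worker is running by querying its REST interface. This sketch assumes the default REST listener on localhost:8083; the same interface also lists the installed connector plugins, which is useful after the next step.

```python
import requests

# The Kafka Connect REST interface listens on port 8083 by default;
# adjust the host and port for your deployment.
base = "http://localhost:8083"

# Worker metadata: version, commit, and Kafka cluster id.
print(requests.get(base).json())

# Installed connector plugins; the HTTP Sink Connector class should appear
# here once the plugin is installed (see the next step).
for plugin in requests.get(f"{base}/connector-plugins").json():
    print(plugin["class"], plugin.get("version"))
```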
## Install the HTTP Sink Connector Plugin

To send data to an HTTP endpoint, you need an HTTP Sink Connector plugin. Check Confluent Hub or other sources for the plugin version that is compatible with your Kafka Connect version. Then, place the downloaded HTTP Sink Connector plugin (JAR file) into the appropriate Kafka Connect plugin directory.

- Download via Confluent Hub for Confluent Platform: [https://docs.confluent.io/kafka-connectors/http/current/overview.html](https://docs.confluent.io/kafka-connectors/http/current/overview.html)
- Download via GitHub for Apache Kafka: [https://github.com/Aiven-Open/http-connector-for-apache-kafka](https://github.com/Aiven-Open/http-connector-for-apache-kafka)

## Configure the HTTP Sink Connector

Create a configuration file (JSON or properties) to define the HTTP Sink Connector's settings. This configuration file typically includes the following properties:

- **name**: A unique name for the connector.
- **connector.class**: The class name for the HTTP Sink Connector (for example, io.confluent.connect.http.HttpSinkConnector).
- **tasks.max**: The number of tasks to be created for the connector (usually equal to the number of Kafka partitions).
- **topics**: The Kafka topics from which data will be consumed and sent to the HTTP endpoint.
- **http.api.url**: The URL of the HTTP endpoint to which the data will be sent.
- **http.api.method**: The HTTP method to use when making the request (for example, POST or PUT).
- **http.headers**: Additional HTTP headers to include in the request.
- **http.timeout.ms**: The timeout for the HTTP request.

Include other optional properties as required by your use case. The configuration properties might vary depending on the specific version of Kafka Connect and the HTTP Sink Connector plugin you use. Always refer to the official documentation and the plugin's documentation for the most up-to-date information and instructions.

- Confluent Platform: [https://docs.confluent.io/kafka-connectors/http/current/connector_config.html](https://docs.confluent.io/kafka-connectors/http/current/connector_config.html)
- Apache Kafka: [https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst](https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst)

### Confluent Platform

Create a configuration JSON file to define the HTTP Sink Connector's settings.
Follow the guidelines for the connector at [https://docs.confluent.io/kafka-connectors/http/current/connector_config.html#http-sink-connector-configuration-properties](https://docs.confluent.io/kafka-connectors/http/current/connector_config.html#http-sink-connector-configuration-properties).

#### Single-row Ingestion JSON Example

```json
{
  "name": "tdsinglejsonHttpSink",
  "config": {
    "topics": "rest-messages5",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "http.api.url": "https://us01.records.in.treasuredata.com/database_name/table_name",
    "request.method": "POST",
    "headers": "Authorization:TD1 XXXXX/xxxxx|Content-Type:application/vnd.treasuredata.v1.single+json",
    "header.separator": "|",
    "report.errors.as": "error_string",
    "request.body.format": "json",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "batch.prefix": "",
    "batch.suffix": "",
    "batch.max.size": "1",
    "batch.json.as.array": "false",
    "tasks.max": "1",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.result.topic.name": "success-responses",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.name": "error-responses",
    "reporter.error.topic.replication.factor": "1",
    "ssl.enabled": "true",
    "https.ssl.protocol": "TLSv1.2"
  }
}
```

#### Multi-row Ingestion JSON Example

```json
{
  "name": "tdBatchHttpSink",
  "config": {
    "topics": "rest-messages5",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "http.api.url": "https://us01.records.in.treasuredata.com/DB/TBL",
    "request.method": "POST",
    "headers": "Authorization:TD1 XXXX/xxxx|Content-Type:application/vnd.treasuredata.v1+json",
    "header.separator": "|",
    "report.errors.as": "error_string",
    "request.body.format": "string",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "batch.prefix": "{\"records\":",
    "batch.suffix": "}",
    "batch.max.size": "500",
    "batch.json.as.array": "true",
    "tasks.max": "1",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.result.topic.name": "success-responses",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.name": "error-responses",
    "reporter.error.topic.replication.factor": "1",
    "ssl.enabled": "true",
    "https.ssl.protocol": "TLSv1.2"
  }
}
```

# Apache Kafka with Treasure Data CDP Tutorial

Create a configuration JSON file to define the HTTP Sink Connector's settings. Follow the guidelines for the connector in [https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst](https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst). It supports parameters similar to those of the Confluent HTTP Sink Connector.
```json
{
  "name": "tdCommunityHttpSink",
  "config": {
    "topics.regex": "td-rest-messages1",
    "connector.class": "io.aiven.kafka.connect.http.HttpSinkConnector",
    "http.url": "https://us01.records.in.treasuredata.com/database_name/table_name",
    "http.authorization.type": "none",
    "http.headers.content.type": "application/vnd.treasuredata.v1.single+json",
    "http.headers.additional": "Authorization: TD1 XXXX/xxxx",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "batching.enabled": "false",
    "batch.max.size": "1",
    "batch.prefix": "{\"records\":[",
    "batch.suffix": "]}",
    "batch.separator": "n",
    "tasks.max": "1"
  }
}
```

## Start the HTTP Sink Connector

Submit the configuration to Kafka Connect to start the HTTP Sink Connector. You can use the confluent command-line tool or make a REST API call to the Kafka Connect REST interface. For detailed information, refer to [Kafka Connect 101](https://developer.confluent.io/courses/kafka-connect/rest-api/).
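For example, here is a minimal sketch of registering the connector through the Kafka Connect REST interface, assuming the default listener on localhost:8083 and one of the configuration files from this page saved locally (the file name is a placeholder).

```python
import json
import requests

# Load one of the connector configuration files shown above
# (the file name here is a placeholder).
with open("tdCommunityHttpSink.json") as f:
    payload = json.load(f)  # {"name": ..., "config": {...}}

# Register the connector with the Kafka Connect REST interface.
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(payload),
)
print(resp.status_code, resp.json())

# Check the connector and task states once the connector has been created.
status = requests.get(
    f"http://localhost:8083/connectors/{payload['name']}/status"
).json()
print(status["connector"]["state"], [t["state"] for t in status["tasks"]])
```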