Configure Splunk Connect for Kafka
After you bring Kafka Connect up on every host, the Kafka Connect instances automatically form a cluster. A REST call can be executed against one of the cluster instances, and the rest of the instances will take on the task automatically.
Create a data collection task in Splunk Connect for Kafka
Use the following steps to create a data collection task using Splunk Connect for Kafka.
- Start Kafka Connect.
.$KAFKA_CONNECT_HOME/bin/connect-distributed.sh config/connect-distributed.properties
- Run the following command to create connector tasks, and adjust the following parameters, based on your deployment.
curl <KAFKA_CONNECT_HOST>:8083/connectors -X POST -H "Content-Type: application/json" -d '{ "name": "<SPLUNK_KAFKA_CONNECTOR_NAME>", "config": { "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", "tasks.max": "<NUM_OF_TASKS>", "topics":"<KAFKA_TOPICS>", "splunk.hec.uri": "<SPLUNK_HEC_URIS>", "splunk.hec.token": "<HEC_TOKEN>", "splunk.hec.ack.enabled": "<TRUE|FALSE>", "splunk.hec.ssl.validate.certs": "<TRUE|FALSE>" } }'
Ensure that the deployment's indexer acknowledgment configurations used in the REST call (
splunk.hec.ack.enabled
) match those defined for the target HTTP Event Collector (HEC) token. - Verify that data is flowing into your Splunk platform instance by searching your indexers for Kafka events.
Collect from the current Kafka topic offsets
To collect from Kafka topic offsets, follow the below steps:
- Before starting Splunk Connect for Kafka, add the following line of code to the Kafka Connect properties file:
consumer.auto.offset.reset=latest
- Restart Kafka Connect
For information on each parameter, see the Parameters topic.
Splunk Connect for Kafka commands
Use the following commands to check the status of Splunk Connect for Kafka, to manage connectors, and to manage tasks:
Description | Command |
---|---|
List active connectors | curl http://<KAFKA_CONNECT_HOST>:8083/connectors/ |
Get kafka-connect-splunk connector information | curl http://<KAFKA_CONNECT_HOST>:8083/connectors/<SPLUNK_KAFKA_CONNECTOR_NAME> |
Get kafka-connect-splunk connector configuration information | curl http://<KAFKA_CONNECT_HOST>:8083/connectors/<SPLUNK_KAFKA_CONNECTOR_NAME>/config |
Delete kafka-connect-splunk connector | curl http://<KAFKA_CONNECT_HOST>:8083/connectors/<SPLUNK_KAFKA_CONNECTOR_NAME> -X DELETE |
Get kafka-connect-splunk connector task information | curl http://<KAFKA_CONNECT_HOST>:8083/connectors/<SPLUNK_KAFKA_CONNECTOR_NAME>/tasks |
See the Apache Kafka documentation for additional REST examples.
Configuration schema structure reference
Use the following schema to configure Splunk Connect for Kafka to send data to your Splunk platform deployment.
{ "name": "<connector-name>", "config": { "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", "tasks.max": "<number-of-tasks>", "topics": "<list-of-topics-separated-by-comma>", "splunk.indexes": "<list-of-indexes-for-topics-data-separated-by-comma>", "splunk.sources": "<list-of-sources-for-topics-data-separated-by-comma>", "splunk.sourcetypes": "<list-of-sourcetypes-for-topics-data-separated-by-comma>", "splunk.hec.uri": "<Splunk-HEC-URI>", "splunk.hec.token": "<Splunk-HEC-Token>", "splunk.hec.raw": "<true|false>", "splunk.hec.raw.line.breaker": "<line breaker separator>", "splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>", "splunk.hec.ack.enabled": "<true|false>", "splunk.hec.ack.poll.interval": "<event ack poll interval>", "splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>", "splunk.hec.ssl.validate.certs": "<true|false>", "splunk.hec.http.keepalive": "<true|false>", "splunk.hec.max.http.connection.per.channel": "<max number of http connections per channel>", "splunk.hec.total.channels": "<total number of channels>", "splunk.hec.max.batch.size": "<max number of kafka records post in one batch>", "splunk.hec.threads": "<number of threads to use to do HEC post for single task>", "splunk.hec.event.timeout": "<timeout in seconds>", "splunk.hec.socket.timeout": "<timeout in seconds>", "splunk.hec.track.data": "<true|false, tracking data loss and latency, for debugging lagging and data loss>" } }
Scale your environment
Before scaling the Splunk Connect for Kafka tier, ensure that the bottleneck is in the connector tier and not in another component. Review the following scaling options:
- Increase the number of parallel tasks by adjusting the
tasks.max
parameter. Only do this if the hardware is underutilized, such as low CPU, low memory usage and low data injection throughput. You can reconfigure the connector with more tasks. - Increase hardware resources on cluster nodes in case of resource exhaustion, such as high CPU, or high memory usage.
- Increase the number of Kafka Connect nodes.
Do not create more tasks than the number of partitions. Creating 2 * CPU tasks per Splunk Kafka Connector is a safe estimate.
For example, assume there are five Kafka Connects running the Splunk Kafka Connector. Each host is 8 CPUs with 16 GB memory. And there are 200 partitions to collect data from. max.tasks
will be: max.tasks = 2 * CPUs/host * Kafka Connect instances = 2 * 8 * 5 = 80 tasks. Alternatively, if there are only 60 partitions to consume from, then set max.tasks
to 60. Otherwise, the remaining 20 will be pending.
Determine number of Kafka Connect instances
Determine the number of Kafka Connect instances needed by estimating how much volume per day Splunk Connect for Kafka needs to index in your Splunk platform. For example, an 8 CPU, 16 GB memory machine can achieve 50 - 60 MB/s throughput from Kafka into your Splunk platform if your Splunk platform deployment is sized correctly.
Data loss and latency monitoring
When configuring Splunk Connect for Kafka using the REST API, "splunk.hec.track.data": "true"
can be configured to allow data loss tracking and data collection latency monitoring. This is accomplished by enriching the raw data with offset, timestamp, partition, and topic metadata. This setting will only work in conjunction with HEC /event endpoint ("splunk.hec.raw" : "false"
)
Data loss tracking
Splunk Connect for Kafka uses offset to track data loss since offsets in a Kafka topic partition are sequential. If there is a gap in the Splunk software, there is data loss.
Data latency tracking
Splunk Connect for Kafka uses the timestamp of the record to track the time elapsed between the time a Kafka record was created and the time the record was indexed in Splunk.
Data duplication and data loss
Run the following SPL query to identify data duplication:
index=main sourcetype="<sourcetype>" | stats count as TotalCount, max(kafka_offset) as Offset by kafka_partition | eval loss= TotalCount - (Offset+1)
Run the following SPL query to identify data loss:
index=main sourcetype="<sourcetype>" | dedup kafka_offset kafka_partition |stats count as TotalCount, max(kafka_offset) as Offset by kafka_partition | eval loss= TotalCount - (Offset+1)
Install Splunk Connect for Kafka | Configuration examples |
This documentation applies to the following versions of Splunk® Connect for Kafka: 1.0.0
Feedback submitted, thanks!