Configure Splunk Connect for Kafka
After you bring Kafka Connect up on every host, the Kafka Connect instances automatically form a cluster. A REST call can be executed against one of the cluster instances, and the configuration will be propagated across your entire cluster.
Create a data collection task in Splunk Connect for Kafka
Use the following steps to create a data collection task using Splunk Connect for Kafka.
- Start Kafka Connect.
  $KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties
  where $KAFKA_HOME is the install directory of Kafka on your Kafka Connect host.
- Run the following command, using your deployment's details, to create connector tasks.
  curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
    "name": "kafka-connect-splunk",
    "config": {
      "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
      "tasks.max": "3",
      "topics": "<KAFKA_TOPIC>",
      "splunk.indexes": "<SPLUNK_DESTINATION_INDEX>",
      "splunk.hec.uri": "http://<SPLUNK_HEC_URI:SPLUNK_HEC_PORT>",
      "splunk.hec.token": "<SPLUNK_HEC_TOKEN>",
      "splunk.hec.raw": "true",
      "splunk.hec.ack.enabled": "false"
    }
  }'
  | Required configurations | Description |
  | --- | --- |
  | topics | Configures the Kafka topic for ingestion. |
  | splunk.indexes | Sets the destination Splunk platform index. |
  | splunk.hec.uri | Configures the Splunk HTTP Event Collector (HEC) URI. |
  | splunk.hec.token | Sets the Splunk HEC token. |
  | splunk.hec.ack.enabled | Verify that the deployment's indexer acknowledgment configurations used in the REST call match those defined for the target HTTP Event Collector (HEC) token. |
- (Optional) Enable verbose logging for better visibility into errors and information.
  - Navigate to config/connect-log4j.properties, and add the following information:
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
  - Save your changes.
- Verify that data is flowing into your Splunk platform instance by searching for the index from your configurations.
For information on advanced parameters, see the Parameters topic.
For more information on using the Splunk HTTP Event Collector (HEC), see the HEC documentation.
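The REST call in the steps above can also be scripted. The following is a minimal sketch, not an official client: it checks a connector configuration for the required settings and builds the POST /connectors request without sending it. It also builds a GET /connectors/<name>/status request, which the Kafka Connect REST API provides for inspecting connector and task state. The helper names (missing_required, build_create_request, build_status_request) and the example topic, index, and HEC URI are hypothetical.

```python
import json
import urllib.request

# Assumption: Kafka Connect REST endpoint, matching the curl example.
CONNECT_URL = "http://localhost:8083"

# Keys listed under "Required configurations".
REQUIRED_KEYS = (
    "topics",
    "splunk.indexes",
    "splunk.hec.uri",
    "splunk.hec.token",
    "splunk.hec.ack.enabled",
)


def missing_required(config):
    """Return the required keys that are absent or empty in a connector config."""
    return [key for key in REQUIRED_KEYS if not str(config.get(key, "")).strip()]


def build_create_request(name, config):
    """Build (without sending) the POST /connectors request for a connector."""
    missing = missing_required(config)
    if missing:
        raise ValueError(f"missing required settings: {', '.join(missing)}")
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_status_request(name):
    """Build the GET /connectors/<name>/status request to inspect task states."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{name}/status", method="GET"
    )


# Hypothetical values for illustration only.
example_config = {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "3",
    "topics": "web_logs",
    "splunk.indexes": "main",
    "splunk.hec.uri": "https://hec.example.com:8088",
    "splunk.hec.token": "<SPLUNK_HEC_TOKEN>",
    "splunk.hec.raw": "true",
    "splunk.hec.ack.enabled": "false",
}
create_request = build_create_request("kafka-connect-splunk", example_config)
# To send it against a running cluster: urllib.request.urlopen(create_request)
```

Because any instance propagates the configuration across the cluster, the request only needs to reach one Kafka Connect host.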
Configuration schema structure reference
Use the following schema as a reference for configuring Splunk Connect for Kafka to send data to your Splunk platform deployment.
{
  "name": "<connector-name>",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "<number-of-tasks>",
    "topics": "<list-of-topics-separated-by-comma>",
    "splunk.indexes": "<list-of-indexes-for-topics-data-separated-by-comma>",
    "splunk.sources": "<list-of-sources-for-topics-data-separated-by-comma>",
    "splunk.sourcetypes": "<list-of-sourcetypes-for-topics-data-separated-by-comma>",
    "splunk.hec.uri": "<Splunk-HEC-URI>",
    "splunk.hec.token": "<Splunk-HEC-Token>",
    "splunk.hec.raw": "<true|false>",
    "splunk.hec.raw.line.breaker": "<line breaker separator>",
    "splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>",
    "value.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
    "value.converter.schema.registry.url": "<Schema-Registry-URL>",
    "value.converter.schemas.enable": "<true|false>",
    "key.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
    "key.converter.schema.registry.url": "<Schema-Registry-URL>",
    "key.converter.schemas.enable": "<true|false>",
    "splunk.hec.ack.enabled": "<true|false>",
    "splunk.hec.ack.poll.interval": "<event ack poll interval>",
    "splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
    "splunk.hec.ssl.validate.certs": "<true|false>",
    "splunk.hec.http.keepalive": "<true|false>",
    "splunk.hec.max.http.connection.per.channel": "<max number of http connections per channel>",
    "splunk.hec.total.channels": "<total number of channels>",
    "splunk.hec.max.batch.size": "<max number of kafka records post in one batch>",
    "splunk.hec.threads": "<number of threads to use to do HEC post for single task>",
    "splunk.hec.event.timeout": "<timeout in seconds>",
    "splunk.hec.socket.timeout": "<timeout in seconds>",
    "splunk.hec.track.data": "<true|false, tracking data loss and latency, for debugging lagging and data loss>",
    "splunk.header.support": "<true|false>",
    "splunk.header.custom": "<list-of-custom-headers-to-be-used-from-kafka-headers-separated-by-comma>",
    "splunk.header.index": "<header-value-to-be-used-as-splunk-index>",
    "splunk.header.source": "<header-value-to-be-used-as-splunk-source>",
    "splunk.header.sourcetype": "<header-value-to-be-used-as-splunk-sourcetype>",
    "splunk.header.host": "<header-value-to-be-used-as-splunk-host>",
    "kerberos.user.principal": "<The Kerberos user principal the connector may use to authenticate with Kerberos>",
    "kerberos.keytab.path": "<The path to the keytab file to use for authentication with Kerberos>",
    "enable.timestamp.extraction": "<true|false>",
    "timestamp.regex": "<regex for timestamp extraction>",
    "timestamp.format": "<time-format for timestamp extraction>"
  }
}
Collect from the current Kafka topic offsets
To collect from the latest offset in your Kafka topic, use the following steps:
- Before starting Splunk Connect for Kafka, add the following line to the Kafka Connect properties file:
  consumer.auto.offset.reset=latest
- Restart Kafka Connect.
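If you manage the properties file with a script, the edit can be made idempotent. The sketch below is one possible approach, assuming a simple key=value properties format. The helper name ensure_property is hypothetical, and the sketch works on the file's text rather than touching the file itself.

```python
def ensure_property(properties_text, key, value):
    """Return properties_text with key=value set exactly once.

    An existing entry for the key is replaced in place; otherwise the
    entry is appended. Comment lines (starting with #) are left alone.
    """
    lines = properties_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("#") or "=" not in stripped:
            continue
        if stripped.split("=", 1)[0].strip() == key:
            lines[i] = f"{key}={value}"
            replaced = True
    if not replaced:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


# Example: add the setting to a properties file that does not have it yet.
updated = ensure_property(
    "bootstrap.servers=localhost:9092\n",
    "consumer.auto.offset.reset",
    "latest",
)
```

Write the returned text back to the properties file, then restart Kafka Connect so the setting takes effect.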
Scale your environment
Before scaling the Splunk Connect for Kafka tier, ensure that the bottleneck is in the connector tier and not in another component. Review the following scaling options:
- Increase the number of parallel tasks by adjusting the tasks.max parameter. Only do this if the hardware is underutilized, for example low CPU usage, low memory usage, and low data ingestion throughput. You can reconfigure the connector with more tasks.
- Increase hardware resources on cluster nodes in case of resource exhaustion, such as high CPU or high memory usage.
- Increase the number of Kafka Connect nodes.
Do not create more tasks than the number of partitions. Creating 2 * CPU tasks per Kafka Connect instance running the Splunk Kafka Connector is a safe estimate.
For example, assume five Kafka Connect instances are running the Splunk Kafka Connector, each host has 8 CPUs and 16 GB of memory, and there are 200 partitions to collect data from. Then tasks.max = 2 * CPUs/host * Kafka Connect instances = 2 * 8 * 5 = 80 tasks. Alternatively, if there are only 60 partitions to consume from, set tasks.max to 60. Otherwise, the remaining 20 tasks will be pending.
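The sizing rule above reduces to taking the smaller of the CPU-based estimate and the partition count. A minimal sketch of that arithmetic, with a hypothetical function name:

```python
def recommended_tasks_max(cpus_per_host, connect_instances, partitions):
    """Apply the rule of thumb: 2 tasks per CPU, capped at the partition count."""
    cpu_based_estimate = 2 * cpus_per_host * connect_instances
    return min(cpu_based_estimate, partitions)


# Five instances with 8 CPUs each and 200 partitions: the CPU estimate wins.
many_partitions = recommended_tasks_max(8, 5, 200)
# The same hosts with only 60 partitions: the partition count is the cap.
few_partitions = recommended_tasks_max(8, 5, 60)
```

Treat the result as a starting value and adjust it based on observed CPU, memory, and throughput.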
Determine number of Kafka Connect instances
Determine the number of Kafka Connect instances needed by estimating how much volume per day Splunk Connect for Kafka needs to index in your Splunk platform. For example, an 8 CPU, 16 GB memory machine can achieve 50 - 60 MB/s throughput from Kafka into your Splunk platform if your Splunk platform deployment is sized correctly.
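As a rough illustration of this estimate, the following sketch converts a daily volume into a sustained throughput and divides by the per-instance figure. It assumes the conservative end of the quoted range (50 MB/s per instance), evenly spread traffic with no headroom for peaks, and 1 GB = 1024 MB. The function name is hypothetical.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60


def estimate_connect_instances(daily_volume_gb, mb_per_sec_per_instance=50.0):
    """Estimate how many Kafka Connect instances a daily volume needs.

    Assumes traffic is spread evenly across the day; real deployments
    usually need extra headroom for peaks.
    """
    required_mb_per_sec = daily_volume_gb * 1024 / SECONDS_PER_DAY
    return max(1, math.ceil(required_mb_per_sec / mb_per_sec_per_instance))


# About 10 TB per day needs roughly 118.5 MB/s sustained, so three instances.
instances_for_10tb = estimate_connect_instances(10_000)
```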
Data loss and latency monitoring
When configuring Splunk Connect for Kafka using the REST API, you can set "splunk.hec.track.data": "true" to enable data loss tracking and data collection latency monitoring. The connector does this by enriching the raw data with offset, timestamp, partition, and topic metadata. This setting only works with the HEC /event endpoint ("splunk.hec.raw": "false").
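A quick way to catch an ineffective combination before submitting a configuration is to check both settings together. This is a small sketch with a hypothetical helper name. It treats a missing key as "false", which is an assumption about the defaults rather than something stated here.

```python
def tracking_is_effective(config):
    """Return True when data tracking is on and the /event endpoint is used."""

    def is_true(key):
        # Assumption: an absent setting behaves like "false".
        return str(config.get(key, "false")).strip().lower() == "true"

    return is_true("splunk.hec.track.data") and not is_true("splunk.hec.raw")


event_endpoint = {"splunk.hec.track.data": "true", "splunk.hec.raw": "false"}
raw_endpoint = {"splunk.hec.track.data": "true", "splunk.hec.raw": "true"}
```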
Data loss tracking
Splunk Connect for Kafka uses offsets to track data loss, because offsets in a Kafka topic partition are sequential. If there is a gap in the offsets indexed in the Splunk software, data has been lost.
Data latency tracking
Splunk Connect for Kafka uses the timestamp of the record to track the time elapsed between the time a Kafka record was created and the time the record was indexed in Splunk.
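The latency calculation itself is a subtraction once both times use the same unit. The sketch below assumes the record timestamp is available in milliseconds since the epoch (the unit Kafka uses for record timestamps) and the index time in seconds since the epoch. The function name and sample values are hypothetical.

```python
def latency_seconds(kafka_timestamp_ms, index_time_s):
    """Return seconds elapsed between record creation and indexing."""
    return index_time_s - kafka_timestamp_ms / 1000.0


# A record created at 1700000000000 ms and indexed 2.5 seconds later.
example_latency = latency_seconds(1_700_000_000_000, 1_700_000_002.5)
```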
Data duplication and data loss
Run the following SPL query to identify data duplication:
index=main sourcetype="<sourcetype>" | dedup kafka_offset kafka_partition |stats count as TotalCount, max(kafka_offset) as Offset by kafka_partition | eval loss= TotalCount - (Offset+1)
Run the following SPL query to identify data loss:
index=main sourcetype="<sourcetype>" | stats count as TotalCount, max(kafka_offset) as Offset by kafka_partition | eval loss= TotalCount - (Offset+1)
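To make the arithmetic behind these queries concrete, the following sketch applies the same idea to a list of offsets from one partition: repeated offsets indicate duplication, and offsets absent from the range 0 through the maximum indicate loss. Like the Offset+1 term in the queries, it assumes the partition's offsets start at 0, which does not hold if older data has aged out or collection started from the latest offset. The function name is hypothetical.

```python
from collections import Counter


def summarize_partition(offsets):
    """Count duplicated and missing offsets for a single partition."""
    counts = Counter(offsets)
    distinct = len(counts)
    # Assumption: offsets start at 0, so max offset + 1 events are expected.
    expected = max(counts) + 1 if counts else 0
    return {
        "duplicates": len(offsets) - distinct,
        "missing": expected - distinct,
        "missing_offsets": [o for o in range(expected) if o not in counts],
    }


# Offset 1 was indexed twice and offset 3 never arrived.
summary = summarize_partition([0, 1, 1, 2, 4])
```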
Malformed data
If the raw data of a Kafka record is a JSON object that cannot be marshaled, or is in bytes that are not UTF-8 encodable, Splunk Connect for Kafka considers the record malformed. It logs the exception with Kafka-specific information (topic, partition, offset) for these records to the console, and indexes the malformed record information in Splunk. Search for type=malformed within the Splunk software to return any malformed Kafka records encountered.
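To check producer-side data before it reaches the connector, you can test whether a payload is valid UTF-8. The sketch below only illustrates the byte-encoding condition described above. It is not the connector's own validation logic, and the function name is hypothetical.

```python
def is_valid_utf8(raw: bytes) -> bool:
    """Return True if the bytes decode cleanly as UTF-8."""
    try:
        raw.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


good_payload = b'{"message": "ok"}'
bad_payload = b"\xff\xfe"  # 0xFF can never appear in valid UTF-8
```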
This documentation applies to the following versions of Splunk® Connect for Kafka: 2.2.2