Splunk® Connect for Kafka

Install and Administer Splunk Connect for Kafka

Configure Splunk Connect for Kafka

After you bring Kafka Connect up on every host, the Kafka Connect instances automatically form a cluster. You can then execute a REST call against any one of the cluster instances, and the configuration is propagated across your entire cluster.
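
For example, you can confirm that the Kafka Connect REST API is reachable before you submit any connector configuration. The commands below assume the default REST port of 8083 and are run from one of your Kafka Connect hosts; adjust the host and port for your deployment.

  # Returns the Kafka Connect version and commit information
  curl http://localhost:8083/

  # Lists the connectors currently deployed to the cluster
  curl http://localhost:8083/connectors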

Create a data collection task in Splunk Connect for Kafka

Use the following steps to create a data collection task using Splunk Connect for Kafka.

  1. Start Kafka Connect, where $KAFKA_HOME is the install directory of Kafka on your Kafka Connect host.
      $KAFKA_HOME/bin/connect-distributed.sh config/connect-distributed.properties
  2. Run the following command, using your deployment's details, to create connector tasks.
      curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
        "name": "kafka-connect-splunk",
        "config": {
          "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
          "tasks.max": "3",
          "topics":"<KAFKA_TOPIC>",
          "splunk.indexes": "<SPLUNK_DESTINATION_INDEX>",
          "splunk.hec.uri": "http://<SPLUNK_HEC_URI:SPLUNK_HEC_PORT>",
          "splunk.hec.token": "<SPLUNK_HEC_TOKEN>",
          "splunk.hec.raw": "true",
          "splunk.hec.ack.enabled":"false"
        }
      }'    
    
    The following configurations are required:
    • topics: The Kafka topic or topics to ingest data from.
    • splunk.indexes: The destination Splunk platform index or indexes.
    • splunk.hec.uri: The Splunk HTTP Event Collector (HEC) URI.
    • splunk.hec.token: The Splunk HEC token for the connector to use.
    • splunk.hec.ack.enabled: Whether indexer acknowledgment is enabled. Verify that the indexer acknowledgment setting used in the REST call matches the setting defined for the target HEC token.
  3. (Optional) Enable verbose logging for better visibility into errors and other diagnostic information.
    1. Navigate to config/connect-log4j.properties, and add the following information:
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
    2. Save your changes.
  4. Verify that data is flowing into your Splunk platform instance by searching for the index from your configurations.
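
As an additional check, you can query the connector's status through the Kafka Connect REST API and then search the destination index in the Splunk platform. The connector name below matches the example in step 2; substitute your own connector name and index.

  # Shows the state of the connector and each of its tasks
  curl http://localhost:8083/connectors/kafka-connect-splunk/status

In the Splunk platform, run a search such as the following against the index from your configuration:

  index=<SPLUNK_DESTINATION_INDEX> earliest=-15m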

For information on advanced parameters, see the Parameters topic.

For more information on using the Splunk HTTP Event Collector (HEC), see the HEC documentation.

Configuration schema structure reference

Use the following schema as a reference for configuring Splunk Connect for Kafka to send data to your Splunk platform deployment.

{
"name": "<connector-name>",
"config": {
   "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
   "tasks.max": "<number-of-tasks>",
   "topics": "<list-of-topics-separated-by-comma>",
   "splunk.indexes": "<list-of-indexes-for-topics-data-separated-by-comma>",
   "splunk.sources": "<list-of-sources-for-topics-data-separated-by-comma>",
   "splunk.sourcetypes": "<list-of-sourcetypes-for-topics-data-separated-by-comma>",
   "splunk.hec.uri": "<Splunk-HEC-URI>",
   "splunk.hec.token": "<Splunk-HEC-Token>",
   "splunk.hec.raw": "<true|false>",
   "splunk.hec.raw.line.breaker": "<line breaker separator>",
   "splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>",
   "value.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
   "value.converter.schema.registry.url": "<Schema-Registry-URL>",
   "value.converter.schemas.enable": "<true|false>",
   "key.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
   "key.converter.schema.registry.url": "<Schema-Registry-URL>",
   "key.converter.schemas.enable": "<true|false>",
   "splunk.hec.ack.enabled": "<true|false>",
   "splunk.hec.ack.poll.interval": "<event ack poll interval>",
   "splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
   "splunk.hec.ssl.validate.certs": "<true|false>",
   "splunk.hec.http.keepalive": "<true|false>",
   "splunk.hec.max.http.connection.per.channel": "<max number of http connections per channel>",
   "splunk.hec.total.channels": "<total number of channels>",
   "splunk.hec.max.batch.size": "<max number of kafka records post in one batch>",
   "splunk.hec.threads": "<number of threads to use to do HEC post for single task>",
   "splunk.hec.event.timeout": "<timeout in seconds>",
   "splunk.hec.socket.timeout": "<timeout in seconds>",
   "splunk.hec.track.data": "<true|false, tracking data loss and latency, for debugging lagging and data loss>"
   "splunk.header.support": "<true|false>",
   "splunk.header.custom": "<list-of-custom-headers-to-be-used-from-kafka-headers-separated-by-comma>", 
   "splunk.header.index": "<header-value-to-be-used-as-splunk-index>",
   "splunk.header.source": "<header-value-to-be-used-as-splunk-source>",
   "splunk.header.sourcetype": "<header-value-to-be-used-as-splunk-sourcetype>",
   "splunk.header.host": "<header-value-to-be-used-as-splunk-host>"
   "kerberos.user.principal": "<The Kerberos user principal the connector may use to authenticate with Kerberos>",
   "kerberos.keytab.path": "<The path to the keytab file to use for authentication with Kerberos>"
   "enable.timestamp.extraction": "<true|false>",
   "timestamp.regex": "<regex for timestamp extraction>",
   "timestamp.format": "<time-format for timestamp extraction>"
  }
}
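
For reference, the following is a hypothetical filled-in example of the schema above. It uses the HEC /event endpoint with indexer acknowledgment enabled; the host name, topics, indexes, and sourcetypes are placeholders for illustration only. Make sure that splunk.hec.ack.enabled matches the indexer acknowledgment setting of the HEC token you use.

{
  "name": "kafka-connect-splunk-event",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "10",
    "topics": "web_logs,app_logs",
    "splunk.indexes": "web,app",
    "splunk.sourcetypes": "kafka:web,kafka:app",
    "splunk.hec.uri": "https://hec.example.com:8088",
    "splunk.hec.token": "<SPLUNK_HEC_TOKEN>",
    "splunk.hec.raw": "false",
    "splunk.hec.ack.enabled": "true",
    "splunk.hec.ssl.validate.certs": "true"
  }
}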

Collect from the current Kafka topic offsets

To collect from the latest offset in your Kafka topic, use the following steps:

  1. Before starting Splunk Connect for Kafka, add the following line to the Kafka Connect properties file: consumer.auto.offset.reset=latest.
  2. Restart Kafka Connect.
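
A minimal sketch of what the edit looks like in config/connect-distributed.properties; the group.id line is shown only for context, and your file's value may differ.

  # config/connect-distributed.properties
  group.id=kafka-connect-splunk-hec-sink
  # Start consuming from the latest offset instead of the earliest available offset
  consumer.auto.offset.reset=latest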

Scale your environment

Before scaling the Splunk Connect for Kafka tier, ensure that the bottleneck is in the connector tier and not in another component. Review the following scaling options:

  • Increase the number of parallel tasks by adjusting the tasks.max parameter. Only do this if the hardware is underutilized, with low CPU, low memory usage, and low data ingestion throughput. You can reconfigure a running connector with more tasks; see the example later in this section.
  • Increase hardware resources on cluster nodes in case of resource exhaustion, such as high CPU, or high memory usage.
  • Increase the number of Kafka Connect nodes.

Do not create more tasks than there are topic partitions. A safe estimate is 2 tasks per CPU core per Kafka Connect instance. For example, assume five Kafka Connect instances run the Splunk Kafka Connector, each host has 8 CPUs and 16 GB of memory, and there are 200 partitions to collect data from. In that case, tasks.max = 2 * CPUs per host * Kafka Connect instances = 2 * 8 * 5 = 80 tasks. If, instead, there are only 60 partitions to consume from, set tasks.max to 60; otherwise, the remaining 20 tasks would sit idle.
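
If you decide to raise tasks.max on an existing connector, you can update its configuration in place through the Kafka Connect REST API instead of deleting and recreating it. The example below reuses the connector name and settings from the earlier example and only changes tasks.max; note that PUT /connectors/<name>/config expects the complete configuration, not only the changed key.

  curl localhost:8083/connectors/kafka-connect-splunk/config -X PUT -H "Content-Type: application/json" -d '{
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "80",
    "topics": "<KAFKA_TOPIC>",
    "splunk.indexes": "<SPLUNK_DESTINATION_INDEX>",
    "splunk.hec.uri": "http://<SPLUNK_HEC_URI:SPLUNK_HEC_PORT>",
    "splunk.hec.token": "<SPLUNK_HEC_TOKEN>",
    "splunk.hec.raw": "true",
    "splunk.hec.ack.enabled": "false"
  }'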

Determine number of Kafka Connect instances

Determine the number of Kafka Connect instances needed by estimating how much volume per day Splunk Connect for Kafka needs to index in your Splunk platform. For example, an 8 CPU, 16 GB memory machine can achieve 50 - 60 MB/s throughput from Kafka into your Splunk platform if your Splunk platform deployment is sized correctly.
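
As an illustration, assume a total ingestion volume of roughly 4 TB per day. That is about 4,000,000 MB / 86,400 s ≈ 46 MB/s of sustained throughput, so a single machine of that size could handle the load in principle; to leave headroom and tolerate the loss of a node, you would typically run at least two Kafka Connect instances.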

Data loss and latency monitoring

When you configure Splunk Connect for Kafka using the REST API, set "splunk.hec.track.data" to "true" to enable data loss tracking and data collection latency monitoring. The connector accomplishes this by enriching the raw data with offset, timestamp, partition, and topic metadata. This setting only works in conjunction with the HEC /event endpoint ("splunk.hec.raw": "false").
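
A minimal sketch of the relevant settings in the connector configuration, assuming the /event endpoint; the other required parameters from the earlier examples still apply.

  "splunk.hec.raw": "false",
  "splunk.hec.track.data": "true"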

Data loss tracking

Splunk Connect for Kafka uses the Kafka offset to track data loss, since offsets within a Kafka topic partition are sequential. If there is a gap in the offsets indexed in the Splunk software, data loss has occurred.

Data latency tracking

Splunk Connect for Kafka uses the timestamp of the record to track the time elapsed between when a Kafka record was created and when it was indexed in the Splunk software.
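
As a rough sketch, you could compare the index time of each event to the enriched Kafka timestamp in a search. This assumes that the enrichment exposes a kafka_timestamp field in milliseconds, analogous to the kafka_offset and kafka_partition fields used in the queries below; verify the field name and units in your indexed data before relying on the result.

  index=main sourcetype="<sourcetype>" | eval latency_seconds = _indextime - (kafka_timestamp / 1000) | stats avg(latency_seconds) AS avg_latency, max(latency_seconds) AS max_latency by kafka_partition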

Data duplication and data loss

Run the following SPL query to identify data duplication:

index=main sourcetype="<sourcetype>" | dedup kafka_offset kafka_partition | stats count as TotalCount, max(kafka_offset) as Offset by kafka_partition | eval loss = TotalCount - (Offset + 1)

Run the following SPL query to identify data loss:

index=main sourcetype="<sourcetype>" | stats count as TotalCount, max(kafka_offset) as Offset by kafka_partition | eval loss = TotalCount - (Offset + 1)

Malformed data

If the raw data of a Kafka record is a JSON object that cannot be marshaled, or is a byte sequence that is not UTF-8 encodable, Splunk Connect for Kafka considers the record malformed. The connector logs the exception, with Kafka-specific information (topic, partition, offset), to the console, and indexes information about the malformed record in the Splunk software. Search for

type=malformed

in the Splunk software to return any malformed Kafka records encountered.
