Splunk® Data Stream Processor

Function Reference



DSP 1.2.0 is impacted by the CVE-2021-44228 and CVE-2021-45046 security vulnerabilities from Apache Log4j. To fix these vulnerabilities, you must upgrade to DSP 1.2.4. See Upgrade the Splunk Data Stream Processor to 1.2.4 for upgrade instructions.

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach their end of support date. See the Splunk Software Support Policy for details.
This documentation does not apply to the most recent version of Splunk® Data Stream Processor. For documentation on the most recent version, go to the latest release.

Send data to Kafka

Use the Send to Kafka sink function to send data to an Apache or Confluent Kafka topic.

Prerequisites

Before you can use this function, you must do the following:

  • Create a Kafka connection. You'll use the ID of this connection for the connection_id argument when configuring the sink function. See the Connect to Data Sources and Destinations with DSP manual for more information.
  • Create the destination topic in your Apache Kafka or Confluent Kafka broker.
    • For information about creating a topic in Apache Kafka, search for "Apache Kafka Quickstart" in the Apache Kafka documentation.
    • For information about creating a topic in Confluent Kafka, search for "Quick Start for Apache Kafka" in the Confluent documentation.

    If you activate your pipeline before creating the topic specified in the topic argument, the pipeline fails to send data to Kafka and restarts indefinitely.

Function input schema

collection<record<R>>
This function takes in collections of records with schema R.

Required arguments

connection_id
Syntax: string
Description: The ID of your Kafka connection.
Example in Canvas View: "879837b0-cabf-4bc2-8589-fcc4dad753e7"
topic
Syntax: string
Description: The name of the Kafka topic to send data to.
Example in Canvas View: my-topic

Make sure that the destination topic exists in your Kafka broker. If you activate your pipeline before the specified topic is created, the pipeline fails to send data to Kafka and restarts indefinitely.

key
Syntax: expression<bytes>
Description: Your Kafka key, in bytes. Kafka uses the key to determine which partition to send data to. Kafka preserves the order of messages only within a partition, not across multiple partitions. Therefore, if your use case requires ordering, either send your data to one specific partition or write a key expression that determines the partition based on the contents of your data. If you set the key to to_bytes(null) or to an empty string, to_bytes(""), all data is routed to partition 0 in your Kafka topic. See the SPL2 examples.
Example in Canvas View: to_bytes(customerId)
value
Syntax: expression<bytes>
Description: The data payload, in bytes, for each event.
Example in Canvas View: to_bytes("")

Optional arguments

producer_properties
Syntax: map<string, string>
Description: Add optional producer properties here. For a list of valid producer properties, see the "Producer Configs" section in the Apache Kafka documentation.
Example in Canvas View: {"reconnect.backoff.max.ms": "1500"}

SPL2 examples

1. Route data to a specific Kafka partition based on customerId

...| into kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", topic1, to_bytes(customerId), to_bytes(""));

For example, if you have data that looks similar to the following:

customerId   change_in_bank_account
user123      +200
user456      +100
user123      +200
user123      -1337

Based on the key provided, the Kafka partitioning looks something like the following:

// partition 0
user123:{"customerId": "user123", "change_in_bank_account": +200}
user123:{"customerId": "user123", "change_in_bank_account": +200}
user123:{"customerId": "user123", "change_in_bank_account": -1337}
// partition 1
user456:{"customerId": "user456", "change_in_bank_account": +100}
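To make the key-to-partition mapping concrete, here is a minimal Python sketch. It assumes Apache Kafka's default partitioner behavior for non-empty keys (a murmur2 hash of the key bytes, made positive and taken modulo the partition count) and models the rule stated on this page that an empty or null key goes to partition 0. DSP's actual partitioner is not documented here, so the partition numbers the sketch produces may differ from the illustration above. What it does show is that records with the same key always map to the same partition and keep their relative order. The names murmur2, partition_for, and route, and the use of UTF-8 to stand in for to_bytes, are hypothetical.

```python
from collections import defaultdict

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash as used by Kafka's default partitioner (unsigned result)."""
    length = len(data)
    m = 0x5BD1E995
    h = (0x9747B28C ^ length) & MASK32
    # Mix the input four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & MASK32
        k ^= k >> 24
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k
    # Mix the remaining 1 to 3 bytes.
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def partition_for(key, num_partitions: int) -> int:
    """Empty or null key -> partition 0 (as this page states); otherwise hash the key."""
    if not key:
        return 0
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def route(records, num_partitions: int) -> dict:
    """Append each (key, payload) record to its partition, preserving arrival order."""
    partitions = defaultdict(list)
    for key, payload in records:
        partitions[partition_for(key.encode("utf-8"), num_partitions)].append((key, payload))
    return dict(partitions)


# The sample data from this example, keyed by customerId.
events = [("user123", "+200"), ("user456", "+100"), ("user123", "+200"), ("user123", "-1337")]
routed = route(events, num_partitions=2)
```

Because the partition is a pure function of the key bytes, every user123 event lands in one partition in arrival order, while nothing guarantees the relative order of user123 and user456 events.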

2. Route all data to partition 0 in Kafka

When working in the SPL View, you can write the function by providing the arguments in this exact order.

...| into kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", topic1, to_bytes(""), to_bytes(""));

Alternatively, you can use named arguments to declare the arguments in any order and leave out any optional arguments you don't want to declare. Omitted arguments use their default values. The following example provides the arguments in an arbitrary order.

...| into kafka(topic: topic1, connection_id: "879837b0-cabf-4bc2-8589-fcc4dad753e7", key: to_bytes(""), value: to_bytes(""));

If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.
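Python's calling convention follows the same rules described above for SPL2: positional arguments in declaration order, named arguments in any order, and positional arguments before named ones. The sketch below uses a hypothetical Python stand-in, into_kafka, whose parameter names mirror the sink's arguments. It only returns the arguments it receives; it sends nothing to Kafka and is not part of any DSP API. The empty default for producer_properties is an assumption, because this page does not state the sink's default.

```python
from typing import Dict, Optional


def into_kafka(connection_id: str, topic: str, key: bytes, value: bytes,
               producer_properties: Optional[Dict[str, str]] = None) -> dict:
    """Hypothetical stand-in for the Send to Kafka sink: returns the bound arguments."""
    return {
        "connection_id": connection_id,
        "topic": topic,
        "key": key,
        "value": value,
        # An omitted optional argument falls back to its default (assumed empty here).
        "producer_properties": producer_properties or {},
    }


CONN = "879837b0-cabf-4bc2-8589-fcc4dad753e7"

# Positional: arguments must appear in the declared order.
positional = into_kafka(CONN, "topic1", b"", b"")

# Named: any order, and the optional argument can be left out.
named = into_kafka(topic="topic1", connection_id=CONN, key=b"", value=b"")

# Mixed: all positional arguments first, then the named ones in any order.
mixed = into_kafka(CONN, "topic1", value=b"", key=b"",
                   producer_properties={"reconnect.backoff.max.ms": "1500"})

# A positional argument after a named one is rejected before the call runs:
# into_kafka(connection_id=CONN, "topic1", b"", b"")  -> SyntaxError
```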

3. Route data to a random Kafka partition

If event ordering is not important for your use case and your topic has many partitions, you can randomize the assigned partition to maximize throughput instead.

...| into kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", topic1, to_bytes(randomint()), to_bytes(""));
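The trade-off can be sketched in Python: with a fixed key, every event lands in a single partition, while a random key spreads events across all partitions at the cost of any per-key ordering. This sketch uses zlib.crc32 as a stand-in hash and Python's random module in place of randomint(); neither is what DSP or Kafka actually uses. Encoding the random integer as decimal text is also an assumption about to_bytes, and the function names are hypothetical.

```python
import random
import zlib
from collections import Counter


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Stand-in hash partitioner (crc32, not Kafka's murmur2)."""
    return zlib.crc32(key) % num_partitions


def distribution(keys, num_partitions: int) -> Counter:
    """Count how many events each partition receives."""
    return Counter(pick_partition(k, num_partitions) for k in keys)


NUM_PARTITIONS = 8
rng = random.Random(42)  # seeded so the run is reproducible

# Same key for every event: all traffic goes to one partition.
fixed = distribution([b"same-key"] * 1000, NUM_PARTITIONS)

# Random key per event: traffic is spread across partitions.
randomized = distribution(
    [str(rng.randint(0, 2**31 - 1)).encode() for _ in range(1000)], NUM_PARTITIONS
)
```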
Last modified on 11 March, 2022

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5

