Send data to Kafka
Use the Send to Kafka sink function to send data to an Apache or Confluent Kafka topic.
Prerequisites
Before you can use this function, you must do the following:
- Create a Kafka connection. You'll use the ID of this connection for the connection_id argument when configuring the sink function. See the following pages in the Connect to Data Sources and Destinations with DSP manual for more information:
- For information about creating an SSL-authenticated connection, see Create an SSL-authenticated DSP connection to Kafka.
- For information about creating an unauthenticated connection, see Create an unauthenticated DSP connection to Kafka.
- Create the destination topic in your Apache Kafka or Confluent Kafka broker.
- For information about creating a topic in Apache Kafka, search for "Apache Kafka Quickstart" in the Apache Kafka documentation.
- For information about creating a topic in Confluent Kafka, search for "Quick Start for Apache Kafka" in the Confluent documentation.
If you activate your pipeline before creating the topic specified in the topic argument, the pipeline fails to send data to Kafka and restarts indefinitely.
Function input schema
- collection<record<R>>
- This function takes in collections of records with schema R.
Required arguments
- connection_id
- Syntax: string
- Description: The ID of your Kafka connection.
- Example in Canvas View: "879837b0-cabf-4bc2-8589-fcc4dad753e7"
- topic
- Syntax: string
- Description: The name of the Kafka topic to send data to.
- Example in Canvas View: my-topic
Make sure that the destination topic exists in your Kafka broker. If you activate your pipeline before the specified topic is created, the pipeline fails to send data to Kafka and restarts indefinitely.
- key
- Syntax: expression<bytes>
- Description: Your Kafka key, in bytes. Kafka uses the key to determine which partition to send data to. Kafka preserves the order of messages only within a partition, not across multiple partitions. If your use case requires ordering, either send your data to one specific partition or write an expression that determines the partition based on the contents of your data. If you set this to to_bytes(null) or to the empty string to_bytes(""), all data is routed to partition 0 in your Kafka topic. See the SPL2 examples.
- Example in Canvas View: to_bytes(customerId)
- value
- Syntax: expression<bytes>
- Description: The data payload, in bytes, for each event.
- Example in Canvas View: to_bytes("")
Optional arguments
- producer_properties
- Syntax: map<string, string>
- Description: Optional Kafka producer properties, given as key-value pairs. For a list of valid producer properties, see the "Producer Configs" section in the Apache Kafka documentation.
- Example in Canvas View: {"reconnect.backoff.max.ms": 1500}
SPL2 examples
1. Route data to a specific Kafka partition based on customerId
...| into kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", topic1, to_bytes(customerId), to_bytes(""));
For example, if you have data that looks similar to the following:
customerId | change_in_bank_account |
---|---|
user123 | +200 |
user456 | +100 |
user123 | +200 |
user123 | -1337 |
Based on the key provided, the Kafka partitioning looks something like the following:
// partition 0
user123:{"customerId": user123, "change_in_bank_account": +200}
user123:{"customerId": user123, "change_in_bank_account": +200}
user123:{"customerId": user123, "change_in_bank_account": -1337}
// partition 1
user456:{"customerId": user456, "change_in_bank_account": +100}
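To make the key-to-partition idea concrete, here is a minimal Python sketch of key-based routing. It is an illustration only, not how DSP or Kafka is implemented: the choose_partition and route helpers are hypothetical, the partition count of 2 is assumed, and zlib.crc32 stands in for Kafka's own hash, so the actual partition numbers (and whether two different keys share a partition) will differ from a real broker. The property it demonstrates is that events with the same key always land in the same partition, in arrival order.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 2  # assumed partition count, for illustration only


def choose_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a stable hash.

    zlib.crc32 is a stand-in: Kafka's default partitioner uses a different
    hash, so real partition numbers will not match these.
    """
    return zlib.crc32(key) % num_partitions


def route(events, num_partitions: int = NUM_PARTITIONS):
    """Group events by partition, keeping arrival order inside each partition."""
    partitions = defaultdict(list)
    for event in events:
        key = event["customerId"].encode("utf-8")
        partitions[choose_partition(key, num_partitions)].append(event)
    return dict(partitions)


# Sample data from the table above
events = [
    {"customerId": "user123", "change_in_bank_account": 200},
    {"customerId": "user456", "change_in_bank_account": 100},
    {"customerId": "user123", "change_in_bank_account": 200},
    {"customerId": "user123", "change_in_bank_account": -1337},
]

for partition, routed in sorted(route(events).items()):
    print(f"partition {partition}: {routed}")
```

Because the partition depends only on the key, every user123 event is appended to the same list in the order it arrived, which is the ordering guarantee the key argument gives you.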
2. Route all data to partition 0 in Kafka
When working in the SPL View, you can write the function by providing the arguments in this exact order.
...| into kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", topic1, to_bytes(""), to_bytes(""));
Alternatively, you can use named arguments to declare the arguments in any order and leave out optional arguments you don't want to declare. All unprovided arguments use their default values. The following example provides the arguments in an arbitrary order.
...| into kafka(topic: topic1, connection_id: "879837b0-cabf-4bc2-8589-fcc4dad753e7", key: to_bytes(""), value: to_bytes(""));
If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.
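The ordering rule for unnamed and named arguments closely resembles positional and keyword arguments in Python, so a rough analogy may help. The kafka_sink function below is a hypothetical stand-in that only records what it receives. It is not a DSP API and does not connect to Kafka.

```python
def kafka_sink(connection_id, topic, key, value, producer_properties=None):
    """Hypothetical stand-in that only records the arguments it receives."""
    return {
        "connection_id": connection_id,
        "topic": topic,
        "key": key,
        "value": value,
        "producer_properties": producer_properties or {},
    }


CONNECTION_ID = "879837b0-cabf-4bc2-8589-fcc4dad753e7"

# All arguments unnamed: they must follow the declared order.
positional = kafka_sink(CONNECTION_ID, "topic1", b"", b"")

# All arguments named: any order works, and the optional one is left out.
named = kafka_sink(topic="topic1", connection_id=CONNECTION_ID, value=b"", key=b"")

# Mixed: unnamed arguments come first, in order, then the named ones.
mixed = kafka_sink(CONNECTION_ID, "topic1", value=b"", key=b"")

print(positional == named == mixed)  # prints True
```

All three calls describe the same sink configuration. The omitted optional argument falls back to its default in each case.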
3. Route data to a random Kafka partition
If the ordering of events is not important for your use case and your topic has many partitions, you can use a random key so that events are spread across partitions, which optimizes for throughput.
...| into kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", topic1, to_bytes(randomint()), to_bytes(""));
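As a rough illustration of why a random key spreads load, the following Python sketch counts how many events land in each partition when every event gets a random integer key. The names choose_partition and spread are hypothetical, the partition count of 8 is assumed, and zlib.crc32 is a stand-in for Kafka's own hash, so treat this as a sketch of the idea rather than a model of real broker behavior.

```python
import random
import zlib
from collections import Counter


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Stable hash of the key modulo the partition count (crc32 is a stand-in)."""
    return zlib.crc32(key) % num_partitions


def spread(num_events: int, num_partitions: int, seed: int = 0) -> Counter:
    """Count events per partition when each event gets a random integer key."""
    rng = random.Random(seed)  # seeded so repeated runs give the same counts
    counts = Counter()
    for _ in range(num_events):
        key = str(rng.randint(0, 2**31 - 1)).encode("utf-8")
        counts[choose_partition(key, num_partitions)] += 1
    return counts


counts = spread(num_events=10_000, num_partitions=8)
for partition in range(8):
    print(f"partition {partition}: {counts[partition]} events")
```

The trade-off is that events for the same customer no longer share a key, so they can end up in different partitions and their relative order is not preserved.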
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5