Deserialize and send Kafka data from a DSP pipeline
A DSP pipeline can ingest data from Kafka. Once you ingest your data, you can write it to any destination that DSP supports.
Prerequisites
- A properly configured Kafka system that includes at least one broker and one defined Kafka topic that you want to ingest. For details, see the Kafka documentation.
- A DSP Kafka connection. To create one, see Create a connection for the DSP Kafka SSL Connector in the Getting Data In manual.
- The data incoming from a Kafka topic is read as a byte array. The Splunk Data Stream Processor has no restrictions on what that byte array of data is, but to manipulate it in a DSP pipeline, you need to convert it to a format you can process. The following steps assume that you want to ingest JSON.
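The examples in this topic assume that each Kafka record's value holds a small JSON payload along these lines (the field names, such as id and message, are hypothetical and stand in for whatever your producers actually send):
{"id": "1234", "message": "device checked in"}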
Steps
Once you satisfy the prerequisites, you can ingest data from Kafka.
- From the Data Stream Processor home page, go to the Build Pipeline tab.
- Select Read from Apache Kafka as your source function.
- On the next page, complete the following fields:
Connection id: The name of your Kafka connection. Example: 461b1915-131e-4daf-a144-0630307436d0
Topic: You must enter one Kafka topic. Example: my-kafka-topic
Consumer Properties: Optional. Enter any Kafka consumer properties that you want to set on the Kafka consumer that the Splunk Data Stream Processor creates. See the Apache or Confluent Kafka documentation for details of what consumer properties Kafka consumers accept. To enter more than one property, click Add input for every new property you want to add. Example: key = value
- Click the + icon to add a new function.
- Select Eval.
- To deserialize your data, call deserialize-json-object within your eval function. Use the eval function's textbox to call deserialize-json-object:
as(deserialize-json-object(get("value")), "json");
deserialize-json-object returns a map of your JSON's key-value pairs. To index these pairs themselves, click the + icon to add a new function.
- Select Normalize.
- Click Delete to delete any existing fields that you don't want to send to your destination. If you are sending your data to Splunk Enterprise from Kafka, you might want to delete all fields, because the fields you see populated are primarily meaningful within Kafka.
- Any data fields that you want to send to your destination must be extracted from the json field that you created in step 6, which is of type map. To extract fields from this map, click New Field, then select Eval from the drop-down menu.
- For each new field you want to send to your destination, call the map-get function. For example, to extract a field named id (see the sketch after these steps):
Expression / Original Field: map-get(get("json"), "id");
Output: id
- Choose where you want to write your transformed data:
- Write to a Splunk Enterprise index
- Write to Kafka
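As a rough end-to-end sketch, assuming the hypothetical JSON payload shown in the prerequisites, the expressions used in the steps above fit together as follows. The Eval function casts the raw value bytes to a map named json:
as(deserialize-json-object(get("value")), "json");
Then, in Normalize, each new field is pulled out of that map with map-get (the field names id and message are assumptions about your payload, not required names):
map-get(get("json"), "id");
map-get(get("json"), "message");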
Write to a Splunk Enterprise index
Write your transformed Kafka data to a Splunk index:
- Click the + icon to add a new function to the pipeline you have created so far.
- Select Write to Splunk Enterprise as your sink function.
- Select a Connection and an Index from the drop-down list:
index: literal("main");
parameters: Optional. Example: hec-enable-token = true
See the sketch after these steps for another example of the index expression.
- Click Validate to confirm that your pipeline's functions are correctly configured.
- Click Save to save your pipeline, or Activate to activate it.
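Because the index field takes an expression rather than a fixed string, you are not limited to a literal value. As a rough sketch, assuming your deserialized json map happens to include a field that names the target index (an assumption about your data, not something DSP requires), you could reuse map-get here; otherwise a literal works for a single fixed index:
literal("main");
map-get(get("json"), "index");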
For more information on how the Write to Splunk Enterprise and Write to Index functions send your data to Splunk Enterprise, see Formatting event data.
Write to Kafka
You can write data transformed by your DSP pipeline back to Kafka.
- After you have transformed your deserialized data, you must serialize it again before writing it to Kafka. Click the + icon to add a new function. In this example, the data is serialized within the Write to Kafka function, as shown in the sketch after these steps.
- Select Write to Kafka as your sink function.
- Complete the following fields:
Connection-Id: The name of your Kafka connection. Example: 461b1915-131e-4daf-a144-0630307436d0
Topic: You must enter one Kafka topic. Example: my-kafka-topic
Key: Your Kafka key, in bytes. Kafka keys are used for partition assignment. To use Kafka's default partition assignment mechanism, set this to null. Example: to-bytes(get("key"));
Value: The data payload, in bytes, for each event. Example: to-bytes(get("value"));
- Click Validate to confirm that your pipeline's functions are correctly configured.
- Click Save to save your pipeline, or Activate to activate it.
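As a minimal sketch of the serialization step, assuming you simply want to pass the original key and payload back out as bytes (both assumptions about your data), the Key and Value fields might look like the following. Set Key to null instead if you prefer Kafka's default partition assignment:
Key: to-bytes(get("key"));
Value: to-bytes(get("value"));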