
Deserialize and send Kafka data from a DSP pipeline

A DSP pipeline can ingest data from Kafka. Once you ingest your data, you can write it to any destination that DSP supports.

Prerequisites

  • A properly configured Kafka system that includes at least one broker and one defined Kafka topic that you want to ingest. For details, see the Kafka documentation.
  • A DSP Kafka connection. See Create a connection for the DSP Kafka SSL Connector in the Getting Data In manual.
  • Data incoming from a Kafka topic is read as a byte array. The Splunk Data Stream Processor places no restrictions on what that byte array contains, but to manipulate the data in a DSP pipeline, you need to convert it to a format you can process. The following steps assume that you want to ingest JSON.
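
For example, the rest of this topic assumes that the value field of each Kafka record contains a JSON payload similar to the following. This payload is a hypothetical illustration: only the id key is referenced later in this topic, and the other keys are placeholders.

    {"id": "1234", "user": "jdoe", "action": "login"}

Step 6 converts this byte array into a map of key-value pairs, and step 11 extracts individual keys such as id from that map.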

Steps
Once you satisfy the prerequisites, you can ingest data from Kafka.

  1. From the Data Stream Processor home page, go to the Build Pipeline tab.
  2. Select Read from Apache Kafka as your source function.
  3. On the next page, complete the following fields:
    Field | Description | Example
    Connection id | The name of your Kafka connection. | 461b1915-131e-4daf-a144-0630307436d0
    Topic | You must enter one Kafka topic. | my-kafka-topic
    Consumer Properties | Optional. Enter any Kafka consumer properties that you want to set on the Kafka consumer that the Splunk Data Stream Processor creates. See the Apache or Confluent Kafka documentation for details of what consumer properties Kafka consumers accept. To enter more than one property, click Add input for each additional property. | key = value
  4. Click the + icon to add a new function.
  5. Select Eval.
  6. To deserialize your data, call deserialize-json-object in the Eval function's textbox:
    as(deserialize-json-object(get("value")), "json");
    
  7. The deserialize-json-object function returns a map of your JSON's key-value pairs. To send these pairs to your destination as individual fields, click the + icon to add a new function.
  8. Select Normalize.
  9. Click Delete to delete any existing fields that you don't want to send to your destination. If you are sending your data to Splunk Enterprise from Kafka, you might want to delete all fields, because the fields you see populated are primarily meaningful within Kafka.
  10. Any data fields that you want to send to your destination must be extracted from the json field that you created in step 6, which is of type map. To extract fields from this map, click New Field, then select Eval from the drop-down menu.
  11. For each new field you want to send to your destination, call the map-get function (see the sketch after these steps):
    Expression / Original Field | Output
    map-get(get("json"), "id"); | id
  12. Choose where you want to write your transformed data:
    • Write to a Splunk Enterprise index
    • Write to Kafka
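
Taken together, steps 6 and 11 use the following expressions. This is a minimal sketch based on the hypothetical JSON payload shown in the prerequisites; id is the only key used in the step 11 example, and user is a placeholder:

    as(deserialize-json-object(get("value")), "json");
    map-get(get("json"), "id");
    map-get(get("json"), "user");

The first expression runs in the Eval function and stores the deserialized map in the json field. Each map-get expression runs in a New Field that you add in the Normalize function (steps 10 and 11) and returns the value of one key from that map; the name you enter in the Output column becomes the new top-level field.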

Write to a Splunk Enterprise index

Write your transformed Kafka data to a Splunk index:

  1. Click the + icon to add a new function to the pipeline you have created so far.
  2. Select Write to Splunk Enterprise as your sink function.
  3. Select a Connection from the drop-down list, and then complete the following fields (see the sketch after these steps):
    Field | Example
    index | literal("main");
    parameters | Optional. hec-enable-token = true
  4. Click Validate to confirm your pipeline's functions are correctly configured.
  5. Click Save to save your pipeline, or Activate to activate it.
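
Note that the index field takes an expression rather than a plain string, which is why the example in step 3 is literal("main"); rather than main. A minimal sketch, assuming you want every record from this pipeline written to the main index (substitute your own index name as needed):

    literal("main");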

For more information on how the Write to Splunk Enterprise and the Write to Index functions send your data to Splunk Enterprise, see Formatting event data.

Write to Kafka

You can write data transformed by your DSP pipeline back to Kafka.

  1. After you have applied transforming functions to your deserialized data, you must serialize it again before writing it back to Kafka. In this example, the data is serialized within the Write to Kafka function itself (see the Key and Value fields in step 3 and the sketch after these steps). Click the + icon to add a new function.
  2. Select Write to Kafka as your sink function.
  3. Complete the following fields:
    Field | Description | Example
    Connection-Id | The name of your Kafka connection. | 461b1915-131e-4daf-a144-0630307436d0
    Topic | You must enter one Kafka topic. | my-kafka-topic
    Key | Your Kafka key, in bytes. Kafka keys are used for partition assignment. To use Kafka's default partition assignment mechanism, set this to null. | to-bytes(get("key"));
    Value | The data payload, in bytes, for each event. | to-bytes(get("value"));
  4. Click Validate to confirm your pipeline's functions are correctly configured.
  5. Click Save to save your pipeline, or Activate to activate it.
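
The Key and Value expressions in step 3 reverse the deserialization that the Eval function performed earlier in the pipeline. The following is a minimal sketch that assumes the key and value fields still exist in your records at this point; if you deleted or renamed them in the Normalize function, substitute fields that your records still carry:

    to-bytes(get("key"));
    to-bytes(get("value"));

Alternatively, set the Key expression to null to use Kafka's default partition assignment mechanism, as noted in the field descriptions above.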