Splunk® Data Stream Processor

Function Reference

On April 3, 2023, Splunk Data Stream Processor will reach its end of sale, and will reach its end of life on February 28, 2025. If you are an existing DSP customer, please reach out to your account team for more information.

Sink functions (Data Destinations)

The following sink functions are available for your pipeline:

Sink functions do not have preview data enabled. To see if your index or third-party service is successfully receiving your data, you must search for your data in your index or third-party service.

Write to Index

This function sends data to the preconfigured Splunk Enterprise index using the Splunk HTTP Event Collector (HEC), which requires events to be formatted in a specific way. To use this function, your tenant administrator must have configured a default Splunk Enterprise instance during installation. See environment variables in the Install and Administer the Data Stream Processor manual.

API function name: write_index
Function Input:
See About sending data to Splunk Enterprise.
Arguments:

Argument Input Description DSL Example
module literal(""); Set this to null. literal("");
dataset expression<string> The Splunk index you want to send data to. Defaults to main. literal("main");

DSL example:

Argument DSL Example
module literal("");
dataset literal("metrics");

Write to Kafka

Send data to an Apache Kafka or Confluent Kafka topic using a Kafka SSL connection. See the Kafka SSL Connector. See also Create a Kafka pipeline. This sink function is only available if you have the DSP Universal license.

To send data to a Kafka topic, you must provide the topic, key, and value (payload) to the Write to Kafka function. You can specify only one topic per Write to Kafka function. The key and value fields are dynamic: you specify them on a per-record basis. You can use the get scalar function to extract the appropriate field from your record, as shown in the examples below. The key and value passed into this function must return bytes; otherwise, your pipeline fails to validate.

API function name: write_kafka
Function Input:
collection<record<R>>. This function takes in collections of records with schema R.

Arguments:

Argument Input Description UI Example
connection-id string The name of the Kafka connection. 879837b0-cabf-4bc2-8589-fcc4dad753e7
topic string Specify your Kafka topic here. my-topic
key expression<bytes> Your Kafka key, in bytes. Kafka keys are used for partition assignment. To use Kafka's default partition assignment mechanism, set this to null. get("key");
value expression<bytes> The data payload, in bytes, for each event. get("value");
producer-properties map<string, string> Add optional producer properties here. You can set this to null. {"reconnect.backoff.max.ms": 1500}

DSL example:

write_kafka(input, "879837b0-cabf-4bc2-8589-fcc4dad753e7", "topic1", get("key"), get("value"));
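
If you also want to set producer properties, a sketch of the call might look like the following. This assumes that the producer-properties map is passed as a final positional argument and that its values are written as strings, per the map<string, string> type listed above.

write_kafka(input, "879837b0-cabf-4bc2-8589-fcc4dad753e7", "topic1", get("key"), get("value"), {"reconnect.backoff.max.ms": "1500"});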

Write to a Kafka topic

Send data to an unauthenticated Kafka topic. This sink function is only available if you have the DSP Universal license.

API function name: unauthenticated_write_kafka
Function Input:
To send data to a Kafka topic, your data must have the following schema. If your data has any additional fields, or is missing any of the following fields, then your pipeline will fail to validate.

{
key: <key> in bytes. 
value: <value> in bytes. 
topic: <topic> as a string. 
}

Arguments:

Argument Input Description DSL Example
brokers string Specify your Kafka brokers here. "kafka:9092, kafka:9093"
producer-properties map<string, string> Add optional producer properties here. You can set this to null. {}

DSL example:

Argument DSL Example
brokers "kafka:9092, kafka:9093"
producer-properties {}
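
Assembled into a single call, a sketch might look like the following. It assumes the arguments are passed positionally after the input stream, in the order listed in the arguments table.

unauthenticated_write_kafka(input, "kafka:9092, kafka:9093", {});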

Write to Kafka with SSL

This function is deprecated. Use the Write to Kafka function instead. Send data to a Kafka topic using SSL. A DSP admin must configure SSL; see Kafka encryption and authentication using SSL. This sink function is only available if you have the DSP Universal license.
API function name: write_ssl_kafka
Function Input:
To send data to a Kafka topic, your data must have the following schema. If your data has any additional fields, or is missing any of the following fields, then your pipeline will fail to validate.

{
key: <key> in bytes. 
value: <value> in bytes. 
topic: <topic> as a string. 
}

Arguments:

Argument Input Description DSL Example
brokers string Specify your Kafka brokers here. "kafka:9092, kafka:9093"
producer-properties map<string, string> Add optional producer properties here. You can set this to null. {}

DSL example:

Argument DSL Example
brokers "kafka:9092, kafka:9093"
producer-properties {}
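
Assembled into a single call, a sketch might look like the following, again assuming positional arguments after the input stream.

write_ssl_kafka(input, "kafka:9092, kafka:9093", {});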

Write to Kinesis

Send data to a Kinesis stream using AWS access key and secret key authentication. Optionally, you can specify the Kinesis partition key for each record. If you do not provide a partition key, a hash of the payload determines the partition key. This sink function is only available if you have the DSP Universal license.

This is a connector-based function. To use it, you must first create a Kinesis connection, and then use that connection ID as the connection-id argument for this function.

API function name: write_kinesis
Function Input:
Accepts records with any specific schema, but records must have the body field serialized as bytes.
Arguments:

Argument Input Description DSL Example
connection-id connection-id The ID of the Kinesis connection you must create before using this function. "conx-2b39464e-0924"
stream-name string The name of the Kinesis stream that you want to write to. "my-stream-123"
body expression<bytes> The JSON body you want to write. get("json-body");
partition-key (Optional) expression<string> Your Kinesis partition key. See the AWS documentation about partition keys. get("partition-key");

DSL example:

Argument DSL Example
connection-id "conx-2b39464e-0924"
stream-name "my-stream-123"
body get("json-body")
partition-key (Optional) get("partition-key")
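
Combined into a single call, a sketch might look like the following. It assumes the arguments are passed positionally after the input stream, in the order listed in the arguments table, with the optional partition-key included last.

write_kinesis(input, "conx-2b39464e-0924", "my-stream-123", get("json-body"), get("partition-key"));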

Write to Null

Send data to a default sink that discards the events and terminates the stream.

API function name: write_null
Function Input:
Accepts records with any specific schema.
DSL example:

write_null(input);

Write to Splunk Enterprise

Send data to an external Splunk Enterprise system. To use this function, you must first create a Splunk Enterprise connection; see Send data to Splunk Enterprise. This function sends data to a Splunk Enterprise index using the Splunk HTTP Event Collector (HEC), which requires events to be formatted in a specific way. See About sending data to Splunk Enterprise for more information.

API function name: write_splunk_enterprise
Function Input:
See About sending data to Splunk Enterprise.
Arguments:

Argument Input Description UI Example
Index expression<string> The index to send your data to. literal("main");
Parameters map<string, string> Optional. The following rows list the optional parameters you can enter in this function.
hec-token-validation Boolean Set to true to enable HEC token validation. Defaults to true. hec-token-validation = true
hec-enable-ack Boolean Set to true for the function to wait for an acknowledgement for every single event. Set to false if acknowledgments in your Splunk platform are disabled or to increase throughput. Defaults to true. hec-enable-ack = true
hec-gzip-compression Boolean Set to true to compress HEC JSON data and increase throughput at the expense of increasing pipeline CPU utilization. Defaults to false. hec-gzip-compression = false
async Boolean Set to true to send data asynchronously. As a best practice, enable this setting for performance optimization. See Performance expectations for sending data from a data pipeline to Splunk Enterprise. Defaults to false. async = false
async-max-retries number Ignored unless async is set to true. The maximum number of times the DSP HEC client attempts to write a HEC JSON payload to the Splunk Enterprise HEC endpoint. Defaults to 3. async-max-retries = 4
async-timeout-ms number Ignored unless async is set to true. The maximum time, in milliseconds, to wait for an asynchronous write to the Splunk Enterprise HEC endpoint to succeed. When an async I/O request times out, an exception is thrown and the pipeline is restarted. Defaults to 10000 (10 seconds). async-timeout-ms = 10000
async-capacity number Ignored unless async is set to true. The maximum number of async I/O operations that can be in progress concurrently. This limit is the total across all indexers, not per indexer. Defaults to 100. async-capacity = 100

DSL example:

Argument DSL Example
Index literal("metrics");
Parameters hec-enable-ack = true
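
As a sketch only, assuming the index expression is passed after the input stream and the optional parameters are supplied as a single map<string, string>, a call might look like the following. Any reference to the Splunk Enterprise connection itself is not shown here; configure the connection as described above.

write_splunk_enterprise(input, literal("metrics"), {"hec-enable-ack": "true", "async": "true"});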
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.0.1