Sink functions (Data Destinations)
The following sink functions are available for your pipeline:
Sink functions do not have preview data enabled. To confirm that your index or third-party service is receiving your data, search for the data in that destination.
Write to Index
This function sends data to the preconfigured Splunk Enterprise index using the Splunk HTTP Event Collector (HEC), which requires events to be formatted in a specific way. To use this function, your tenant administrator must have configured a default Splunk Enterprise instance during installation. See environment variables in the Install and Administer the Data Stream Processor manual.
API function name: write_index
Function Input:
See About sending data to Splunk Enterprise.
Arguments:
Argument | Input | Description | DSL Example |
---|---|---|---|
module | literal(""); | Set this to null. | literal(""); |
dataset | expression&lt;string&gt; | The Splunk index you want to send data to. Defaults to main. | literal("main"); |
DSL example:
Argument | DSL Example |
---|---|
module | literal(""); |
dataset | literal("metrics"); |
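Putting these arguments together, a complete invocation looks like the following. This is a minimal sketch that assumes your pipeline's input stream is named input and that the arguments are passed in the order listed in the table above:
write_index(input, literal(""), literal("metrics"));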
Write to Kafka
Send data to an Apache or Confluent Kafka topic using a Kafka SSL connection. See the Kafka SSL Connector. See also Create a Kafka pipeline. This sink function is only available if you have the DSP Universal license.
To send data to a Kafka topic, you must provide the topic, key, and value (payload) to the Write to Kafka function. You can only specify one topic per Write to Kafka function. The key and value fields are dynamic: you specify them on a per-record basis. You can use the get scalar function to extract the right field out of your record, as shown in the examples below. The key and value passed into this function must return bytes; otherwise, your pipeline fails to validate.
API function name: write_kafka
Function Input:
collection&lt;record&lt;R&gt;&gt;
This function takes in collections of records with schema R.
Arguments:
Argument | Input | Description | UI Example |
---|---|---|---|
connection-id | string | The name of the Kafka connection. | 879837b0-cabf-4bc2-8589-fcc4dad753e7 |
topic | string | Specify your Kafka topic here. | my-topic |
key | expression&lt;bytes&gt; | Your Kafka key, in bytes. Kafka keys are used for partition assignment. To use Kafka's default partition assignment mechanism, set this to null. | get("key"); |
value | expression<bytes> | The data payload, in bytes, for each event. | get("value"); |
producer-properties | map&lt;string, string&gt; | Add optional producer properties here. You can set this to null. | {"reconnect.backoff.max.ms": 1500} |
DSL example:
write_kafka(input, "879837b0-cabf-4bc2-8589-fcc4dad753e7", "topic1", get("key"), get("value"));
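To fall back to Kafka's default partition assignment, you can pass null as the key expression, as noted in the arguments table. A minimal sketch using the same connection and topic as the example above:
write_kafka(input, "879837b0-cabf-4bc2-8589-fcc4dad753e7", "topic1", null, get("value"));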
Write to a Kafka topic
Send data to an unauthenticated Kafka topic. This sink function is only available if you have the DSP Universal license.
API function name: unauthenticated_write_kafka
Function Input:
To send data to a Kafka topic, your data must have the following schema. If your data has any additional fields, or is missing any of the following fields, then your pipeline will fail to validate.
{
key: &lt;key&gt; in bytes.
value: &lt;value&gt; in bytes.
topic: &lt;topic&gt; as a string.
}
Arguments:
Argument | Input | Description | DSL Example |
---|---|---|---|
brokers | string | Specify your Kafka brokers here. | "kafka:9092, kafka:9093" |
producer-properties | map&lt;string, string&gt; | Add optional producer properties here. You can set this to null. | {} |
DSL example:
Argument | DSL Example |
---|---|
brokers | "kafka:9092, kafka:9093" |
producer-properties | {} |
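Putting these arguments together, a complete invocation might look like the following. This is a sketch that assumes the function takes the input stream followed by the arguments in the order listed above:
unauthenticated_write_kafka(input, "kafka:9092, kafka:9093", {});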
Write to Kafka with SSL
This function is deprecated. Use the Write to Kafka function instead. Send data to a Kafka topic using SSL. A DSP admin must configure SSL; see Kafka encryption and authentication using SSL. This sink function is only available if you have the DSP Universal license.
API function name: write_ssl_kafka
Function Input:
To send data to a Kafka topic, your data must have the following schema. If your data has any additional fields, or is missing any of the following fields, then your pipeline will fail to validate.
{
key: &lt;key&gt; in bytes.
value: &lt;value&gt; in bytes.
topic: &lt;topic&gt; as a string.
}
Arguments:
Argument | Input | Description | DSL Example |
---|---|---|---|
brokers | string | Specify your Kafka brokers here. | "kafka:9092, kafka:9093" |
producer-properties | map&lt;string, string&gt; | Add optional producer properties here. You can set this to null. | {} |
DSL example:
Argument | DSL Example |
---|---|
brokers | "kafka:9092, kafka:9093" |
producer-properties | {} |
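A complete invocation might look like the following sketch, which assumes the same argument order as the table above:
write_ssl_kafka(input, "kafka:9092, kafka:9093", {});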
Write to Kinesis
Send data to a Kinesis stream using AWS access key and secret key authentication. Optionally, you can specify the Kinesis partition key for each record. If you do not provide a partition key, a hash of the payload determines the partition key. This sink function is only available if you have the DSP Universal license.
This is a connector-based function. To use it, you must first create a Kinesis connection, and then use that connection_id as an argument for this function.
API function name: write_kinesis
Function Input:
Accepts records with any specific schema, but records must have the body field serialized as bytes.
Arguments:
Argument | Input | Description | DSL Example |
---|---|---|---|
connection-id | connection-id | The ID of the Kinesis connection you must create before using this function. | "conx-2b39464e-0924" |
stream-name | string | The name of the stream you want to write to Kinesis. | "my-stream-123" |
body | expression<bytes> | The JSON body you want to write. | get("json-body"); |
partition-key (Optional) | expression&lt;string&gt; | Your Kinesis partition key. See the AWS documentation about partition keys. | get("partition-key"); |
DSL example:
Argument | DSL Example |
---|---|
connection-id | "conx-2b39464e-0924" |
stream-name | "my-stream-123" |
body | get("json-body") |
partition-key (Optional) | get("partition-key") |
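Putting these arguments together, a complete invocation might look like the following. This sketch assumes the arguments are passed in the order listed in the table, with the optional partition key last:
write_kinesis(input, "conx-2b39464e-0924", "my-stream-123", get("json-body"), get("partition-key"));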
Write to Null
Send data to a default sink that discards the events and terminates the stream.
API function name: write_null
Function Input:
Accepts records with any specific schema.
DSL example:
write_null(input);
Write to Splunk Enterprise
Send data to an external Splunk Enterprise system. You must create a Splunk Enterprise connection to use this function; see Send data to Splunk Enterprise. This function sends data to a Splunk Enterprise index using the Splunk HTTP Event Collector (HEC), which requires events to be formatted in a specific way. See About sending data to Splunk Enterprise for more information.
API function name: write_splunk_enterprise
Function Input:
See About sending data to Splunk Enterprise.
Arguments:
Argument | Input | Description | UI Example |
---|---|---|---|
Index | expression<string> | The index to send your data to. | literal("main"); |
Parameters | map<string, string> | Optional. The following rows list the optional parameters you can enter in this function. | |
hec-token-validation | Boolean | Set to true to enable HEC token validation. Defaults to true. | hec-token-validation = true |
hec-enable-ack | Boolean | Set to true for the function to wait for an acknowledgement for every single event. Set to false if acknowledgments in your Splunk platform are disabled or to increase throughput. Defaults to true. | hec-enable-ack = true |
hec-gzip-compression | Boolean | Set to true to compress HEC JSON data and increase throughput at the expense of increasing pipeline CPU utilization. Defaults to false. | hec-gzip-compression = false |
async | Boolean | Set to true to send data asynchronously. Best practices are to enable this for performance optimization. See Performance expectations for sending data from a data pipeline to Splunk Enterprise. Defaults to false. | async = false |
async-max-retries | number | Ignored unless async is set to true. The maximum number of times the DSP HEC client will attempt to write a HEC JSON payload to the Splunk Enterprise HEC endpoint. Defaults to 3. | async-max-retries = 4 |
async-timeout-ms | number | Ignored unless async is set to true. The maximum time, in milliseconds, to wait for an asynchronous write to the Splunk Enterprise HEC endpoint to succeed. When an async I/O request times out, an exception is thrown and the pipeline is restarted. Defaults to 10000 (10 seconds). | async-timeout-ms = 10000 |
async-capacity | number | Ignored unless async is set to true. The maximum number of async I/O operations that can be happening concurrently. This number is total across all indexers, not per indexer. Defaults to 100. | async-capacity = 100 |
DSL example:
Argument | DSL Example |
---|---|
Index | literal("metrics"); |
Parameters | hec-enable-ack = true |
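A complete invocation might look like the following. This is a sketch only: it assumes the function takes the input stream, the index expression, and the optional parameters as a map, and that map literals use the same JSON-like syntax shown for the Write to Kafka producer-properties argument:
write_splunk_enterprise(input, literal("metrics"), {"hec-enable-ack": "true"});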