Splunk® Data Stream Processor

Function Reference


Sink functions (Data Destinations)

The following sink functions are available for your pipeline:

Sink functions do not have preview data enabled. To verify that your index or third-party service is successfully receiving your data, search for the data in that index or service.

Write to the Splunk platform with Batching

Send data to an external Splunk Enterprise system. You must create a Splunk Enterprise connection to use this function; see Send data to Splunk Enterprise. The Splunk Enterprise Indexes function combines the actions of three underlying DSP functions into one for user convenience.

This function adds out-of-the-box support for index-based routing with batched data. If you want to send data from DSP to multiple Splunk Enterprise indexes, you can use this function to specify the target index on a per-record basis. Additionally, you can control how often batches are emitted with one of two optional arguments: batch_size, which specifies a maximum payload size in bytes, or batch_millis, which specifies a maximum time to wait before emitting the batch.

Function Name
splunk_enterprise_indexes
Function Input
See About sending data to Splunk Enterprise.
Arguments
Argument Input Description Example
connection_id connection-id ID of the Splunk Enterprise Connection "576205b3-f6f5-4ab7-8ffc-a4089a95d0c4"
index expression<string> An expression that returns the Splunk index from your record, if one exists. If your data does not contain an index, set this field to the empty string "". cast(map_get(attributes, "index"), "string")
default_index expression<string> If your record doesn't contain a Splunk index field, then this function sends your data to the index specified in this argument. If you do not want to specify a default index, set this field to the empty string "". "main"
parameters map<string, string> Optional. See the "Parameters" table for available options.
batch_size string Optional. The maximum size, in bytes, of the emitted batched byte[]. The size of your emitted batched bytes cannot exceed 100 MB. Defaults to 10MB. "2MB"
batch_millis long Optional. The interval, in milliseconds, at which to send batched data to Splunk Enterprise. Defaults to 10000. 2000
Parameter options Input Description Example
parameters map<string, string> Optional. The following rows list the optional parameters you can enter in this function. Defaults to empty { }.
hec-token-validation boolean Set to true to enable HEC token validation. Defaults to true. hec-token-validation: true
hec-enable-ack boolean Set to true for the function to wait for an acknowledgement for every single event. Set to false if acknowledgments in your Splunk platform are disabled or to increase throughput. Defaults to true. hec-enable-ack: true
hec-gzip-compression boolean Set to true to compress HEC JSON data and increase throughput at the expense of increasing pipeline CPU utilization. Defaults to false. hec-gzip-compression: false
async boolean Set to true to send data asynchronously. In async mode, send operations from DSP do not wait for a response to return, which increases performance. As a best practice, enable this setting for performance optimization. See Performance expectations for sending data from a data pipeline to Splunk Enterprise. Defaults to false. async: false
async-max-retries number Ignored unless async is set to True. The maximum number of times the DSP HEC client will attempt to write a HEC JSON payload to the Splunk Enterprise HEC endpoint. Defaults to 3. async-max-retries: 4
async-timeout-ms number Ignored unless async is set to True. The maximum time, in milliseconds, to wait for an asynchronous write to the Splunk Enterprise HEC endpoint to succeed. When an async I/O request times out, an exception is thrown and the pipeline is restarted. Defaults to 10000 (10 seconds). async-timeout-ms: 10000
async-capacity number Ignored unless async is set to True. The maximum number of async I/O operations that can be happening concurrently. This number is total across all indexers, not per indexer. Defaults to 100. async-capacity: 100

SPL2 Pipeline Builder example:

In this example, records are sent to the index specified in the index key of the attributes field. If the index key does not exist in attributes, then that record is sent to the default index, events_idx_2. Additionally, the async, hec-enable-ack, and hec-gzip-compression parameters are configured for optimal throughput. Finally, the Splunk Enterprise Indexes function sends your data to the HEC endpoint when your payload reaches 100B in size.

| from read_splunk_firehose()
| into splunk_enterprise_indexes(
    "b5c57cbd-1470-4639-9938-deb3509cbbc8",
    cast(map_get(attributes, "index"), "string"),
    "events_idx_2",
    {"async": "true", "hec-enable-ack": "false", "hec-token-validation": "true", "hec-gzip-compression": "true"},
    "100B"
  );
 

Assume that you have the following three records in your data:

Record{ 
  body="my data 1", source_type="mysourcetype1", id="id1", source="mysource", timestamp=1234567890011, host="myhost1", attributes={"attr1":"val1", "index":"index1"}
}
Record{ 
  body="my data 2", source_type="mysourcetype2", id="id2", source="mysource", timestamp=1234567890012, host="myhost2", attributes={"index":"index2"}
}
Record{ 
  body="my data 3", source_type="mysourcetype3", id="id3", source="mysource", timestamp=1234567890013, host="myhost3"
}

Sending these records to the splunk_enterprise_indexes function with the arguments specified in the earlier SPL2 Pipeline Builder example results in the following HEC JSON payload:

{"event":"my data 1", "sourcetype":"mysourcetype1", "source":"mysource", "host":"myhost1", "index": "index1", "time":"1234567890.011"}
{"event":"my data 2", "sourcetype":"mysourcetype2", "source":"mysource", "host":"myhost2", "index": "index2", "time":"1234567890.012"}
{"event":"my data 3", "sourcetype":"mysourcetype3", "source":"mysource", "host":"myhost3", time":"1234567890.013"}

Write to the Splunk platform

Send data to an external Splunk Enterprise system. You must create a Splunk Enterprise connection to use this function; see Send data to Splunk Enterprise. This function sends data to a Splunk Enterprise index using the Splunk HTTP Event Collector (HEC), which requires events to be formatted in a specific way. See About sending data to Splunk Enterprise for more information.

Function Name
splunk_enterprise
Function Input
See About sending data to Splunk Enterprise.
Arguments
Argument Input Description Example
connection_id connection-id ID of the Splunk Enterprise Connection "576205b3-f6f5-4ab7-8ffc-a4089a95d0c4"
index expression<string> The index to send your data to. "main"
payload expression<bytes> The name of the DSP record field (for example, "bytes") that has the byte payload to be written directly to the HEC endpoint. Set to null if your records are not in bytes. If the payload is set to null and you are not using the To Splunk JSON function in your pipeline, see Formatting event data in DSP for information on how your events are indexed into Splunk Enterprise. bytes
parameters map<string, string> Optional. See the "Parameters" table for available options.
Parameter options Input Description Example
parameters map<string, string> Optional. The following rows list the optional parameters you can enter in this function. Defaults to empty { }.
hec-token-validation boolean Set to true to enable HEC token validation. Defaults to true. hec-token-validation: true
hec-enable-ack boolean Set to true for the function to wait for an acknowledgement for every single event. Set to false if acknowledgments in your Splunk platform are disabled or to increase throughput. Defaults to true. hec-enable-ack: true
hec-gzip-compression boolean Set to true to compress HEC JSON data and increase throughput at the expense of increasing pipeline CPU utilization. Defaults to false. hec-gzip-compression: false
async boolean Set to true to send data asynchronously. In async mode, send operations from DSP do not wait for a response to return, which increases performance. As a best practice, enable this setting for performance optimization. See Performance expectations for sending data from a data pipeline to Splunk Enterprise. Defaults to false. async: false
async-max-retries number Ignored unless async is set to True. The maximum number of times the DSP HEC client will attempt to write a HEC JSON payload to the Splunk Enterprise HEC endpoint. Defaults to 3. async-max-retries: 4
async-timeout-ms number Ignored unless async is set to True. The maximum time, in milliseconds, to wait for an asynchronous write to the Splunk Enterprise HEC endpoint to succeed. When an async I/O request times out, an exception is thrown and the pipeline is restarted. Defaults to 10000 (10 seconds). async-timeout-ms: 10000
async-capacity number Ignored unless async is set to True. The maximum number of async I/O operations that can be happening concurrently. This number is total across all indexers, not per indexer. Defaults to 100. async-capacity: 100

SPL2 Pipeline Builder example:

In this example, your data comes out of batch_bytes as batched byte payloads with a max size of 2MB and is passed into the splunk_enterprise sink function. This data is then sent to the Splunk Enterprise endpoint for indexing.


...| batch_bytes bytes=get(json) size="2MB" millis=5000
| into splunk_enterprise(
    "b5c57cbd-1470-4639-9938-deb3509cbbc8",
    "events_idx",
    bytes,
    {"async": "true", "hec-enable-ack": "false", "hec-token-validation": "true"}
   );
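
If your records are not already batched into bytes, you can set the payload argument to null, as described in the arguments table. The following is a minimal sketch of that configuration, reusing the connection ID and index from the example above; with a null payload, see Formatting event data in DSP for how your events are indexed:

| from read_splunk_firehose()
| into splunk_enterprise(
    "b5c57cbd-1470-4639-9938-deb3509cbbc8",
    "events_idx",
    null,
    {"hec-enable-ack": "true"}
  );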

Write to the Splunk platform (Default for Environment)

This function sends data to the preconfigured Splunk Enterprise index using the Splunk HTTP Event Collector (HEC), which requires events to be formatted in a specific way. To use this function, your tenant administrator must have configured a default Splunk Enterprise instance during installation. See environment variables in the Install and Administer the Data Stream Processor manual.

Function Name
write_index
Function Input
See About sending data to Splunk Enterprise.
Arguments
Argument Input Description Example
module "" Set this to "". ""
dataset expression<string> The Splunk index you want to send data to. Defaults to main. "main"

SPL2 Pipeline Builder example:

...| into write_index("", "main");
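
For example, a minimal pipeline sketch that reads from the Splunk firehose and writes to a hypothetical metrics_idx index looks like this:

| from read_splunk_firehose()
| into write_index("", "metrics_idx");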

Write to Kafka

Send data to an Apache or Confluent Kafka topic. This sink function is only available if you have the DSP Universal license.

This is a connector-based function. To use it, you must first create a Kafka connection. See Send data from Splunk DSP to Kafka using SSL and Send data from Splunk DSP to Kafka without authentication. After creating a Kafka connection, use that connection_id as an argument for this function.

To send data to a Kafka topic, you must provide the topic, key, and value (payload) to the Write to Kafka function. You can only specify one topic per Write to Kafka function. The key and value fields are dynamic: you specify them on a per-record basis. The key and value expressions passed into this function must return bytes; otherwise, your pipeline fails to validate.

Function Name
write_kafka
Function Input
collection<record<R>>
This function takes in collections of records with schema R.
Arguments
Argument Input Description Example
connection_id string The ID of your Kafka connection. 879837b0-cabf-4bc2-8589-fcc4dad753e7
topic string Specify your Kafka topic here. my-topic
key expression<bytes> Your Kafka key, in bytes. Kafka keys are used for partition assignment. To use Kafka's default partition assignment mechanism, set this to null. to_bytes("")
value expression<bytes> The data payload, in bytes, for each event. value
producer-properties map<string, string> Add optional producer properties here. You can set this to null. {"reconnect.backoff.max.ms": 1500}

SPL2 Pipeline Builder example:

...| into write_kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", "topic1", to_bytes(""), value);
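
Because the value argument must be an expression that returns bytes, records whose payload is a string typically need a conversion first. The following sketch assumes a hypothetical upstream eval that casts the body field to a string and serializes it with to_bytes before passing it to write_kafka:

...| eval value=to_bytes(cast(body, "string"))
| into write_kafka("879837b0-cabf-4bc2-8589-fcc4dad753e7", "topic1", to_bytes(""), value);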

Write to Kinesis

Send data to a Kinesis stream using AWS access key and secret key authentication. Optionally, you can specify the Kinesis partition key for each record. If you do not provide a partition key, a hash of the payload determines the partition key. This sink function is only available if you have the DSP Universal license.

This is a connector-based function. To use it, you must first create a Kinesis connection. Use that connection_id as an argument for this function.

Function Name
write_kinesis
Function Input
Accepts records with any specific schema, but records must have the body field serialized as bytes.
Arguments
Argument Input Description Example
connection_id connection_id The ID of the Kinesis connection you must create before using this function. "conx-2b39464e-0924"
stream-name string The name of the Kinesis stream to write to. "my-stream-123"
body expression<bytes> The JSON body you want to write. json-body
partition-key (Optional) expression<string> Your Kinesis partition key. See the AWS documentation about partition keys. Defaults to null. partition-key

SPL2 Pipeline Builder example:

...| into write_kinesis("879837b0-cabf-4bc2-8589-fcc4dad753e7", "my-stream-123", to_bytes("body"));
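
To control partition assignment yourself, pass the optional partition-key argument as the fourth positional argument. The following sketch uses a hypothetical static partition key; in practice you might derive it from a record field instead:

...| into write_kinesis("879837b0-cabf-4bc2-8589-fcc4dad753e7", "my-stream-123", to_bytes("body"), "my-partition-key");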

Write to Null

Send data to a default sink that discards the events and terminates the stream.

Function Name
write_null
Function Input
Accepts records with any specific schema.

SPL2 Pipeline Builder example:

...| into write_null();

Write to S3-compatible storage

Send data to an Amazon S3 bucket. See Send data from DSP to Amazon S3 for more information about creating an S3 connection. Generated Amazon S3 objects cannot be compressed in the Amazon S3 destination bucket. This sink function is only available if you have the DSP Universal license.

This is a connector-based function. To use it, you must first create an S3 connection. Use that connection_id as an argument for this function.

Currently, we support writing to Amazon S3 buckets. Third-party S3-compatible vendors are not supported.

Function Name
s3
Function Input
collection<record<R>>
This function takes in collections of records with schema R.
Arguments
Parameter Type Description
connection_id connection The Amazon S3 connection ID.
bucket string The name of the destination bucket in Amazon S3.
prefix string The prefix to use in the object keys of the generated Amazon S3 objects. This parameter supports prefix patterns with substitutions. Using the special syntax #{ }, you can specify field names that will be substituted with the actual field value, or specify date-time formats that will be substituted with a timestamp indicating when the S3 object was generated. See the "Prefix parameter substitutions" section for more information.

size_mb long Only applicable when format is set to JSON. The maximum allowed file size in MB. When the maximum file size is reached, the current part of the file is closed and a new file part with the same object key prefix is created.
rollover_secs long Only applicable when format is set to JSON. The maximum time interval in seconds that a part of a file can stay open before it is closed and a new file part with the same object key prefix is created. The rollover interval is limited to multiples of 60 seconds. If you set rollover_secs to a value less than 60 seconds, the rollover interval is set to 60 seconds. If you set rollover_secs to a value greater than 60 seconds, the rollover interval is rounded up to the closest multiple of 60 seconds.
format string The format for data in the Amazon S3 destination. The format can be set to:
  • JSON for line-delimited JSON format.
  • Parquet for Parquet format. See the "Parquet format" section on this page for information about limitations that apply when Parquet format is used.

These values are case-insensitive. For example, json and parquet are also accepted.

inactivity_secs long Optional. Only applicable when format is set to JSON. The number of seconds of inactivity allowed before the current part of a file is closed and a new file part with the same object key prefix is created. Defaults to 60 seconds.
algorithm string Optional. The server-side encryption algorithm. Encryption keys are not stored or managed by DSP. The algorithm can be set to:
  • KMS for SSE-KMS. To use this algorithm, you must have the required KMS permissions in Amazon S3. See the "Permissions required to enable KMS" section on this page for more information.
  • AES-256 for SSE-S3.

For more information about these encryption algorithms, search for "Protecting data using server-side encryption" in the Amazon S3 documentation.

If you leave this parameter empty, DSP does not require the data to be encrypted in Amazon S3.

key_id string Optional. Only applicable when algorithm is set to KMS. The key Amazon Resource Name (ARN) of the Customer Master Key (CMK). Specify the key ARN using this format: arn:aws:kms:<region>:<account-id>:key/<key-id>. Encryption keys are not stored or managed by DSP. If you leave this parameter empty, the AWS default master key for the region is used.

Prefix parameter substitutions

The prefix parameter supports two types of substitution: event field and date-time substitutions. When JSON format is used, both types of substitutions are supported and can be used at the same time. When Parquet format is used, only date-time substitution is supported.

Event field substitution

You can use dot-delimited notation in a #{ } pattern to use the actual value of a field in the event as part of the prefix. For example, if you have the following event, then the parameter substitution notation #{attributes.account_id} sets the prefix to my_account and #{body.metadata.name} sets the prefix to my_metadata_name.

{
  "body": {
    "metadata": {
      "name": "my_metadata_name"
    }
  },
  "attributes": {
    "account_id": "my_account"
  }
}

If you define the prefix with a field that does not exist in the event, then the substitution value is set to unknown. Any leading or trailing whitespaces inside #{ } are trimmed before substitution. An empty event field substitution pattern, such as #{}, is not allowed.

Do not use fields with a large number of unique values in the prefix substitution pattern. For example, the substitution pattern #{fieldname} where fieldname is a field that is substituted with an event ID or a timestamp will generate a large number of prefixes and may cause unpredictable results.

Date-time format substitution

You can use a date-time format inside a #{datetime: } pattern to generate a prefix with date and time values. For example, the parameter substitution notation #{datetime:yyyy-MM} generates a prefix with a year and month value similar to 2020-03, and #{datetime:yyyy-MM-dd} generates a prefix with a year, month, and day value similar to 2020-03-31. Any leading or trailing whitespaces inside #{datetime: } are trimmed before substitution. An empty date-time substitution pattern, such as #{datetime:}, is not allowed.

Combining both substitution types

When using JSON format, you can use both substitution types at the same time. For example, if you have the sample event shown in the "Event field substitution" section, the parameter substitution notation dsp-#{attributes.account_id}-#{datetime:yyyy-MM} sets the prefix to dsp-my_account-2020-03. As another example, the parameter substitution notation dsp/#{body.metadata.name}/io/#{datetime:yyyy/MM/dd} sets the prefix to dsp/my_metadata_name/io/2020/03/31.
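
For example, a sketch of the s3 function that uses the combined substitution pattern shown above, assuming the positional argument order listed in the arguments table (connection_id, bucket, prefix, size_mb, rollover_secs, format, inactivity_secs, algorithm), looks like this:

...| into s3("connection_id", "my_bucket", "dsp-#{attributes.account_id}-#{datetime:yyyy-MM}", 1024, 60, "JSON", 60, "AES-256");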

Parquet format

If Parquet format is used:

  • rollover_secs, size_mb, and inactivity_secs are not used. File parts are automatically closed every 60 seconds and a new file part with the same Amazon S3 object key prefix is created.
  • Bytes per second and events per second metrics are not available.
  • prefix only supports date-time substitution. Event field substitutions are not supported.

Null values in string type fields

If your pipeline has an eval function that results in a null value in a string type field, the pipeline will fail. To prevent this failure, you can add another eval function that conditionally sets the null values to an empty string.

eval <field>=cast(map_get(attributes, "<field>"), "string") | eval <field>=if(isnull(<field>), "", <field>)
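
For example, assuming a hypothetical attributes key named region, the pattern looks like this:

eval region=cast(map_get(attributes, "region"), "string") | eval region=if(isnull(region), "", region)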

Event payload limitations

Event payloads have the following limitations when Parquet format is used:

  • body can be one of the basic data types: string, bytes, or number (int32, int64, float, or double), or it can be a list or a map of the basic types.
  • attributes specify JSON objects that contain explicit custom fields. The values for these fields can be one of the basic data types: string, bytes, or number (int32, int64, float, or double), or a list or map of the basic types.

See Event schema for more information.

The following example payload shows a body set to a map of a basic data type. This is a valid payload.

{
  "body": {
           "f1": "v1",
           "f2": "v2",
           "f3": "v3"
           },
  "sourcetype": "my-sourcetype",
  "host": "my-host",
  "attributes": {
                 "my-test-attr": "my-test-attr-val"
                }
 }

The following example payload shows a body set to a map of an array. This is an invalid payload and the pipeline will fail.

{
  "body": {
           "myarray": ["v11", "v12"]
           },
  "sourcetype": "my-sourcetype",
  "host": "my-host",
  "attributes": {
                 "my-test-attr": "my-test-attr-val"
                }
 }

Defaults used for Parquet format

The following defaults are used for Parquet format:

  • Default Parquet Writer version: Parquet v1
  • Default compression codec: UNCOMPRESSED
  • Default Row group size: 128 x 1024 x 1024 bytes
  • Dictionary encoding: Enabled
  • Default dictionary page size: 1024 x 1024 bytes

Permissions required to enable KMS

To use KMS as the encryption algorithm, the AWS account used in your Amazon S3 connection must have the following permissions:

  • kms:Decrypt
  • kms:GenerateDataKey

If your AWS Identity and Access Management (IAM) user or role is in the same AWS account as the AWS KMS key, then you must have permission to kms:Decrypt in the key policy. If your IAM user or role belongs to a different account than the key, then you must have permission to kms:Decrypt in both the key policy and your IAM permissions.

SPL2 Pipeline Builder example:

...| into s3("connection_id", "my_bucket", "prefix", 1024, 10, "JSON", 60, "AES-256");
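
A variant of the same sketch that uses SSE-KMS instead of SSE-S3 passes "KMS" as the algorithm and, optionally, a key ARN as a final key_id argument. The ARN below is a placeholder that follows the format described in the arguments table, and the sketch assumes that key_id can be supplied as the ninth positional argument:

...| into s3("connection_id", "my_bucket", "prefix", 1024, 60, "JSON", 60, "KMS", "arn:aws:kms:<region>:<account-id>:key/<key-id>");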

Write to SignalFx

Send data to a SignalFx endpoint. See Send data from DSP to SignalFx for more information about creating a SignalFx connection.

This is a connector-based function. To use it, you must first create a SignalFx connection. Use that connection_id as an argument for this function.

Function Name
signalfx
Function Input
collection<record<R>>
This function takes in collections of records with schema R.
Arguments
Parameter Type Description
connection_id string The SignalFx connection ID.
metric_name expression<string> The SignalFx metric name.
metric_value expression<double> The SignalFx metric value.
metric_type expression<string> The SignalFx metric type. This can be set to:
  • COUNTER
  • CUMULATIVE_COUNTER
  • GAUGE

This argument is case-sensitive and must be uppercase.

metric_timestamp expression<long> Optional. The time associated with the SignalFx metric, measured in milliseconds. If a timestamp is not available, the ingest time is used as the timestamp.
metric_dimensions expression<map<string, string>> Optional. Defaults to empty { }. JSON key-value pairs that describe a SignalFx metric.
parameters map<string, string> Optional. Defaults to empty { }. Key-value pairs that can be passed to SignalFx. This can be set to:
  • batch_size: The maximum number of elements to flush. The batch size can range between 50 and 10,000 elements. The default value is 2000.
  • batch_interval_msecs: The maximum time to wait before flushing. The batch interval can range between 50 and 100,000 milliseconds. The default value is 2000.

Port requirements

The Write to SignalFx function sends HTTP requests to the SignalFx endpoint via a dynamic or ephemeral port. Your local firewall configuration must be set up to allow outgoing HTTP traffic from at least one of the ports in the range of dynamic or ephemeral ports allocated by your operating system. These ports typically range from 49152 to 65535, but this can be different depending on the specific operating system you are using.

SPL2 Pipeline Builder example:

...| into signalfx("connection_id", "my_metric_name", 0.33, "COUNTER", 1583864717233L, {"my_dimension": "1"}, {"batch_size": "1000"});
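
Because the metric arguments are expressions, they can also be computed from record fields. The following sketch assumes a hypothetical attributes key named cpu_pct that holds the metric value as a string, and uses the record's timestamp field as the metric timestamp:

...| into signalfx("connection_id", "cpu.utilization", cast(map_get(attributes, "cpu_pct"), "double"), "GAUGE", timestamp, {"host": "server-1"}, {"batch_size": "1000"});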

Write to Azure Event Hubs Using SAS Key (Beta)

Send data to Azure Event Hubs. See Send data from DSP to Azure Event Hubs Using SAS Key (Beta) for more information about creating an Azure Event Hubs connection. This sink function is only available if you have the DSP Universal license.

This is a connector-based function. To use it, you must first create an Azure Event Hubs connection. Use that connection_id as an argument for this function.

This is a beta function and is not ready for production use.

Function Name
event_hubs
Function Input
collection<record<R>>
This function takes in collections of records with schema R.
Arguments
Parameter Type Description
connection_id string The Azure Event Hubs connection ID.
event_hub_name string The Event Hub name.
value expression<bytes> The event body or payload to send to Azure Event Hubs.
key expression<string> Optional. The Event Hub partition key. The default value is null.
parameters map<string, string> Optional. Supported values are:
  • batch_window: A number in string format. Must be between 10 and 10,000. The default value is 1000. Accumulate events for N milliseconds before sending events in batches to Event Hubs.
  • unordered: A boolean indicating that strict ordering of events is not required. The default value is false. If the order of events is not important, you will get better throughput if you set partition_key to an empty string or null, and set unordered to true.

If an event exceeds the maximum batch capacity, the job will fail and it will not be restarted.

SPL2 Pipeline Builder example:

...| into event_hubs("connection_id", "My Event Hub", value, "partition_key", {"batch_window": "5000", "unordered": "false"});
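
As noted in the parameters description, when event ordering is not important you can improve throughput by leaving the partition key unset and enabling unordered delivery. The following sketch assumes that null is accepted for the optional key argument, as described in the arguments table:

...| into event_hubs("connection_id", "My Event Hub", value, null, {"batch_window": "5000", "unordered": "true"});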