Splunk® Data Stream Processor

Function Reference


Source functions (Data Sources)

The following are available source functions for your pipeline.

Amazon CloudWatch Metrics

Get data from Amazon CloudWatch Metrics. You must create a connection to use this source function. See Use the Amazon CloudWatch Metrics connector with Splunk DSP.

API function name: read_from_aws_cloudwatch_metrics
Function Output:
This function outputs data pipeline events that use the data pipeline event schema or the metrics schema.
Arguments:

  • connection-id: The name of your Amazon CloudWatch Metrics connection.

DSL example:

read_from_aws_cloudwatch_metrics("4e1a64d8-0849-4324-9298-1e655ea4ba87");
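
The following is a minimal sketch of how this source feeds the rest of a pipeline, assuming the write_null sink function shown in the Kafka SSL example later in this topic. In a real pipeline, replace write_null with your sink of choice:

events = read_from_aws_cloudwatch_metrics("4e1a64d8-0849-4324-9298-1e655ea4ba87");
// Sketch only: discard the output; swap write_null for a real sink function
write_null(events);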

Amazon S3 Connector

Get data from Amazon S3. You must create a connection to use this source function. See Use the Amazon S3 Connector.

API function name: read_from_aws_s3
Function Output:
This function outputs data pipeline events that use the data pipeline event schema or the metrics schema.
Arguments:

  • connection-id: The name of your Amazon S3 connection.

DSL example:

read_from_aws_s3("4e1a64d8-0849-4324-9298-1e655ea4ba87");

Azure Event Hubs Using SAS key

Get data from Azure Event Hubs using a SAS key. You must create a connection to use this source function. See Create a connection for the DSP Azure Event Hubs Connector with an SAS key. For information on how to create a pipeline using Azure Event Hubs as your source, see Create an Azure Event Hubs pipeline.

API function name: read_event_hubs
Function Output:
This function outputs records with the following schema:

{
partitionKey: <partitionKey> as a string. 
body: <body> in bytes. 
partitionId: <partitionId> as a string. 
offset: <offset> as a string.
sequenceNumber: <sequenceNumber> as a long. 
enqueuedTime: <enqueuedTime> as a long.
properties: <properties> as a map<string, string>.
}

Arguments:

  • connection-id: The name of your Azure Event Hubs connection.
  • Event Hub Name: The name of the Event Hub to read from.
  • Consumer Group Name: The name of the consumer group to use when reading from the Event Hub.
  • (Optional) Starting Position:
    • LATEST: Starts reading data from the latest position on the data stream.
    • EARLIEST: Starts reading data from the very beginning of the data stream.

DSL example:

read_event_hubs("4e1a64d8-0849-4324-9298-1e655ea4ba87", "my-event-hub-name", "my-consumer-group", "earliest");
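
Because the body field arrives as bytes, a common first step is converting it to a UTF-8 string. The following is a minimal sketch that reuses the eval, as, to_string, get, and write_null functions shown elsewhere in this topic; the body_string field name is illustrative:

events = read_event_hubs("4e1a64d8-0849-4324-9298-1e655ea4ba87", "my-event-hub-name", "my-consumer-group", "earliest");
// "body" is a bytes field - convert it to a UTF-8 string (field name "body_string" is illustrative)
string-events = eval(events, as(to_string(get("body")), "body_string"));
write_null(string-events);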

Read from Splunk Firehose

Reads data sent from Splunk DSP Ingest, Forwarders, and Collect API Services. Events from this function have the data pipeline event schema or metrics schema.

API function name: read_splunk_firehose
Function Output:
This function outputs data pipeline events that use the data pipeline event schema or the metrics schema.
Arguments (optional; defaults to LATEST if not specified. You can only update this argument using the Streams API):

  • TRIM_HORIZON: Starts reading data from the very beginning of the data stream.
  • LATEST: Starts reading data from the latest position on the data stream.

DSL example:

read_splunk_firehose("TRIM_HORIZON");
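
The following is a minimal sketch that starts from the latest position and logs incoming events, assuming the write_log sink function shown in the Kafka example later in this topic; the logger name and log level are illustrative:

events = read_splunk_firehose("LATEST");
// Sketch only: log each incoming event at the warn level
write_log(events, "logger-name", "warn");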

Read from Ingest REST API

Get data from the Ingest REST API. This source function filters your data so that only data from the Ingest REST API Service is ingested.

API function name: receive_from_ingest_rest_api
Function Output:
This function outputs data pipeline events that use the data pipeline event schema or the metrics schema.

Arguments:

  • connection: rest_api:all

DSL example:

receive_from_ingest_rest_api("rest_api:all");

If you are creating a pipeline using the Data Stream Processor REST API, you need to pass in the connection name (rest_api:all) in the streams JSON. See Create a Pipeline in the Quickstart using the API for an example.

Read from Forwarders Service

Get data from the Splunk Forwarders Service. This source function filters your data so that only data from the Splunk Forwarders Service is ingested. See also Create a Splunk Universal Forwarder pipeline.

API function name: receive_from_forwarders
Function Output:
This function outputs data pipeline events that use the data pipeline event schema or the metrics schema.
Arguments:

  • connection: forwarders:all

DSL example:

receive_from_forwarders("forwarders:all");

If you are creating a pipeline using the Data Stream Processor REST API, you need to pass in the connection name (forwarders:all) in the streams JSON. See Create a Pipeline in the Quickstart using the API for an example.

Read from Apache Kafka

Get data from an Apache or Confluent Kafka topic using a Kafka connection. See Create a Kafka pipeline.

To use this function, you must first create a connection with one of the available Kafka connectors.

API function name: read_kafka
Function Output:
This function outputs records with the following schema:

{
key: <key> in bytes. 
value: <value> in bytes. 
topic: <topic> as a string. 
partition: <partition> as an integer.
offset: <offset> as a long.
}

Arguments:

  • connection-id: string
  • topic: string

DSL example:

read_kafka("461b1915-131e-4daf-a144-0630307436d0", "my-topic");
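
Because the value field arrives as bytes, you typically deserialize it before further processing. The following is a minimal sketch modeled on the Read from Apache Kafka with SSL example later in this topic, and it assumes the messages are JSON-encoded:

events = read_kafka("461b1915-131e-4daf-a144-0630307436d0", "my-topic");
// "value" is a bytes field; deserialize it as a JSON object (assumes JSON-encoded messages)
deserialized-events = eval(events, as(deserialize_json_object(get("value")), "json"));
write_null(deserialized-events);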

Read from Apache Kafka without authentication

Get data from a Kafka topic without using authentication.

API function name: unauthenticated_read_kafka
Function Output:
This function outputs records with the following schema. If you want to send your data to a Splunk Enterprise index, see Send data from Kafka.

{
key: <key> in bytes. 
value: <value> in bytes. 
topic: <topic> as a string. 
partition: <partition> as an integer.
offset: <offset> as a long.
}

Arguments:

  • brokers: string (A comma-separated list of Kafka brokers from which you want to read)
  • topic: string
  • consumer properties: map <string, string>

DSL example:

kafka-brokers = "kafka:9092";
input-topic = "happy-path-extract-input-pipeline";
output-topic = "happy-path-extract-output-pipeline";
		
events = deserialize_events(unauthenticated_read_kafka(kafka-brokers, input-topic, {}));
// "body" in the Avro Event is a bytes field - use to-string to convert from bytes to UTF-8 string
parsed-events = projection(
events,
as(extract_regex(to_string(get("body")), /\d{6}/), "numbers"));
write_log(parsed-events, "logger-name", "warn");		


Read from Apache Kafka with SSL

This function is deprecated. Use the Read from Apache Kafka function instead. Get data from a Kafka topic using SSL. A DSP admin must configure SSL. See Kafka encryption and authentication using SSL.

API function name: read_ssl_kafka
Function Output:
This function outputs records with the following schema. If you want to send your data to a Splunk Enterprise index, see Send data from Kafka.

{
key: <key> in bytes. 
value: <value> in bytes. 
topic: <topic> as a string. 
partition: <partition> as an integer.
offset: <offset> as a long.
}

Arguments:

  • brokers: string (A comma-separated list of Kafka brokers from which you want to read)
  • topic: string
  • consumer properties: map <string, string>

DSL example:

kafka-brokers = "kafka:9093";
input-topic = "topic";

// Read from Kafka, which includes bytes fields "key" and "value"
events = read_ssl_kafka(kafka-brokers, input-topic, {});

deserialized-events = eval(events, as(deserialize_json_object(get("value")), "json"));
write_null(deserialized-events); 

Read from Amazon Kinesis Stream

Get data from AWS Kinesis using static credentials. You must create a connection to use this source function. See Kinesis Static Connector. To deserialize and preview your data, see Deserialize and preview data from Kinesis.

API function name: read_kinesis
Function Output:
This function outputs records with the following schema:

{
key: <key> as a string
value: <value> in bytes
stream: <stream> as a string 
shard: <shard> as a string
sequence: <sequence> as a string
approxArrivalTimestamp: <approxArrivalTimestamp> as a long
accountId: <accountId> as a string
region: <region> as a string
}

Arguments:

  • connection-id: The ID of your Amazon Kinesis connection.
  • stream-name: string

DSL example:

read_kinesis("connection-id", "my-stream-name");
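
Because the value field arrives as bytes, you typically deserialize it before further processing. The following is a minimal sketch modeled on the Read from Apache Kafka with SSL example in this topic, and it assumes the records are JSON-encoded; see Deserialize and preview data from Kinesis for the full workflow:

events = read_kinesis("connection-id", "my-stream-name");
// "value" is a bytes field; deserialize it as a JSON object (assumes JSON-encoded records)
deserialized-events = eval(events, as(deserialize_json_object(get("value")), "json"));
write_null(deserialized-events);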

Read from Amazon Kinesis Stream (Specify Initial Stream Position)

Get data from AWS Kinesis using static credentials. You must create a connection to use this source function. See Kinesis Static Connector. To deserialize and preview your data, see Deserialize and preview data from Kinesis.


API function name: read_kinesis
Function Output:
This function outputs records with the following schema:

{
key: <key> as a string
value: <value> in bytes
stream: <stream> as a string 
shard: <shard> as a string
sequence: <sequence> as a string
approxArrivalTimestamp: <approxArrivalTimestamp> as a long
accountId: <accountId> as a string
region: <region> as a string
}

Arguments:

  • connection-id: The ID of your Amazon Kinesis connection.
  • stream-name: string
  • initial-position: TRIM_HORIZON or LATEST

DSL example:

read_kinesis("connection-id", "my-stream-name", "TRIM_HORIZON");