Source functions (Data Sources)
The following are available source functions for your pipeline.
Amazon CloudWatch Metrics
Get data from Amazon CloudWatch Metrics. You must create a connection to use this source function. See Use the Amazon CloudWatch Metrics connector with Splunk DSP.
API function name: read_from_aws_cloudwatch_metrics
Function Output:
This function outputs data pipeline events in the data pipeline event schema or the metrics schema.
Arguments:
connection-id
: The name of your Amazon CloudWatch Metrics connection.
DSL example:
read_from_aws_cloudwatch_metrics("4e1a64d8-0849-4324-9298-1e655ea4ba87");
Amazon S3 Connector
Get data from Amazon S3. You must create a connection to use this source function. See Use the Amazon S3 Connector.
API function name: read_from_aws_s3
Function Output:
This function outputs data pipeline events in the data pipeline event schema or the metrics schema.
Arguments:
connection-id
: The name of your Amazon S3 connection.
DSL example:
read_from_aws_s3("4e1a64d8-0849-4324-9298-1e655ea4ba87");
Azure Event Hubs Using SAS key
Get data from Azure Event Hubs Using SAS key. You must create a connection to use this source function. See Create a connection for the DSP Azure Event Hubs Connector with an SAS key. For information on how to create a pipeline using Azure Event Hubs as your source, see Create an Azure Event Hubs pipeline.
API function name: read_event_hubs
Function Output:
This function outputs records with the following schema:
{
  partitionKey: <partitionKey> as a string
  body: <body> in bytes
  partitionId: <partitionId> as a string
  offset: <offset> as a string
  sequenceNumber: <sequenceNumber> as a long
  enqueuedTime: <enqueuedTime> as a long
  properties: <properties> as a map<string, string>
}
Arguments:
- connection-id: The name of your Azure Event Hubs connection.
- Event Hub Name: The Event Hub name.
- Consumer Group Name: The Consumer Group name.
- (Optional) Starting Position:
- LATEST: Starts reading data from the latest position on the data stream.
- EARLIEST: Starts reading data from the very beginning of the data stream.
DSL example:
read_event_hubs("4e1a64d8-0849-4324-9298-1e655ea4ba87", "my-event-hub-name", "my-consumer-group", "earliest");
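Because the body field arrives as bytes, you typically convert it to a string before further processing. The following is a minimal sketch, assuming an illustrative connection ID, Event Hub name, consumer group, and output field name, that reuses the eval, to_string, get, and write_log functions shown elsewhere on this page:
events = read_event_hubs("4e1a64d8-0849-4324-9298-1e655ea4ba87", "my-event-hub-name", "my-consumer-group", "earliest");
// Convert the bytes "body" field to a UTF-8 string and attach it as a new field named "body_string" (illustrative name)
converted-events = eval(events, as(to_string(get("body")), "body_string"));
write_log(converted-events, "logger-name", "warn");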
Read from Splunk Firehose
Reads data sent from Splunk DSP Ingest, Forwarders, and Collect API Services. Events from this function have the data pipeline event schema or metrics schema.
API function name: read_splunk_firehose
Function Output:
This function outputs data pipeline events in the data pipeline event schema or the metrics schema.
Arguments (Optional. Defaults to LATEST if not specified. You can only update this argument using the Streams API.):
- TRIM_HORIZON: Starts reading data from the very beginning of the data stream.
- LATEST: Starts reading data from the latest position on the data stream.
DSL example:
read_splunk_firehose("TRIM_HORIZON");
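Because the argument is optional and defaults to LATEST, a minimal sketch of the default invocation is:
read_splunk_firehose();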
Read from Ingest REST API
Get data from the Ingest REST API. This is a source function that filters your data to only ingest data from the Ingest REST API Service.
API function name: receive_from_ingest_rest_api
Function Output:
This function outputs data pipeline events in the data pipeline event schema or the metrics schema.
Arguments:
- connection: rest_api:all
DSL example:
receive_from_ingest_rest_api("rest_api:all");
If you are creating a pipeline using the Data Stream Processor REST API, you need to pass in the connection name (rest_api:all) in the streams JSON. See the Create a Pipeline in the Quickstart using the API for an example.
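As a hedged sketch of how this source can feed a downstream function (the logger name is illustrative), the following passes events from the Ingest REST API service to the write_log function used in the Kafka examples on this page:
events = receive_from_ingest_rest_api("rest_api:all");
// Write incoming events to the pipeline log at the "warn" level for inspection
write_log(events, "logger-name", "warn");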
Read from Forwarders Service
Get data from the Splunk Forwarders Service. This is a source function that filters your data to only ingest data from the Splunk Forwarders Service. See also Create a Splunk Universal Forwarder pipeline.
API function name: receive_from_forwarders
Function Output:
This function outputs data pipeline events in the data pipeline event schema or the metrics schema.
Arguments:
- connection: forwarders:all
DSL example:
receive_from_forwarders("forwarders:all");
If you are creating a pipeline using the Data Stream Processor REST API, you need to pass in the connection name (forwarders:all) in the streams JSON. See the Create a Pipeline in the Quickstart using the API for an example.
Read from Apache Kafka
Get data from an Apache or Confluent Kafka topic using a Kafka connection. See Create a Kafka pipeline.
API function name: read_kafka
Function Output:
This function outputs records with the following schema:
{
  key: <key> in bytes
  value: <value> in bytes
  topic: <topic> as a string
  partition: <partition> as an integer
  offset: <offset> as a long
}
Arguments:
- connection-id: string
- topic: string
DSL example:
read_kafka("461b1915-131e-4daf-a144-0630307436d0", "my-topic");
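The value field is returned as bytes. If the payload is JSON, a minimal sketch, using an illustrative connection ID and topic and mirroring the deserialization pattern from the SSL example later on this page, looks like this:
events = read_kafka("461b1915-131e-4daf-a144-0630307436d0", "my-topic");
// Deserialize the bytes "value" field into a JSON object attached as a field named "json"
deserialized-events = eval(events, as(deserialize_json_object(get("value")), "json"));
write_null(deserialized-events);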
Read from Apache Kafka without authentication
Get data from a Kafka topic without using authentication.
API function name: unauthenticated_read_kafka
Function Output:
This function outputs records with the following schema. If you want to send your data to a Splunk Enterprise index, see Send data from Kafka.
{
  key: <key> in bytes
  value: <value> in bytes
  topic: <topic> as a string
  partition: <partition> as an integer
  offset: <offset> as a long
}
Arguments:
- brokers: string (A comma-separated list of Kafka brokers from which you want to read)
- topic: string
- consumer properties: map <string, string>
DSL example:
kafka-brokers = "kafka:9092";
input-topic = "happy-path-extract-input-pipeline";
output-topic = "happy-path-extract-output-pipeline";
events = deserialize_events(unauthenticated_read_kafka(kafka-brokers, input-topic, {}));
// "body" in the Avro Event is a bytes field - use to_string to convert from bytes to UTF-8 string
parsed-events = projection(
    events,
    as(extract_regex(to_string(get("body")), /\d{6}/), "numbers"));
write_log(parsed-events, "logger-name", "warn");
Read from Apache Kafka with SSL
This function is deprecated. Use the Read from Apache Kafka function instead. Get data from a Kafka topic using SSL. A DSP admin must configure SSL; see Kafka encryption and authentication using SSL.
API function name: read_ssl_kafka
Function Output:
This function outputs records with the following schema. If you want to send your data to a Splunk Enterprise index, see Send data from Kafka.
{
  key: <key> in bytes
  value: <value> in bytes
  topic: <topic> as a string
  partition: <partition> as an integer
  offset: <offset> as a long
}
Arguments:
- brokers: string (A comma-separated list of Kafka brokers from which you want to read)
- topic: string
- consumer properties: map <string, string>
DSL example:
kafka-brokers = "kafka:9093";
input-topic = "topic";
// Read from Kafka, which includes bytes fields "key" and "value"
events = read_ssl_kafka(kafka-brokers, input-topic, {});
deserialized-events = eval(events, as(deserialize_json_object(get("value")), "json"));
write_null(deserialized-events);
Read from Amazon Kinesis Stream
Get data from AWS Kinesis using static credentials. You must create a connection to use this source function. See Kinesis Static Connector. To deserialize and preview your data, see Deserialize and preview data from Kinesis.
API function name: read_kinesis
Function Output:
This function outputs records with the following schema:
{
  key: <key> as a string
  value: <value> in bytes
  stream: <stream> as a string
  shard: <shard> as a string
  sequence: <sequence> as a string
  approxArrivalTimestamp: <approxArrivalTimestamp> as a long
  accountId: <accountId> as a string
  region: <region> as a string
}
Arguments:
- connection-id: string
- stream-name: string
DSL example:
read_kinesis("connection-id", "my-stream-name");
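The value field arrives as bytes; see Deserialize and preview data from Kinesis for details. As a hedged sketch (connection ID and stream name are illustrative), a JSON payload can be deserialized with the same pattern used in the Kafka examples on this page:
events = read_kinesis("connection-id", "my-stream-name");
// Deserialize the bytes "value" field into a JSON object attached as a field named "json"
deserialized-events = eval(events, as(deserialize_json_object(get("value")), "json"));
write_null(deserialized-events);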
Read from Amazon Kinesis (Specify Init Stream Position)
Get data from AWS Kinesis using static credentials. You must create a connection to use this source function. See Kinesis Static Connector. To deserialize and preview your data, see Deserialize and preview data from Kinesis.
API function name: read_kinesis
Function Output:
This function outputs records with the following schema:
{
  key: <key> as a string
  value: <value> in bytes
  stream: <stream> as a string
  shard: <shard> as a string
  sequence: <sequence> as a string
  approxArrivalTimestamp: <approxArrivalTimestamp> as a long
  accountId: <accountId> as a string
  region: <region> as a string
}
Arguments:
- connection-id: string
- stream-name: string
- initial-position: TRIM_HORIZON or LATEST
DSL example:
read_kinesis("connection-id", "my-stream-name", "TRIM_HORIZON");
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.0.0