Splunk® Data Stream Processor

Function Reference

Acrobat logo Download manual as PDF


On April 3, 2023, Splunk Data Stream Processor reached its end of sale, and will reach its end of life on February 28, 2025. If you are an existing DSP customer, please reach out to your account team for more information.

All DSP releases prior to DSP 1.4.0 use Gravity, a Kubernetes orchestrator, which has been announced end-of-life. We have replaced Gravity with an alternative component in DSP 1.4.0. Therefore, we will no longer provide support for versions of DSP prior to DSP 1.4.0 after July 1, 2023. We advise all of our customers to upgrade to DSP 1.4.0 in order to continue to receive full product support from Splunk.
Acrobat logo Download topic as PDF

Get data from Google Cloud Pub/Sub

Use the Google Cloud Pub/Sub source function to get messages from a Google Cloud Pub/Sub topic.

This function gets Pub/Sub data by subscribing to the topic specified by the topic_id parameter and receiving any messages that are delivered to the topic after the subscription was created. The function cannot receive any messages that were delivered before the subscription was created. See the Subscription management section on this page for more information.

The function provides at-least-once data delivery with a best-effort order guarantee. Deactivating and reactivating a pipeline can cause data duplication. Any messages that are not acknowledged as received are delivered again, and may arrive out-of-order as a result.

The payload of the ingested data is encoded as bytes. To deserialize and preview your data, see Deserialize and preview data from Google Cloud Pub/Sub in the Connect to Data Sources and Destinations with the manual.

Prerequisites

Before you can use this function, you must create a connection. See Create a connection to Google Cloud Pub/Sub in the Connect to Data Sources and Destinations with the manual. When configuring this source function, set the connection_id argument to the ID of that connection.

Function output schema

This function outputs records with the schema described in the following table.

Depending on whether the Pub/Sub message is parsed as a Google Cloud log message, the pubsub function maps different data values to the timestamp, nanos, source, sourceType, and host fields. See the Google Cloud log data extraction section on this page for more information.

Key Description
body The data field from the message in bytes.
attributes The attributes field from the message as a map of strings.
messageId The messageId field from the message as a string.
publishTime The publishTime field from the message as a string.
timestamp The publishTime of the message or the timestamp of the Google Cloud log, given in epoch time format in milliseconds and stored as a long.
nanos The nanoseconds part of the timestamp as an integer.
source The name of the Pub/Sub topic or the logName field in the Google Cloud log, stored as a string.
sourceType The source type as a string.
host The ID of the project that the Pub/Sub topic belongs to or the project ID stated in the logName field in the Google Cloud log, stored as a string.

The following is an example of a typical record from the pubsub function:

{
"body": "aGVsbG8gd29ybGQ=",
"attributes": {
     "key1":"value1"
     },
"messageId": 2823738566644596,
"publishTime": "2020-06-16T17:04:52.797Z",
"timestamp": 1592327092797,
"nanos": 797000000,
"source": "my-topic",
"sourceType": "google:gcp:pubsub:message",
"host": "test-server"
}

Required arguments

connection_id
Syntax: string
Description: The ID of your Google Cloud Pub/Sub connection.
Example in Canvas View: my-gcp-pubsub-connection
project_id
Syntax: string
Description: The ID of the Google Cloud Platform project containing the Pub/Sub topic to get messages from.
Example in Canvas View: my-project-id
topic_id
Syntax: string
Description: The ID of the Pub/Sub topic to get messages from.
Example in Canvas View: my-topic-id

Optional arguments

parallelism
Syntax: integer
Description: The number of concurrent threads used to pull messages from the Pub/Sub topic. Increasing the number of threads can increase the rate at which the function ingests data into the DSP pipeline, but doing so consumes more computing resources. The number of threads can range from 1 to 128, inclusive. Defaults to 1.
Example in Canvas View: 1
parameters
Description: Key-value pairs that specify how the source function ingests data.
  • When working in Canvas View, specify the name and value of the property in the fields on either side of the equal sign ( = ), and click Add to specify additional properties.
  • When working in SPL View, specify each property using the format "<name>": "<value>", and separate each property with a comma ( , ). Make sure to enclose the entire argument in braces ( { } ).
See the following table for a description of each supported parameter.
Parameter Syntax Description Example in Canvas View
extract_gcp_message_data Boolean A Boolean indicating whether the function extracts Google Cloud log data and maps it to certain top-level fields in the DSP record. See the Google Cloud log data extraction section on this page for more information. When this key is set to true, the function uses log data for those fields. When this key is set to false, or if the Pub/Sub message does not contain a log from a Google Cloud service, the function uses message and topic data instead. Defaults to true.

Log data extraction involves additional processing and overhead. You can improve the performance of this function by setting extract_gcp_message_data to false.

extract_gcp_message_data = false
max_messages_per_pull integer between 1 to 10,000, inclusive. The maximum number of messages that can be retrieved per pull. Increasing this number can increase throughput when the messages are small. Defaults to 1000. max_messages_per_pull = 2000

SPL2 example

When working in the SPL View, you can write the function using arguments in this exact order:

| from pubsub("my-connection-id", "my-project-id", "my-topic-id", 5, {"extract_gcp_message_data": "false", "max_messages_per_pull": "2000"}) | ...;

Alternatively, you can use named arguments in any order, and omit the optional argument if you just want to use the default value. The following example uses named arguments to list the optional arguments before the required arguments:

| from pubsub(parallelism: 5, parameters: {"extract_gcp_message_data": "false", "max_messages_per_pull": "2000"}, connection_id: "my-connection-id", project_id: "my-project-id", topic_id: "my-topic-id") | ...;

If you want to use a mix of unnamed and named arguments in your functions, you must list all unnamed arguments in the correct order before providing the named arguments.

Subscription management

When you use the pubsub function in a pipeline, the function creates, reuses, or deletes subscriptions as needed:

  • When you preview the data in an inactive pipeline, the function creates a subscription to the topic. When you stop the preview, the function deletes the subscription.
  • When you activate a pipeline for the first time, the function creates a subscription to the topic. However, when you deactivate the pipeline, the function does not delete the subscription. The subscription is reused whenever you reactivate that same pipeline without changing the specified topic. If you change the topic_id setting before reactivating the pipeline, then the function creates a new subscription to reach the newly-specified topic.
  • Any subscription that you haven't used for 8 days is automatically deleted.

Google Cloud log data extraction

Google Cloud Pub/Sub messages can include Google Cloud logs. For example, audit logs generated by various Google Cloud services can be delivered through Pub/Sub messages. Depending on whether the pubsub function parses the Pub/Sub message as a log message or as a non-log message, the function maps different message data to the DSP record.

The function parses a Pub/Sub message as a Google Cloud log message if both of the following conditions are met:

  • The extract_gcp_message_data parameter is set to true (which is the default value).
  • The attributes field in the message contains the logging.googleapis.com/timestamp key.

Otherwise, the function parses the Pub/Sub message as a non-log message instead.

The following table describes how different data is mapped to certain fields in the DSP record depending on whether the Pub/Sub message is parsed as a Google Cloud log message.

DSP record field Data from Google Cloud log messages Data from non-log messages
timestamp The time when the log was written. The time when the message was published.
nanos The nanoseconds part of the timestamp indicating when the log was written. The nanoseconds part of the timestamp indicating when the message was published.
source The name of the log. The name of the topic that the message comes from.
sourceType google:gcp:<gcp-service>:<log-type>:<original-service>, where:
  • <gcp-service> is the service that the message originates from. Possible values are audit, dataflow, compute, and cloudfunctions.
  • <log-type> is the type of log. When <gcp-service> is audit, the possible values are system_event, data_access, and activity.
  • <original-service> is the name of the service being audited. This value is only included when <gcp-service> is audit.
google:gcp:pubsub:message
host The project ID associated with the log. The project ID associated with the topic that the message comes from.
Last modified on 19 April, 2021
PREVIOUS
Get data from Google Cloud Monitoring
  NEXT
Get data from Microsoft 365

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3


Was this documentation topic helpful?


You must be logged into splunk.com in order to post comments. Log in now.

Please try to keep this discussion focused on the content covered in this documentation topic. If you have a more general question about Splunk functionality or are experiencing a difficulty with Splunk, consider posting a question to Splunkbase Answers.

0 out of 1000 Characters