Splunk® Data Stream Processor

Function Reference

Get data from Google Cloud Pub/Sub

Use the Google Cloud Pub/Sub source function to get messages from a Google Cloud Pub/Sub topic. You must create a connection to use this source function. See Create a connection to Google Cloud Pub/Sub. To deserialize and preview the data from this source function, see Deserialize and preview data from Google Cloud Pub/Sub.

This function gets Pub/Sub data by subscribing to the topic specified by the topic_id parameter and receiving any messages that are delivered to the topic after the subscription was created. The function cannot receive any messages that were delivered before the subscription was created. See the "Subscription management" section for more information.

The function provides at-least-once data delivery with a best-effort order guarantee. Deactivating and reactivating a pipeline can cause data duplication. Any messages that are not acknowledged as received are delivered again, and may arrive out-of-order as a result.
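Because delivery is at-least-once, a downstream consumer may see the same message more than once after a pipeline is deactivated and reactivated. One common mitigation, sketched below in Python with a hypothetical `dedupe_by_message_id` helper (not part of DSP), is to drop repeats by `messageId`:

```python
# Hypothetical downstream deduplication sketch. DSP itself does not do
# this; it illustrates one way a consumer can handle at-least-once
# delivery by tracking messageId values it has already processed.
def dedupe_by_message_id(records, seen=None):
    """Yield only records whose messageId has not been seen before."""
    seen = set() if seen is None else seen
    for record in records:
        msg_id = record.get("messageId")
        if msg_id in seen:
            continue  # redelivered duplicate; skip it
        seen.add(msg_id)
        yield record

records = [
    {"messageId": "1", "body": "a"},
    {"messageId": "2", "body": "b"},
    {"messageId": "1", "body": "a"},  # redelivered after reactivation
]
unique = list(dedupe_by_message_id(records))
```

Note that a real consumer would bound the `seen` set (for example, with a time-based cache), since message IDs accumulate indefinitely.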

Function output schema

This function outputs records with the schema described in the following table.

Depending on whether the Pub/Sub message is parsed as a Google Cloud log message, the pubsub function maps different data values to the timestamp, nanos, source, sourceType, and host fields. See the "Google Cloud log data extraction" section for more information.

Key Description
body The data field from the message in bytes.
attributes The attributes field from the message as a map of strings.
messageId The messageId field from the message as a string.
publishTime The publishTime field from the message as a string.
timestamp The publishTime of the message or the timestamp of the Google Cloud log, given in epoch time format in milliseconds and stored as a long.
nanos The nanoseconds part of the timestamp as an integer.
source The name of the Pub/Sub topic or the logName field in the Google Cloud log, stored as a string.
sourceType The source type as a string.
host The ID of the project that the Pub/Sub topic belongs to or the project ID stated in the logName field in the Google Cloud log, stored as a string.

The following is an example of a typical record from the pubsub function:

{
  "body": "aGVsbG8gd29ybGQ=",
  "attributes": {
    "key1": "value1"
  },
  "messageId": "2823738566644596",
  "publishTime": "2020-06-16T17:04:52.797Z",
  "timestamp": 1592327092797,
  "nanos": 797000000,
  "source": "my-topic",
  "sourceType": "google:gcp:pubsub:message",
  "host": "test-server"
}
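The fields in this example record are related to one another. The following Python sketch (not DSP code) shows how: the base64-encoded `body` decodes to the original message data, and `timestamp` and `nanos` are derived from `publishTime`.

```python
import base64
from datetime import datetime

# Illustrative sketch: relate the example record's fields to each other.
record = {
    "body": "aGVsbG8gd29ybGQ=",
    "publishTime": "2020-06-16T17:04:52.797Z",
    "timestamp": 1592327092797,
    "nanos": 797000000,
}

# The body is the raw message data, shown base64-encoded in the preview.
decoded_body = base64.b64decode(record["body"]).decode("utf-8")

# timestamp is publishTime converted to epoch milliseconds;
# nanos is the sub-second part of publishTime in nanoseconds.
publish_dt = datetime.fromisoformat(record["publishTime"].replace("Z", "+00:00"))
epoch_millis = int(publish_dt.timestamp()) * 1000 + publish_dt.microsecond // 1000
nanos = publish_dt.microsecond * 1000
```

Here `decoded_body` is `hello world`, and `epoch_millis` and `nanos` match the `timestamp` and `nanos` values in the record above.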

Required arguments

connection_id
Syntax: string
Description: The ID of your Google Cloud Pub/Sub connection.
Example: "576205b3-f6f5-4ab7-8ffc-a4089a95d0c4"
project_id
Syntax: string
Description: The ID of the Google Cloud Platform project containing the Pub/Sub topic to get messages from.
Example: "my-project-id"
topic_id
Syntax: string
Description: The ID of the Pub/Sub topic to get messages from.
Example: "my-topic-id"

Optional arguments

parallelism
Syntax: integer
Description: The number of concurrent threads used to pull messages from the Pub/Sub topic. Increasing the number of threads can increase the rate at which the function ingests data into the DSP pipeline, but doing so consumes more computing resources. The number of threads can range from 1 to 128, inclusive. Defaults to 1.
Example: 1
parameters
Description: Key-value pairs that specify how the source function ingests data. The following keys are supported:
extract_gcp_message_data
Syntax: TRUE | FALSE
Description: A Boolean indicating whether the function extracts Google Cloud log data and maps it to certain top-level fields in the DSP record. See the "Google Cloud log data extraction" section for more information. When this key is set to TRUE, the function uses log data for those fields. When this key is set to FALSE, or if the Pub/Sub message does not contain a log from a Google Cloud service, the function uses message and topic data instead. Defaults to TRUE.
Example: TRUE

Log data extraction involves additional processing overhead. You can improve the performance of this function by setting extract_gcp_message_data to FALSE.

max_messages_per_pull
Syntax: integer
Description: The maximum number of messages that can be retrieved per pull. Increasing this number can increase throughput when the messages are small. This maximum number of messages can range from 1 to 10000, inclusive. Defaults to 1000.
Example: 1000

SPL2 example

When using positional arguments, you must specify them in this exact order.

| from pubsub("my-connection-id", "my-project-id", "my-topic-id", 5, {"extract_gcp_message_data": "false", "max_messages_per_pull": "2000"}) | ...;

Alternatively, you can use named arguments in any order, and omit any optional arguments whose default values you want to use. See the "Named arguments" section in SPL2 syntax for more details. The following example uses named arguments to list the optional arguments before the required arguments.

| from pubsub(parallelism: 5, parameters: {"extract_gcp_message_data": "false", "max_messages_per_pull": "2000"}, connection_id: "my-connection-id", project_id: "my-project-id", topic_id: "my-topic-id") | ...;

Subscription management

When you use the pubsub function in a pipeline, the function creates, reuses, or deletes subscriptions as needed:

  • When you preview the data in an inactive pipeline, the function creates a subscription to the topic. When you stop the preview, the function deletes the subscription.
  • When you activate a pipeline for the first time, the function creates a subscription to the topic. However, when you deactivate the pipeline, the function does not delete the subscription. The subscription is reused whenever you reactivate that same pipeline without changing the specified topic. If you change the topic_id setting before reactivating the pipeline, then the function creates a new subscription to the newly specified topic.
  • Any subscription that you haven't used for 8 days is automatically deleted.

Google Cloud log data extraction

Google Cloud Pub/Sub messages can include Google Cloud logs. For example, audit logs generated by various Google Cloud services can be delivered through Pub/Sub messages. Depending on whether the pubsub function parses the Pub/Sub message as a log message or as a non-log message, the function maps different message data to the DSP record.

The function parses a Pub/Sub message as a Google Cloud log message if both of the following conditions are met:

  • The extract_gcp_message_data parameter is set to true (which is the default value).
  • The attributes field in the message contains the logging.googleapis.com/timestamp key.

Otherwise, the function parses the Pub/Sub message as a non-log message instead.
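The two conditions above can be sketched as a simple predicate. This Python function, `is_gcp_log_message`, is an illustration of the documented behavior, not the function's actual implementation:

```python
# Illustrative sketch of the documented decision: a Pub/Sub message is
# parsed as a Google Cloud log message only when extraction is enabled
# AND the attributes contain the logging.googleapis.com/timestamp key.
LOG_TIMESTAMP_KEY = "logging.googleapis.com/timestamp"

def is_gcp_log_message(message, extract_gcp_message_data=True):
    """Return True when the message would be parsed as a log message."""
    if not extract_gcp_message_data:
        return False  # extraction disabled: always parse as non-log
    attributes = message.get("attributes") or {}
    return LOG_TIMESTAMP_KEY in attributes

log_msg = {"attributes": {LOG_TIMESTAMP_KEY: "2020-06-16T17:04:52.797Z"}}
plain_msg = {"attributes": {"key1": "value1"}}
```

With these inputs, `log_msg` is parsed as a log message and `plain_msg` is not; setting `extract_gcp_message_data=False` forces non-log parsing in both cases.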

The following list describes how different data is mapped to certain fields in the DSP record depending on whether the Pub/Sub message is parsed as a Google Cloud log message.

timestamp
  • Google Cloud log messages: The time when the log was written.
  • Non-log messages: The time when the message was published.
nanos
  • Google Cloud log messages: The nanoseconds part of the timestamp indicating when the log was written.
  • Non-log messages: The nanoseconds part of the timestamp indicating when the message was published.
source
  • Google Cloud log messages: The name of the log.
  • Non-log messages: The name of the topic that the message comes from.
sourceType
  • Google Cloud log messages: google:gcp:<gcp-service>:<log-type>:<original-service>, where:
      • <gcp-service> is the service that the message originates from. Possible values are audit, dataflow, compute, and cloudfunctions.
      • <log-type> is the type of log. When <gcp-service> is audit, the possible values are system_event, data_access, and activity.
      • <original-service> is the name of the service being audited. This value is only included when <gcp-service> is audit.
  • Non-log messages: google:gcp:pubsub:message
host
  • Google Cloud log messages: The project ID associated with the log.
  • Non-log messages: The project ID associated with the topic that the message comes from.
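The sourceType rules can be illustrated with a small Python helper. `build_source_type` is a hypothetical function (not part of DSP) that assembles the documented sourceType strings; the non-audit log type used in the usage example is a placeholder:

```python
# Hypothetical helper showing how the documented sourceType values are
# assembled. Non-log messages always get the fixed pubsub sourceType;
# audit logs additionally append the name of the audited service.
def build_source_type(gcp_service=None, log_type=None, original_service=None):
    """Return the sourceType string for a log or non-log message."""
    if gcp_service is None:
        return "google:gcp:pubsub:message"  # non-log message
    parts = ["google", "gcp", gcp_service, log_type]
    if gcp_service == "audit" and original_service:
        parts.append(original_service)  # only audit logs include this
    return ":".join(parts)

# Audit activity log for a hypothetical audited service:
audit_type = build_source_type("audit", "activity", "compute.googleapis.com")
# Non-audit log with a placeholder log type:
dataflow_type = build_source_type("dataflow", "job")
```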
Last modified on 06 November, 2020

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0
