Get data from Google Cloud Pub/Sub
Use the Google Cloud Pub/Sub source function to get messages from a Google Cloud Pub/Sub topic.
This function gets Pub/Sub data by subscribing to the topic specified by the topic_id
parameter and receiving any messages that are delivered to the topic after the subscription was created. The function cannot receive any messages that were delivered before the subscription was created. See the Subscription management section on this page for more information.
The function provides at-least-once data delivery with a best-effort order guarantee. Deactivating and reactivating a pipeline can cause data duplication. Any messages that are not acknowledged as received are delivered again, and may arrive out-of-order as a result.
The payload of the ingested data is encoded as bytes. To deserialize and preview your data, see Deserialize and preview data from Google Cloud Pub/Sub in the Connect to Data Sources and Destinations with the manual.
Prerequisites
Before you can use this function, you must create a connection. See Create a connection to Google Cloud Pub/Sub in the Connect to Data Sources and Destinations with the manual. When configuring this source function, set the connection_id
argument to the ID of that connection.
Function output schema
This function outputs records with the schema described in the following table.
Depending on whether the Pub/Sub message is parsed as a Google Cloud log message, the pubsub
function maps different data values to the timestamp
, nanos
, source
, sourceType
, and host
fields. See the Google Cloud log data extraction section on this page for more information.
Key | Description |
---|---|
body | The data field from the message in bytes.
|
attributes | The attributes field from the message as a map of strings.
|
messageId | The messageId field from the message as a string.
|
publishTime | The publishTime field from the message as a string.
|
timestamp | The publishTime of the message or the timestamp of the Google Cloud log, given in epoch time format in milliseconds and stored as a long.
|
nanos | The nanoseconds part of the timestamp as an integer. |
source | The name of the Pub/Sub topic or the logName field in the Google Cloud log, stored as a string.
|
sourceType | The source type as a string. |
host | The ID of the project that the Pub/Sub topic belongs to or the project ID stated in the logName field in the Google Cloud log, stored as a string.
|
The following is an example of a typical record from the pubsub
function:
{ "body": "aGVsbG8gd29ybGQ=", "attributes": { "key1":"value1" }, "messageId": 2823738566644596, "publishTime": "2020-06-16T17:04:52.797Z", "timestamp": 1592327092797, "nanos": 797000000, "source": "my-topic", "sourceType": "google:gcp:pubsub:message", "host": "test-server" }
Required arguments
- connection_id
- Syntax: string
- Description: The ID of your Google Cloud Pub/Sub connection.
- Example in Canvas View: my-gcp-pubsub-connection
- project_id
- Syntax: string
- Description: The ID of the Google Cloud Platform project containing the Pub/Sub topic to get messages from.
- Example in Canvas View: my-project-id
- topic_id
- Syntax: string
- Description: The ID of the Pub/Sub topic to get messages from.
- Example in Canvas View: my-topic-id
Optional arguments
- parallelism
- Syntax: integer
- Description: The number of concurrent threads used to pull messages from the Pub/Sub topic. Increasing the number of threads can increase the rate at which the function ingests data into the DSP pipeline, but doing so consumes more computing resources. The number of threads can range from 1 to 128, inclusive. Defaults to 1.
- Example in Canvas View: 1
- parameters
- Description: Key-value pairs that specify how the source function ingests data.
- When working in Canvas View, specify the name and value of the property in the fields on either side of the equal sign ( = ), and click Add to specify additional properties.
- When working in SPL View, specify each property using the format
"<name>": "<value>"
, and separate each property with a comma ( , ). Make sure to enclose the entire argument in braces ( { } ).
- See the following table for a description of each supported parameter.
Parameter Syntax Description Example in Canvas View extract_gcp_message_data Boolean A Boolean indicating whether the function extracts Google Cloud log data and maps it to certain top-level fields in the DSP record. See the Google Cloud log data extraction section on this page for more information. When this key is set to true
, the function uses log data for those fields. When this key is set tofalse
, or if the Pub/Sub message does not contain a log from a Google Cloud service, the function uses message and topic data instead. Defaults totrue
.Log data extraction involves additional processing and overhead. You can improve the performance of this function by setting
extract_gcp_message_data
tofalse
.extract_gcp_message_data = false max_messages_per_pull integer between 1 to 10,000, inclusive. The maximum number of messages that can be retrieved per pull. Increasing this number can increase throughput when the messages are small. Defaults to 1000. max_messages_per_pull = 2000
SPL2 example
When working in the SPL View, you can write the function using arguments in this exact order:
| from pubsub("my-connection-id", "my-project-id", "my-topic-id", 5, {"extract_gcp_message_data": "false", "max_messages_per_pull": "2000"}) | ...;
Alternatively, you can use named arguments in any order, and omit the optional argument if you just want to use the default value. The following example uses named arguments to list the optional arguments before the required arguments:
| from pubsub(parallelism: 5, parameters: {"extract_gcp_message_data": "false", "max_messages_per_pull": "2000"}, connection_id: "my-connection-id", project_id: "my-project-id", topic_id: "my-topic-id") | ...;
If you want to use a mix of unnamed and named arguments in your functions, you must list all unnamed arguments in the correct order before providing the named arguments.
Subscription management
When you use the pubsub
function in a pipeline, the function creates, reuses, or deletes subscriptions as needed:
- When you preview the data in an inactive pipeline, the function creates a subscription to the topic. When you stop the preview, the function deletes the subscription.
- When you activate a pipeline for the first time, the function creates a subscription to the topic. However, when you deactivate the pipeline, the function does not delete the subscription. The subscription is reused whenever you reactivate that same pipeline without changing the specified topic. If you change the
topic_id
setting before reactivating the pipeline, then the function creates a new subscription to reach the newly-specified topic. - Any subscription that you haven't used for 8 days is automatically deleted.
Google Cloud log data extraction
Google Cloud Pub/Sub messages can include Google Cloud logs. For example, audit logs generated by various Google Cloud services can be delivered through Pub/Sub messages. Depending on whether the pubsub
function parses the Pub/Sub message as a log message or as a non-log message, the function maps different message data to the DSP record.
The function parses a Pub/Sub message as a Google Cloud log message if both of the following conditions are met:
- The
extract_gcp_message_data
parameter is set totrue
(which is the default value). - The
attributes
field in the message contains thelogging.googleapis.com/timestamp
key.
Otherwise, the function parses the Pub/Sub message as a non-log message instead.
The following table describes how different data is mapped to certain fields in the DSP record depending on whether the Pub/Sub message is parsed as a Google Cloud log message.
DSP record field | Data from Google Cloud log messages | Data from non-log messages |
---|---|---|
timestamp | The time when the log was written. | The time when the message was published. |
nanos | The nanoseconds part of the timestamp indicating when the log was written. | The nanoseconds part of the timestamp indicating when the message was published. |
source | The name of the log. | The name of the topic that the message comes from. |
sourceType | google:gcp:<gcp-service>:<log-type>:<original-service> , where:
|
google:gcp:pubsub:message
|
host | The project ID associated with the log. | The project ID associated with the topic that the message comes from. |
Get data from Google Cloud Monitoring | Get data from Microsoft 365 |
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6
Feedback submitted, thanks!