Splunk® Data Stream Processor

Function Reference

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach their end of support date. See the Splunk Software Support Policy for details. For information about upgrading to a supported version, see the Upgrade the Splunk Data Stream Processor topic.

Send data to Google Cloud Storage

Use the Send to Google Cloud Storage sink function to send data to a Google Cloud Storage bucket.

This sink function sends data to Cloud Storage in chunks so that any interrupted data transfers can resume from the last chunk that was sent successfully, instead of restarting from the beginning. Interrupted data transfers can be resumed any time within one week of when the transfer was initially started. Search for "Resumable uploads" in the Google Cloud Storage documentation for more information.

Because interrupted data transfers can't be resumed after more than one week has passed, you might encounter pipeline failures when trying to reactivate a pipeline after it has been deactivated for over a week. See the "Limitations of the Google Cloud Storage sink function" section on this page for more information.
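As a rough illustration of the chunked, resumable behavior described above, the following Python sketch computes where an interrupted transfer would continue and whether it is still inside the one-week window. The function names and the byte-offset model are assumptions made for illustration only. They are not part of DSP or of the Cloud Storage API.

```python
from datetime import datetime, timedelta

# One-week limit on resuming an interrupted transfer, as stated above.
RESUME_WINDOW = timedelta(weeks=1)


def can_resume(transfer_started: datetime, now: datetime) -> bool:
    """Return True if the transfer is still within the one-week resume window."""
    return now - transfer_started <= RESUME_WINDOW


def resume_offset(chunks_sent_ok: int, chunk_size: int) -> int:
    """Byte offset at which a resumed transfer would continue.

    Hypothetical model: every successfully sent chunk is exactly
    chunk_size bytes, so the next byte to send follows the last good chunk.
    """
    return chunks_sent_ok * chunk_size


started = datetime(2021, 5, 25, 12, 0, 0)
print(can_resume(started, started + timedelta(days=3)))  # inside the window
print(can_resume(started, started + timedelta(days=8)))  # window has passed
print(resume_offset(chunks_sent_ok=4, chunk_size=15_728_640))
```

In this model, a pipeline that stays deactivated for eight days falls outside the window, which corresponds to the activation failure described in the limitations section.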

Depending on the format setting, data is stored in one of the following formats in the destination Cloud Storage bucket:

  • When format is set to json, records are stored in .json files.
  • When format is set to string_body, the body of each record is encoded as a string, and these strings are stored in .txt files. All other fields from the record are dropped.

Prerequisites

Before you can use this function, you must do the following:

  • Create a Google Cloud Storage connection. See Create a DSP connection to send data to Google Cloud Storage in the Connect to Data Sources and Destinations with DSP manual. When configuring this sink function, set the connection_id argument to the ID of that connection.
  • Create the destination bucket in your Google Cloud Storage instance. For information about creating a bucket, search for "Creating storage buckets" in the Google Cloud Storage documentation.

Function input schema

collection<record<R>>
This function takes in collections of records with schema R.

Required arguments

connection_id
Syntax: string
Description: The Google Cloud Storage connection ID.
Example in Canvas View: my-cloud-storage-connection
bucket
Syntax: string
Description: The name of the destination bucket in Google Cloud Storage.
Example in Canvas View: my_bucket
prefix
Syntax: string
Description: The prefix to use in the names of the generated Google Cloud Storage objects.
This parameter supports prefix patterns with substitutions. Using the special syntax #{datetime:<date-time-format>}, you can specify date-time formats that will be substituted with a timestamp indicating when the Cloud Storage object was generated. See the Prefix parameter substitutions section on this page for more information.
Example in Canvas View: #{datetime:yyyy-MM-dd}
format
Syntax: string
Description: The format for data in the Google Cloud Storage destination. The format can be set to one of the following:
  • json for line-delimited JSON format.
  • string_body for string format. The sink function encodes the contents of the body field as a string and sends it to Google Cloud Storage to be stored in a .txt file. All other fields from the record are dropped.

    When format is set to string_body, the sink function uses line break characters to split the streaming data into records. If the body of an event contains line break characters to begin with, then that event is split into multiple records.

These values are case-insensitive. For example, JSON and String_Body are also accepted.
Example in Canvas View: JSON
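The difference between the two format values can be sketched in Python as follows. This is a simplified model, not the DSP implementation: the function name encode_for_sink and the sample records are hypothetical, and only the "\n" character is treated as a line break.

```python
import json


def encode_for_sink(records, fmt):
    """Return the lines that would be written to the Cloud Storage file.

    Simplified model of the two formats; not the actual DSP implementation.
    """
    fmt = fmt.lower()  # format values are case-insensitive
    if fmt == "json":
        # Line-delimited JSON: one JSON object per line, all fields kept.
        return [json.dumps(record) for record in records]
    if fmt == "string_body":
        lines = []
        for record in records:
            # Only body is kept; a line break inside it yields extra lines.
            lines.extend(str(record.get("body", "")).split("\n"))
        return lines
    raise ValueError(f"unsupported format: {fmt!r}")


events = [
    {"body": "login ok", "host": "web-1"},
    {"body": "stack trace\n  at line 1", "host": "web-2"},
]
print(encode_for_sink(events, "String_Body"))
print(len(encode_for_sink(events, "JSON")))
```

With string_body, the second event produces two lines because its body contains a line break, and the host field is dropped from both events.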

Optional arguments

size_mb
Syntax: A long value between 1 and 5,000,000, inclusive.
Description: The maximum file size in Cloud Storage, specified in megabytes (MB). If a file reaches this maximum size while the sink function is streaming data to it, then the file closes and a new file with the same object name prefix is created. The sink function continues streaming data to the newly opened file. See File rollover policy for information about how to use this setting in conjunction with rollover_secs and inactivity_secs.
Default: 1000 (equivalent to 1 GB)
Example in Canvas View: 500
rollover_secs
Syntax: A long value that is greater than 0.
Description: The maximum number of seconds that a Cloud Storage file can stay open. If a file has been open for this length of time while the sink function is streaming data to it, then the file closes and a new file with the same object name prefix is created. The sink function continues streaming data to the newly opened file. See File rollover policy for information about how to use this setting in conjunction with size_mb and inactivity_secs.
Default: 300
Example in Canvas View: 120
inactivity_secs
Syntax: A long value that is greater than 0.
Description: The maximum number of seconds that an inactive Cloud Storage file can stay open. If a file has been open for this length of time without receiving any streamed data, then the file is closed and a new file with the same object name prefix is created. When the sink function resumes streaming data to Cloud Storage, the data is streamed to the newly opened file. See File rollover policy for information about how to use this setting in conjunction with size_mb and rollover_secs.
Default: 60
Example in Canvas View: 90
upload_chunk_size
Syntax: An integer value that is a multiple of 262,144.
Description: The size, in bytes, of each chunk of data to send to Google Cloud Storage. If you configure this parameter, you must specify a number of bytes equal to a multiple of 262,144 (256 KiB).

In most cases, the default value is sufficient. The upload_chunk_size parameter is an advanced configuration parameter used only for performance tuning.

Default: 15,728,640 (equivalent to 15,360 KiB or 15 MiB)
Example in Canvas View: 524288
kms_key_resource_id
Syntax: string
Description: The resource ID of the customer-managed key used to encrypt data in Google Cloud Storage. Search for "Using customer-managed encryption keys" in the Google Cloud Storage documentation for more information. Encryption keys are not stored or managed by the Splunk Data Stream Processor.
Default: The default encryption key specified in the destination bucket is used. If the bucket does not have a default encryption key, then the default server-side encryption method from Cloud Storage is used. Search for "Data encryption options" in the Google Cloud Storage documentation for more information.
Example in Canvas View: projects/data-streaming/locations/us-east1/keyRings/my-streaming-data-useast1/cryptoKeys/my-streaming-data-key
recover_append
Syntax: Boolean
Description: A Boolean indicating whether, when recovering from a pipeline failure, the function uses a special recovery method that prevents data loss but causes data duplication. Set this argument to true if your data source can't reproduce prior events in a consistent order, or if data duplication is less detrimental to your use case than data loss. Otherwise, set this argument to false.
Default: false
Example in Canvas View: true
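The value ranges listed for the numeric optional arguments can be checked before you configure the function. The following Python sketch is one possible way to do that. The function name validate_optional_args is hypothetical, and rejecting an upload_chunk_size of 0 is an assumption, since the reference only states that the value must be a multiple of 262,144.

```python
CHUNK_UNIT = 262_144  # 256 KiB, the required granularity of upload_chunk_size


def validate_optional_args(size_mb=1000, rollover_secs=300,
                           inactivity_secs=60, upload_chunk_size=15_728_640):
    """Return a list of problems; an empty list means every value is in range."""
    problems = []
    if not 1 <= size_mb <= 5_000_000:
        problems.append("size_mb must be between 1 and 5,000,000")
    if rollover_secs <= 0:
        problems.append("rollover_secs must be greater than 0")
    if inactivity_secs <= 0:
        problems.append("inactivity_secs must be greater than 0")
    # Assumption: a chunk size of 0 is rejected even though 0 % 262144 == 0.
    if upload_chunk_size <= 0 or upload_chunk_size % CHUNK_UNIT != 0:
        problems.append("upload_chunk_size must be a multiple of 262,144")
    return problems


print(validate_optional_args())                           # defaults
print(validate_optional_args(upload_chunk_size=524_288))  # 2 x 256 KiB
print(validate_optional_args(size_mb=0, upload_chunk_size=500_000))
print(15_728_640 // CHUNK_UNIT)  # number of 256 KiB units in the default
```

The default chunk size of 15,728,640 bytes is exactly 60 units of 256 KiB, which is why it satisfies the multiple-of-262,144 rule.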

SPL2 examples

When working in the SPL View, you can write the function by providing the arguments in this exact order:

...| into google_cloud_storage("576205b3-f6f5-4ab7-8ffc-a4089a95d0c4", "my_bucket", "my_prefix", "json", 500, 120, 90, 15728640, "projects/data-streaming/locations/us-east1/keyRings/my-streaming-data-useast1/cryptoKeys/my-streaming-data-key", true); 

Alternatively, you can use named arguments to declare the arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example omits rollover_secs, inactivity_secs, and upload_chunk_size:

...| into google_cloud_storage(connection_id: "fd8f234b-6732-47d1-844c-5c08413d42ee", recover_append: true, kms_key_resource_id: "projects/data-streaming/locations/us-east1/keyRings/my-streaming-data-useast1/cryptoKeys/my-streaming-data-key", bucket: "my_bucket", format: "json", prefix: "my_prefix", size_mb: 500);

If you want to use a mix of unnamed and named arguments in your functions, you must list all unnamed arguments in the correct order before providing the named arguments.
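The ordering rule for mixed arguments can be illustrated with a small Python helper that renders the function call as text. The helper build_sink_call and the connection ID "my-connection-id" are hypothetical. The argument order is taken from the positional example on this page, and the helper only formats a string. It does not validate the call against DSP.

```python
# Positional order of the arguments, as used in the first SPL2 example.
ARG_ORDER = [
    "connection_id", "bucket", "prefix", "format", "size_mb",
    "rollover_secs", "inactivity_secs", "upload_chunk_size",
    "kms_key_resource_id", "recover_append",
]


def render(value):
    """Format a Python value as an SPL2 literal (strings, numbers, Booleans)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, str):
        return '"' + value + '"'
    return str(value)


def build_sink_call(positional, named):
    """Render unnamed arguments first, in order, followed by named arguments."""
    if len(positional) > len(ARG_ORDER):
        raise ValueError("too many unnamed arguments")
    already_set = ARG_ORDER[:len(positional)]
    for name in named:
        if name not in ARG_ORDER:
            raise ValueError(f"unknown argument: {name}")
        if name in already_set:
            raise ValueError(f"{name} was already given as an unnamed argument")
    parts = [render(v) for v in positional]
    parts += [f"{name}: {render(v)}" for name, v in named.items()]
    return "into google_cloud_storage(" + ", ".join(parts) + ");"


print(build_sink_call(
    ["my-connection-id", "my_bucket", "my_prefix", "json"],
    {"size_mb": 500, "recover_append": True},
))
```

Here the four required arguments are passed unnamed in their defined order, and only the optional arguments that differ from their defaults are named.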

Prefix parameter substitutions

When configuring the prefix parameter, you can use a date-time format inside a #{datetime: } pattern to generate prefixes with dynamic date and time values.

See the following table for examples of the object name prefix that is generated from a given prefix setting, assuming that the date is March 31, 2020:

Prefix parameter setting                    Generated object name prefix
#{datetime:yyyy-MM}                         2020-03
#{datetime:yyyy-MM-dd}                      2020-03-31
streaming/my-data/#{datetime:yyyy-MM-dd}    streaming/my-data/2020-03-31

Any leading or trailing whitespace inside #{datetime: } is trimmed before substitution. An empty date-time substitution pattern, such as #{datetime:}, is not allowed.
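The substitution rules can be approximated with the following Python sketch. It is an illustration only: the function expand_prefix is hypothetical, and it supports just the yyyy, MM, dd, HH, mm, and ss pattern letters, which is an assumed subset of the date-time formats that the sink function accepts.

```python
import re
from datetime import datetime

# Assumed subset of date-time pattern letters, mapped to strftime codes.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d",
           "HH": "%H", "mm": "%M", "ss": "%S"}
_SUBSTITUTION = re.compile(r"#\{datetime:(.*?)\}")


def expand_prefix(prefix: str, when: datetime) -> str:
    """Replace each #{datetime:<format>} pattern with a formatted timestamp."""
    def substitute(match):
        fmt = match.group(1).strip()  # leading/trailing whitespace is trimmed
        if not fmt:
            raise ValueError("an empty date-time pattern is not allowed")
        # Translate the supported pattern letters in a single pass.
        strftime_fmt = re.sub("yyyy|MM|dd|HH|mm|ss",
                              lambda m: _TOKENS[m.group(0)], fmt)
        return when.strftime(strftime_fmt)
    return _SUBSTITUTION.sub(substitute, prefix)


march_31 = datetime(2020, 3, 31)
print(expand_prefix("#{datetime:yyyy-MM}", march_31))
print(expand_prefix("streaming/my-data/#{datetime: yyyy-MM-dd }", march_31))
```

A prefix without any #{datetime: } pattern passes through unchanged, and #{datetime:} raises an error in this sketch, mirroring the rule that empty patterns are not allowed.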

File rollover policy

When you send data to Google Cloud Storage, the data is stored in a file in the specified Cloud Storage bucket. To prevent situations where too much data gets stored in a single file, the sink function uses a rollover policy to change the file that it sends data to at regular intervals. This rollover policy determines when a Cloud Storage file is closed, which causes a new file with the same name prefix to be opened and streamed data to be sent to this new file instead of the closed one.

A Cloud Storage file must be closed before you can see it listed in your bucket.

By default, a file is closed when any of the following conditions are met:

  • The file reaches 1 GB (1000 MB) in size.
  • The file has been receiving data for 300 seconds.
  • The file has not received data for 60 seconds.

You can modify this rollover policy by configuring the size_mb, rollover_secs, and inactivity_secs arguments.
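The "any of the following conditions" logic can be sketched in Python as shown below. The class RolloverPolicy is hypothetical, its defaults are the default values listed for size_mb, rollover_secs, and inactivity_secs in the Optional arguments section, and treating 1 MB as 1,000,000 bytes is an assumption.

```python
from dataclasses import dataclass

BYTES_PER_MB = 1_000_000  # assumption: decimal megabytes (1000 MB = 1 GB)


@dataclass
class RolloverPolicy:
    """Simplified model of the rollover settings; not the DSP implementation."""
    size_mb: int = 1000
    rollover_secs: int = 300
    inactivity_secs: int = 60

    def should_close(self, size_bytes: int, open_secs: float,
                     idle_secs: float) -> bool:
        """A file is closed as soon as any one of the three limits is reached."""
        return (size_bytes >= self.size_mb * BYTES_PER_MB
                or open_secs >= self.rollover_secs
                or idle_secs >= self.inactivity_secs)


default_policy = RolloverPolicy()
print(default_policy.should_close(size_bytes=10, open_secs=10, idle_secs=10))
print(default_policy.should_close(size_bytes=10, open_secs=300, idle_secs=0))

custom_policy = RolloverPolicy(size_mb=500, rollover_secs=120, inactivity_secs=90)
print(custom_policy.should_close(size_bytes=10, open_secs=100, idle_secs=80))
```

Because the conditions are combined with a logical OR, lowering any one of the three settings can only make files close sooner, never later.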

Example: Custom rollover policy

In this example, the sink function is configured as follows:

  • prefix is set to #{datetime:yyyy-MM-dd}, and the current date is May 25, 2021.
  • format is set to json.
  • size_mb is set to 500.
  • rollover_secs is set to 120.
  • inactivity_secs is set to 90.

When you activate your pipeline, the sink function sends the streaming data to a file with a name like 2021-05-25_part-3152cf42-458f-2635-be4d-1573ba7375c0-0.json. When the file reaches 500 MB in size or has been open for 120 seconds, whichever happens first, the 2021-05-25_part-3152cf42-458f-2635-be4d-1573ba7375c0-0.json file is closed and a new file named 2021-05-25_part-3152cf42-458f-2635-be4d-1573ba7375c0-1.json is opened. The sink function then starts sending your data to this newly opened file instead.

If your pipeline is activated but there is no data streaming to Cloud Storage, after 90 seconds the 2021-05-25_part-3152cf42-458f-2635-be4d-1573ba7375c0-1.json file is closed. When data resumes streaming to Cloud Storage, a file named 2021-05-25_part-3152cf42-458f-2635-be4d-1573ba7375c0-2.json is opened and the data is streamed to that file.
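The file names in this example appear to follow the pattern <prefix>_part-<id>-<index>.<extension>, where the index increases by one at each rollover. The following Python sketch reproduces that sequence. The pattern is inferred from the example names only, the function object_name is hypothetical, and applying the same pattern to .txt files is an assumption.

```python
# File extension per format value, as described at the top of this page.
EXTENSIONS = {"json": ".json", "string_body": ".txt"}


def object_name(prefix: str, part_id: str, index: int, fmt: str = "json") -> str:
    """Build an object name following the pattern inferred from the example."""
    return f"{prefix}_part-{part_id}-{index}{EXTENSIONS[fmt.lower()]}"


part_id = "3152cf42-458f-2635-be4d-1573ba7375c0"  # ID taken from the example
for rollover_count in range(3):
    print(object_name("2021-05-25", part_id, rollover_count))
```

Index 0 is the first file that is opened, index 1 is the file opened after the size or time limit is reached, and index 2 is the file opened when data resumes after the 90 seconds of inactivity.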

Limitations of the Google Cloud Storage sink function

If you deactivate a pipeline before the sink function has finished sending data to Cloud Storage, and then attempt to reactivate the pipeline after more than one week has passed, the pipeline will fail to activate. This failure occurs because Cloud Storage doesn't allow interrupted data uploads to resume after more than one week has passed.

To resolve this issue, reactivate your pipeline with the Skip Restore State activation option enabled. This option allows the pipeline to be successfully reactivated, but causes any data that was ingested while the pipeline was deactivated to be dropped. See Using activation checkpoints to activate your pipeline in the Use the Data Stream Processor manual for more information.

To prevent this issue from occurring, before you deactivate a pipeline for an extended period of time, stop your data source from continuing to send data to the pipeline and wait for all the files in Cloud Storage to be closed. To confirm that the files are closed, check if the files containing your latest data are visible in your Cloud Storage bucket.

Last modified on 14 April, 2021

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6

