All DSP releases prior to DSP 1.4.0 use Gravity, a Kubernetes orchestrator, which has been announced end-of-life. We have replaced Gravity with an alternative component in DSP 1.4.0. Therefore, we will no longer provide support for versions of DSP prior to DSP 1.4.0 after July 1, 2023. We advise all of our customers to upgrade to DSP 1.4.0 in order to continue to receive full product support from Splunk.
Send data to Amazon S3
Use the Send to Amazon S3 sink function to send data to an Amazon S3 bucket.
Depending on the format
and compression
settings, data is stored in one of the following formats in the destination S3 bucket:
- When
format
is set tojson
andcompression
is not set, records are stored in a .json file. - When
format
is set tojson
andcompression
is set togzip
, records are stored in a .json.gz file. - When
format
is set toparquet
, records are stored in a .parquet file regardless of thecompression
setting.
You will not see data in Amazon S3 until the file containing that data is closed. See File rollover policy for more information.
Prerequisites
Before you can use this function, you must do the following:
- Create an Amazon S3 connection. See Create a DSP connection to send data to Amazon S3 in the Connect to Data Sources and Destinations with the manual. When configuring this sink function, set the
connection_id
argument to the ID of that connection. - Create the destination bucket in your Amazon S3 instance. Don't include any periods ( . ) in the bucket name. For information about creating a bucket, search for "How do I create an S3 Bucket?" in the Amazon Simple Storage Service Console User Guide.
If you activate your pipeline before creating the bucket specified in the
bucket
argument, or specify a bucket that has a period ( . ) in its name, the pipeline fails to send data to Amazon S3 and restarts indefinitely.
Function input schema
- collection<record<R>>
- This function takes in collections of records with schema R.
Required arguments
- connection_id
- Syntax: string
- Description: The Amazon S3 connection ID.
- Example in Canvas View: my-amazon-s3-connection
- bucket
- Syntax: string
- Description: The name of the destination bucket in Amazon S3.
- Example in Canvas View: my_bucket
Make sure that the destination bucket exists in your Amazon S3 instance, and that the bucket name doesn't include any periods ( . ). If you activate your pipeline before creating the bucket specified in the
bucket
argument, or specify a bucket that has a period ( . ) in its name, the pipeline fails to send data to Amazon S3 and restarts indefinitely. - prefix
- Syntax: string
- Description: The prefix to use in the object keys of the generated Amazon S3 objects.
- This parameter supports prefix patterns with substitutions. Using the special syntax
#{ }
, you can specify field names that will be substituted with the actual field value, or specify date-time formats that will be substituted with a timestamp indicating when the S3 object was generated. See the Prefix parameter substitutions section for more information. - Example in Canvas View: #{datetime:yyyy-MM-dd}
- size_mb
- Syntax: A long value that is greater than 0.
- Description: Only applicable when
format
is set tojson
. The maximum file size in Amazon S3, specified in megabytes (MB). If a file reaches this maximum size while the sink function is streaming data to it, then the file closes and a new file with the same object name prefix is created. The sink function continues streaming data to the newly opened file. See File rollover policy for information about how to use this setting in conjunction withrollover_secs
andinactivity_secs
. The maximum size of an Amazon S3 object is 5 TB. Even if you set
size_mb
to a value greater than 5,000,000, files close when they reach 5 TB in size.- Example in Canvas View: 1024
- rollover_secs
- Syntax: A long value that is greater than 0.
- Description: Only applicable when
format
is set tojson
. The maximum number of seconds that a file in the S3 bucket can stay open. If a file has been open for this length of time while the sink function is streaming data to it, then the file closes and a new file with the same object name prefix is created. The sink function continues streaming data to the newly opened file. See File rollover policy for information about how to use this setting in conjunction withsize_mb
andinactivity_secs
. - The rollover interval is limited to multiples of 60 seconds. If you set
rollover_secs
to a value that is not a multiple of 60, the sink function uses the nearest multiple of 60 as the rollover interval. - Example in Canvas View: 120
- format
- Syntax: string
- Description: The format for data in the Amazon S3 destination. The format can be set to one of the following:
json
for line-delimited JSON format.parquet
for Parquet format. See the Parquet format section on this page for information about limitations that apply when Parquet format is used.
- These values are case-insensitive. For example,
JSON
andParquet
are also accepted. - Example: json
Optional arguments
- inactivity_secs
- Syntax: A long value that is greater than 0.
- Description: Only applicable when
format
is set tojson
. The maximum number of seconds that an inactive file in the S3 bucket can stay open. If a file has been open for this length of time without receiving any streamed data, then the file is closed and a new file with the same object name prefix is created. When the sink function resumes streaming data to Amazon S3, the data is streamed to the newly opened file. See File rollover policy for information about how to use this setting in conjunction withsize_mb
androllover_secs
. - Default: 60
- Example in Canvas View: 90
- algorithm
- Syntax: string
- Description: The server-side encryption algorithm. Encryption keys are not stored or managed by the . The algorithm can be set to:
KMS
for SSE-KMS. To use this algorithm, you must configure your connection to use an IAM user that has the required KMS permissions, and your key policy must have thekms:Decrypt
permission. See IAM user permissions in the Connect to Data Sources and Destinations with the manual for more information.AES-256
for SSE-S3.
- For more information about these encryption algorithms, search for "Protecting data using server-side encryption" in the Amazon S3 documentation.
- If you leave this parameter empty, the does not require the data to be encrypted in Amazon S3.
- Default: Empty.
- Example in Canvas View: KMS
- key_id
- Syntax: string
- Description: Only applicable when
algorithm
is set toKMS
. The key Amazon Resource Name (ARN) of the Customer Master Key (CMK). Specify the key ARN using this format:arn:aws:kms:<region>:<account-id>:key/<key-id>
. Encryption keys are not stored or managed by DSP. - If you leave this parameter empty, the AWS default master key for the region is used.
- Default: Empty.
- Example in Canvas View: arn:aws:kms:us-east-1:123412341234:key/1234abcd-56ef78gh-90ij-1234567890kl
- compression
- Syntax: string
- Description: The codec to use for file compression. If you leave this parameter empty, the files created in Amazon S3 are not compressed.
- If
format
is set tojson
: The only supported value isgzip
. - If
format
is set toparquet
: The supported values aregzip
orsnappy
.
- If
- Default: Empty.
- Example in Canvas View: gzip
- row_group_mb
- Syntax: An integer between 1 and 512, inclusive.
- Description: Only applicable when
format
is set toparquet
. The size of each row group in MB. - Default: 128
- Example in Canvas View: 256
- allow_dropping_events
- Syntax: Boolean
- Description: Only applicable when
format
is set toparquet
. Set this parameter totrue
to avoid pipeline failures by dropping records that contain fields with more than 3 levels of nesting. If this parameter is set tofalse
, the pipeline fails when a field with more than 3 levels of nesting is detected. - See the Event payload limitations section on this page for more information.
- Default: false
- Example in Canvas View: Selected
- parameters
- Syntax: map<string, string>
- Description: Leave this field empty.
SPL2 examples
When working in the SPL View, you can write the function by using the syntax shown in the following examples.
1. Store data as JSON files
In this example, records from the DSP pipeline are stored in Amazon S3 as JSON files. The files all use the static prefix "my_prefix", and they are created based on the custom rollover policy specified in the size_mb
and rollover_secs
parameters. Additionally, the files are compressed using the Gzip codec.
...| into s3("my-amazon-s3-connection", "my_bucket", "my_prefix", 1024, 120, "json", 90, "", "", "gzip");
You can omit optional arguments only if you don't specify any other arguments that must be listed after them. This example includes empty quotation marks ( "" ) as placeholders for algorithm
and key_id
because compression
is listed after it.
Alternatively, you can use named arguments to declare the arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example omits algorithm
and key_id
:
...| into s3(connection_id: "my-amazon-s3-connection", format: "json", prefix: "my_prefix", bucket: "my_bucket", size_mb: 1024, rollover_secs: 120, compression: "gzip");
If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.
2. Store data as Parquet files
In this next example, records from the DSP pipeline are stored in Amazon S3 as Parquet files. Date-time substitution is used to generate the prefixes for the files, and the file contents are split into row groups that are no larger than 256 MB. Additionally, the files are encrypted using the SSE-S3 algorithm and compressed using the Snappy codec. Any records that contain fields with more than 3 levels of nesting are dropped from the pipeline and not written to Amazon S3.
...| into s3("my-amazon-s3-connection", "my_bucket", "#{datetime:yyyy-MM-dd}", 0, 0, "parquet", 0, "AES-256", "", "snappy", 256, true);
In this example, 0
and empty quotation marks ( "" ) are placeholder values. You must include them in the SPL2 expression because size_mb
and rollover_secs
are required parameters even though they aren't applicable to Parquet format, and you can only omit optional parameters such as inactivity_secs
and key_id
if you do not specify any other parameters that must be listed after them.
Alternatively, you can use named arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example only declares the optional arguments algorithm
and compression
:
...| into s3(format: "parquet", size_mb: 0, rollover_secs: 0, connection_id: "my-amazon-s3-connection", prefix: "#{datetime:yyyy-MM-dd}", bucket: "my_bucket", algorithm: "AES-256", compression: "snappy");
If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.
Prefix parameter substitutions
The prefix
parameter supports two types of substitution: event field and date-time substitutions. When JSON format is used, both types of substitutions are supported and can be used at the same time. When Parquet format is used, only date-time substitution is supported.
If you are using dynamic parameter substitution and concurrently writing to a large number of S3 buckets, checkpointing may block the processing of your pipelines. If this is happening, contact Splunk support for help with performance tuning your S3 sink.
Event field substitution
When using JSON format, you can use dot-delimited notation in a #{ }
pattern to use the actual value of a field in the event as part of the prefix. For example, if you have the following event, then the parameter substitution notation #{attributes.account_id}
sets the prefix to my_account
and #{body.metadata.name}
sets the prefix to my_metadata_name
.
{ "body": { "metadata": { "name": "my_metadata_name" } }, "attributes": { "account_id": "my_account" } }
If you define the prefix with a field that does not exist in the event, then the substitution value will be set to unknown
. Any leading or trailing whitespaces inside #{ }
are trimmed before substitution. An empty event field substitution pattern, such as #{}
, is not allowed.
Do not use fields with a large number of unique values in the prefix substitution pattern. For example, the substitution pattern #{fieldname}
where fieldname
is a field that is substituted with an event ID or a timestamp will generate a large number of prefixes and may cause unpredictable results.
Date-time format substitution
You can use a date-time format inside a #{datetime: }
pattern to generate a prefix with date and time values. For example, the parameter substitution notation #{datetime:yyyy-MM}
generates a prefix with a year and month value similar to 2020-03
, and #{datetime:yyyy-MM-dd}
generates prefix with a year, month, and day value similar to 2020-03-31
. Any leading or trailing whitespaces inside #{datetime: }
are trimmed before substitution. An empty date-time substitution pattern, such as #{datetime:}
, is not allowed.
Combining both substitution types
When using JSON format, you can use both substitution types at the same time. For example, if you have the sample event shown in the Event field substitution section, the parameter substitution notation dsp-#{attributes.account_id}-#{datetime:yyyy-MM}
sets the prefix to dsp-my_account-2020-03
. As another example, the parameter substitution notation dsp/#{body.metadata.name}/io/#{datetime:yyyy/MM/dd}
sets the prefix to dsp/my_metadata_name/io/2020/03/31
.
Parquet format
When format
is set to parquet
, some of the sink function configurations work differently:
prefix
only supports date-time substitution. Event field substitutions are not supported.- The file rollover policy is determined by the checkpointing interval instead of the
rollover_secs
,size_mb
, andinactivity_sec
settings. Whenever the creates a checkpoint for the pipeline, which happens every 60 seconds by default, the current Parquet file closes and a new file with the same object key prefix is created. See Using activation checkpoints to activate your pipeline in the User Manual for more information about checkpoints.
See the following sections for additional Parquet-specific limitations and details.
Event payload limitations
When Parquet format is used, the body
and attributes
fields in the event payload cannot contain values that have more than three levels of nesting.
See Event schema for more information about the fields in the event payload.
The following example payload shows a body
that is set to a map of a basic data type. This is a valid payload.
{ "body": { "f1": "v1", "f2": "v2", "f3": "v3" }, "sourcetype": "my-sourcetype", "host": "my-host", "attributes": { "my-test-attr": "my-test-attr-val" } }
The following example payload shows a body
that is set to a map of a three-dimensional array. This is an invalid payload, and by default the pipeline will fail.
{ "body": { "myarray": [ [ [ "v11", "v12" ] ] ] }, "sourcetype": "my-sourcetype", "host": "my-host", "attributes": { "my-test-attr": "my-test-attr-val" } }
You can prevent pipeline failures by setting the allow_dropping_events
parameter to true
so that the function drops invalid records such as the previous example.
Data type limitations
When Parquet format is used, union-typed data becomes nested one level deeper under container fields. These container fields use generic names such as member0
, member1
, and so on. As a result, union-typed data becomes difficult to retrieve from the Parquet files.
To prevent important data from becoming obscured this way, extract relevant data from union-typed fields into top-level fields, and then cast the extracted data to the data type that you want the data to be associated with in the Parquet output. See Formatting DSP data for Parquet files in Amazon S3 in the Connect to Data Sources and Destinations with the manual for more information.
Defaults used for Parquet format
The following defaults are used for Parquet format:
- Default Parquet Writer version: Parquet v1
- Default compression codec: UNCOMPRESSED
- Default Row group size: 128 x 1024 x 1024 bytes
- Dictionary encoding: Enabled
- Default dictionary page size: 1024 x 1024 bytes
File rollover policy
When you send data to Amazon S3, the data is stored in a file in the specified bucket. To prevent situations where too much data gets stored in a single file, the sink function uses a rollover policy to change the file that it sends data to at regular intervals. This rollover policy determines when an S3 file is closed, which causes a new file with the same object key prefix to be opened and streamed data to be sent to this new file instead of the closed one.
If format
is set to parquet
, files are closed whenever the creates a checkpoint for the pipeline. By default, the creates a checkpoint every 60 seconds. See Using activation checkpoints to activate your pipeline in the User Manual for more information about checkpoints.
If format
is set to json
, files are closed when any of the following conditions are met:
- The file reaches the size limit specified in the
size_mb
argument. - The file has been receiving data for the amount of time specified in the
rollover_secs
argument. - The file has not received data for the amount of time specified in the
inactivity_secs
argument, which defaults to 90 seconds.
You will not see data in Amazon S3 until the file containing that data is closed.
You can customize the rollover policy for JSON files by configuring the size_mb
, rollover_secs
, and inactivity_secs
arguments. If your pipeline becomes backlogged, adjust your arguments to increase the size_mb
or rollover_secs
.
Example: Custom rollover policy for JSON files
In this example, the sink function is configured as follows:
prefix
is set to#{datetime:yyyy-MM-dd}
, and the current date is May 25, 2021.format
is set tojson
.size_mb
is set to500
.rollover_secs
is set to120
.inactivity_secs
is set to60
.compression
is not set.
When you activate your pipeline, the sink function opens a file with a name like part-1237464e-b248-35d6-428d-9e7dda3f0118-0.json
in the 2021-05-25
subfolder in your S3 bucket, and starts streaming data to that file. If the size of the file reaches 500 MB before 120 seconds elapse, or if 120 seconds elapse before the file reaches 500 MB in size, then the part-1237464e-b248-35d6-428d-9e7dda3f0118-0.json
file is closed and a new file named part-1237464e-b248-35d6-428d-9e7dda3f0118-1.json
is opened. The sink function then starts sending your data to this newly opened file instead.
If your pipeline is activated but there is no data streaming to S3, after 60 seconds the part-1237464e-b248-35d6-428d-9e7dda3f0118-1.json
file is closed. When data resumes streaming to S3, a file named part-1237464e-b248-35d6-428d-9e7dda3f0118-2.json
is opened and the data is stored in that file.
Send data to Amazon Kinesis Data Streams | Send data to Google Cloud Storage |
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6
Feedback submitted, thanks!