Splunk® Data Stream Processor

Function Reference

DSP 1.2.0 is impacted by the CVE-2021-44228 and CVE-2021-45046 security vulnerabilities from Apache Log4j. To fix these vulnerabilities, you must upgrade to DSP 1.2.4. See Upgrade the Splunk Data Stream Processor to 1.2.4 for upgrade instructions.

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach their end of support date. See the Splunk Software Support Policy for details.
This documentation does not apply to the most recent version of Splunk® Data Stream Processor. For documentation on the most recent version, go to the latest release.

Send data to Amazon S3

Send data to an Amazon S3 bucket.

Files generated by this function are given a filename extension based on the file format:

  • .gz: JSON format with GZIP compression.
  • .json: JSON format without compression.
  • .parquet: Parquet format, with or without compression.

You can only write to Amazon S3 buckets. Third-party S3-compatible vendors are not supported.

Prerequisites

Before you can use this function, you must do the following:

  • Create an Amazon S3 connection. See Create a connection to send data to Amazon S3. When configuring this sink function, use the ID of that connection for the connection_id argument.
  • Create the destination bucket in your Amazon S3 instance. Don't include any periods ( . ) in the bucket name. For information about creating a bucket, search for "How do I create an S3 Bucket?" in the Amazon Simple Storage Service Console User Guide.

    If you activate your pipeline before creating the bucket specified in the bucket argument, or specify a bucket that has a period ( . ) in its name, the pipeline fails to send data to Amazon S3 and restarts indefinitely.

Function input schema

collection<record<R>>
This function takes in collections of records with schema R.

Required arguments

connection_id
Syntax: string
Description: The Amazon S3 connection ID.
Example in Canvas View: "576205b3-f6f5-4ab7-8ffc-a4089a95d0c4"
bucket
Syntax: string
Description: The name of the destination bucket in Amazon S3.
Example in Canvas View: my_bucket

Make sure that the destination bucket exists in your Amazon S3 instance, and that the bucket name doesn't include any periods ( . ). If you activate your pipeline before creating the bucket specified in the bucket argument, or specify a bucket that has a period ( . ) in its name, the pipeline fails to send data to Amazon S3 and restarts indefinitely.

prefix
Syntax: string
Description: The prefix to use in the object keys of the generated Amazon S3 objects.
This parameter supports prefix patterns with substitutions. Using the special syntax #{ }, you can specify field names that will be substituted with the actual field value, or specify date-time formats that will be substituted with a timestamp indicating when the S3 object was generated. See the "Prefix parameter substitutions" section for more information.
Example in Canvas View: #{datetime:yyyy-MM-dd}
size_mb
Syntax: long
Description: Only applicable when format is set to JSON. The maximum allowed file size in MB. When the maximum file size is reached, the current part of the file is closed and a new file part with the same object key prefix is created.
Example in Canvas View: 1024
rollover_secs
Syntax: long
Description: Only applicable when format is set to JSON. The maximum time interval in seconds that a part of a file can stay open before it is closed and a new file part with the same object key prefix is created. The rollover interval is limited to multiples of 60 seconds. If you set rollover_secs to a value less than 60 seconds, the rollover interval is set to 60 seconds. If you set rollover_secs to a value greater than 60 seconds, the rollover interval is rounded up to the closest multiple of 60 seconds.
Example in Canvas View: 120
format
Syntax: string
Description: The format for data in the Amazon S3 destination. The format can be set to:
  • JSON for line-delimited JSON format.
  • Parquet for Parquet format. See the "Parquet format" section on this page for information about limitations that apply when Parquet format is used.
These values are case-insensitive. For example, json and parquet are also accepted.
Example in Canvas View: JSON

Optional arguments

inactivity_secs
Syntax: long
Description: Only applicable when format is set to JSON. The number of seconds of inactivity allowed before the current part of a file is closed and a new file part with the same object key prefix is created. Defaults to 60 seconds.
Example in Canvas View: 90
algorithm
Syntax: string
Description: The server-side encryption algorithm. Encryption keys are not stored or managed by the Splunk Data Stream Processor. The algorithm can be set to:
  • KMS for SSE-KMS. To use this algorithm, you must have the required KMS permissions in Amazon S3. See the "Permissions required to enable KMS" section on this page for more information.
  • AES-256 for SSE-S3.
For more information about these encryption algorithms, search for "Protecting data using server-side encryption" in the Amazon S3 documentation.
If you leave this parameter empty, the Splunk Data Stream Processor does not require the data to be encrypted in Amazon S3.
Example in Canvas View: KMS
key_id
Syntax: string
Description: Only applicable when algorithm is set to KMS. The key Amazon Resource Name (ARN) of the Customer Master Key (CMK). Specify the key ARN using this format: arn:aws:kms:<region>:<account-id>:key/<key-id>. Encryption keys are not stored or managed by DSP.
If you leave this parameter empty, the Amazon Web Services (AWS) default master key for the region is used.
Example in Canvas View: arn:aws:kms:us-east-1:123412341234:key/1234abcd-56ef78gh-90ij-1234567890kl
compression
Syntax: string
Description: The codec to use for file compression. If you leave this parameter empty, uncompressed files are sent to S3.
  • If format is set to JSON: The only supported value is gzip.
  • If format is set to Parquet: The supported values are gzip or snappy.
Defaults to empty in both cases.
Example in Canvas View: gzip
row_group_mb
Syntax: integer
Description: Only applicable when format is set to Parquet. The size of each row group in MB. This size can range from 1 MB to 512 MB, inclusive. Defaults to 128.
Example in Canvas View: 256
allow_dropping_events
Syntax: boolean
Description: Only applicable when format is set to Parquet. Set this parameter to true to avoid pipeline failures by dropping records that contain fields with more than 3 levels of nesting. If this parameter is set to false, the pipeline fails when a field with more than 3 levels of nesting is detected. Defaults to false.
See the "Event payload limitations" section for more information.
Example in Canvas View: Selected
parameters
Syntax: map<string, string>
Description: Leave this field empty.

SPL2 examples

When working in the SPL View, you can write the function by using the syntax shown in the following examples.

1. Write data as JSON files

In this example, records from the DSP pipeline are written to Amazon S3 as JSON files. The files all use the static prefix "my_prefix", and they are divided into file parts based on the custom settings specified in the size_mb and rollover_secs parameters. Additionally, the files are compressed using the Gzip codec.

...| into s3("my-amazon-s3-connection", "my_bucket", "my_prefix", 1024, 120, "JSON", 90, "", "", "gzip"); 

You can omit optional arguments only if you don't specify any other arguments that must be listed after them. This example includes empty quotation marks ( "" ) as placeholders for algorithm and key_id because compression is listed after them.

Alternatively, you can use named arguments to declare the arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example omits algorithm and key_id.

...| into s3(connection_id: "my-amazon-s3-connection", format: "JSON", prefix: "my_prefix", bucket: "my_bucket", size_mb: 1024, rollover_secs: 120, compression: "gzip");

If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.

2. Write data as Parquet files

In this next example, records from the DSP pipeline are written to Amazon S3 as Parquet files. Date-time substitution is used to generate the prefixes for the files, and the file contents are split into row groups that are no larger than 256 MB. Additionally, the files are encrypted using the SSE-S3 algorithm and compressed using the Snappy codec. Any records that contain fields with more than 3 levels of nesting are dropped from the pipeline and not written to Amazon S3.

...| into s3("my-amazon-s3-connection", "my_bucket", "#{datetime:yyyy-MM-dd}", 0, 0, "Parquet", 0, "AES-256", "", "snappy", 256, true);

In this example, 0 and empty quotation marks ( "" ) are placeholder values. The size_mb and rollover_secs parameters are required even though they aren't applicable to Parquet format, so you must include placeholder values for them. Likewise, you can only omit optional parameters such as inactivity_secs and key_id if you don't specify any other parameters that must be listed after them.

Alternatively, you can use named arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example only declares the optional arguments algorithm and compression:

...| into s3(format: "Parquet", size_mb: 0, rollover_secs: 0, connection_id: "my-amazon-s3-connection", prefix: "#{datetime:yyyy-MM-dd}", bucket: "my_bucket", algorithm: "AES-256", compression: "snappy");

If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.

Prefix parameter substitutions

The prefix parameter supports two types of substitution: event field and date-time substitutions. When JSON format is used, both types of substitutions are supported and can be used at the same time. When Parquet format is used, only date-time substitution is supported.

If you are using dynamic parameter substitution and concurrently writing to a large number of S3 buckets, checkpointing might block the processing of your pipelines. If this happens, contact Splunk Support for help with performance tuning your S3 sink.

Event field substitution

When using JSON format, you can specify a dot-delimited field name in a #{ } pattern to include the value of that field from the event as part of the prefix. For example, if you have the following event, then the parameter substitution notation #{attributes.account_id} sets the prefix to my_account and #{body.metadata.name} sets the prefix to my_metadata_name.

{
  "body": {
    "metadata": {
      "name": "my_metadata_name"
    }
  },
  "attributes": {
    "account_id": "my_account"
  }
}

If you define the prefix with a field that does not exist in the event, then the substitution value is set to unknown. Any leading or trailing whitespace inside #{ } is trimmed before substitution. An empty event field substitution pattern, such as #{}, is not allowed.

Do not use fields with a large number of unique values in the prefix substitution pattern. For example, the substitution pattern #{fieldname} where fieldname is a field that is substituted with an event ID or a timestamp will generate a large number of prefixes and may cause unpredictable results.
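
For example, the following pipeline statement is a minimal sketch, assuming a connection named "my-amazon-s3-connection" and a bucket named "my_bucket", that uses the account_id attribute from each record as the object key prefix:

...| into s3(connection_id: "my-amazon-s3-connection", bucket: "my_bucket", prefix: "#{attributes.account_id}", size_mb: 1024, rollover_secs: 120, format: "JSON");

With the sample event shown earlier in this section, the generated objects use the prefix my_account.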

Date-time format substitution

You can use a date-time format inside a #{datetime: } pattern to generate a prefix with date and time values. For example, the parameter substitution notation #{datetime:yyyy-MM} generates a prefix with a year and month value similar to 2020-03, and #{datetime:yyyy-MM-dd} generates a prefix with a year, month, and day value similar to 2020-03-31. Any leading or trailing whitespace inside #{datetime: } is trimmed before substitution. An empty date-time substitution pattern, such as #{datetime:}, is not allowed.

Combining both substitution types

When using JSON format, you can use both substitution types at the same time. For example, if you have the sample event shown in the "Event field substitution" section, the parameter substitution notation dsp-#{attributes.account_id}-#{datetime:yyyy-MM} sets the prefix to dsp-my_account-2020-03. As another example, the parameter substitution notation dsp/#{body.metadata.name}/io/#{datetime:yyyy/MM/dd} sets the prefix to dsp/my_metadata_name/io/2020/03/31.
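
As a minimal sketch, assuming the same connection and bucket names used in the earlier examples, the first combined pattern can be passed directly as the prefix argument:

...| into s3(connection_id: "my-amazon-s3-connection", bucket: "my_bucket", prefix: "dsp-#{attributes.account_id}-#{datetime:yyyy-MM}", size_mb: 1024, rollover_secs: 120, format: "JSON");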

Parquet format

If Parquet format is used:

  • rollover_secs, size_mb, and inactivity_secs are not used. File parts are automatically closed every 60 seconds, and a new file part with the same Amazon S3 object key prefix is created.
  • prefix only supports date-time substitution. Event field substitutions are not supported.

Event payload limitations

When Parquet format is used, the body and attributes fields in the event payload cannot contain values that have more than three levels of nesting.

See Event schema for more information about the fields in the event payload.

The following example payload shows a body that is set to a map of a basic data type. This is a valid payload.

{
  "body": {
           "f1": "v1",
           "f2": "v2",
           "f3": "v3"
           },
  "sourcetype": "my-sourcetype",
  "host": "my-host",
  "attributes": {
                 "my-test-attr": "my-test-attr-val"
                }
 }

The following example payload shows a body that is set to a map of a three-dimensional array. This is an invalid payload, and by default the pipeline will fail.

{
  "body": { 
           "myarray": [
                           [
                                [
                                 "v11", "v12"
                                ]
                           ]
                      ] 
          },
  "sourcetype": "my-sourcetype",
  "host": "my-host",
  "attributes": {
                 "my-test-attr": "my-test-attr-val"
                }
 }

You can prevent pipeline failures by setting the allow_dropping_events parameter to true so that the function drops invalid records such as the previous example.
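
For example, the following statement is a minimal sketch, assuming the same connection and bucket names used in the earlier examples, that uses a named argument to drop records with more than three levels of nesting instead of failing the pipeline:

...| into s3(connection_id: "my-amazon-s3-connection", bucket: "my_bucket", prefix: "#{datetime:yyyy-MM-dd}", size_mb: 0, rollover_secs: 0, format: "Parquet", allow_dropping_events: true);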

Data type limitations

When Parquet format is used, union-typed data becomes nested one level deeper under container fields. These container fields use generic names such as member0, member1, and so on. As a result, union-typed data becomes difficult to retrieve from the Parquet files.

To prevent important data from becoming obscured this way, extract relevant data from union-typed fields into top-level fields, and then cast the extracted data to the data type that you want the data to be associated with in the Parquet output. See Formatting data for Parquet files in Amazon S3 in the Connect to Data Sources and Destinations manual for more information.
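
For example, the following statement is a minimal sketch of this pattern. It assumes that your records carry an account_id value in the attributes map and that the ucast and map_get scalar functions are available in your pipeline; adjust the field names and target types to match your data:

...| eval account_id=ucast(map_get(attributes, "account_id"), "string", null) | into s3(connection_id: "my-amazon-s3-connection", bucket: "my_bucket", prefix: "#{datetime:yyyy-MM-dd}", size_mb: 0, rollover_secs: 0, format: "Parquet");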

Defaults used for Parquet format

The following defaults are used for Parquet format:

  • Default Parquet writer version: Parquet v1
  • Default compression codec: UNCOMPRESSED
  • Default row group size: 128 x 1024 x 1024 bytes
  • Dictionary encoding: Enabled
  • Default dictionary page size: 1024 x 1024 bytes

Permissions required to enable KMS

To use KMS as the encryption algorithm, the Identity and Access Management (IAM) user specified in your Amazon S3 connection must have the following permissions:

  • kms:Decrypt
  • kms:GenerateDataKey

If your IAM user is in the same AWS account as the AWS KMS key, then you must have permission to kms:Decrypt in the key policy. If your IAM user belongs to a different account than the key, then you must have permission to kms:Decrypt in both the key policy and your IAM permissions.
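
For reference, an IAM policy statement that grants these permissions might look like the following sketch. The key ARN shown here is the placeholder value from the key_id example; scope the Resource element to your own CMK:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt",
        "kms:GenerateDataKey"
      ],
      "Resource": "arn:aws:kms:us-east-1:123412341234:key/1234abcd-56ef78gh-90ij-1234567890kl"
    }
  ]
}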
