Splunk® Data Stream Processor

Function Reference

DSP 1.2.1 is impacted by the CVE-2021-44228 and CVE-2021-45046 security vulnerabilities from Apache Log4j. To fix these vulnerabilities, you must upgrade to DSP 1.2.4. See Upgrade the Splunk Data Stream Processor to 1.2.4 for upgrade instructions.

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach their end of support date. See the Splunk Software Support Policy for details.

Send data to Amazon S3

Use the Send to Amazon S3 sink function to send data to an Amazon S3 bucket.

Depending on the format and compression settings, data is stored in one of the following formats in the destination S3 bucket:

  • When format is set to json and compression is not set, records are stored in a .json file.
  • When format is set to json and compression is set to gzip, records are stored in a .json.gz file.
  • When format is set to parquet, records are stored in a .parquet file regardless of the compression setting.
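
The mapping in the list above can be sketched as a small lookup. This is an illustrative Python model only, not part of DSP; the function name s3_file_extension is hypothetical.

```python
def s3_file_extension(fmt: str, compression: str = "") -> str:
    """Return the file extension that the list above describes (hypothetical helper)."""
    fmt = fmt.lower()  # format values are documented as case-insensitive
    if fmt == "parquet":
        # Parquet files keep the same extension regardless of compression.
        return ".parquet"
    if fmt == "json":
        if compression == "gzip":
            return ".json.gz"
        if compression == "":
            return ".json"
        raise ValueError(f"unsupported compression for json: {compression!r}")
    raise ValueError(f"unsupported format: {fmt!r}")
```

For example, s3_file_extension("json", "gzip") corresponds to the second bullet in the list.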

You will not see data in Amazon S3 until the file containing that data is closed. See File rollover policy for more information.

Prerequisites

Before you can use this function, you must do the following:

  • Create an Amazon S3 connection. See Create a DSP connection to send data to Amazon S3 in the Connect to Data Sources and Destinations with the Splunk Data Stream Processor manual. When configuring this sink function, set the connection_id argument to the ID of that connection.
  • Create the destination bucket in your Amazon S3 instance. Don't include any periods ( . ) in the bucket name. For information about creating a bucket, search for "How do I create an S3 Bucket?" in the Amazon Simple Storage Service Console User Guide.

    If you activate your pipeline before creating the bucket specified in the bucket argument, or specify a bucket that has a period ( . ) in its name, the pipeline fails to send data to Amazon S3 and restarts indefinitely.

Function input schema

collection<record<R>>
This function takes in collections of records with schema R.

Required arguments

connection_id
Syntax: string
Description: The Amazon S3 connection ID.
Example in Canvas View: my-amazon-s3-connection
bucket
Syntax: string
Description: The name of the destination bucket in Amazon S3.
Example in Canvas View: my_bucket

Make sure that the destination bucket exists in your Amazon S3 instance, and that the bucket name doesn't include any periods ( . ). If you activate your pipeline before creating the bucket specified in the bucket argument, or specify a bucket that has a period ( . ) in its name, the pipeline fails to send data to Amazon S3 and restarts indefinitely.

prefix
Syntax: string
Description: The prefix to use in the object keys of the generated Amazon S3 objects.
This parameter supports prefix patterns with substitutions. Using the special syntax #{ }, you can specify field names that will be substituted with the actual field value, or specify date-time formats that will be substituted with a timestamp indicating when the S3 object was generated. See the Prefix parameter substitutions section for more information.
Example in Canvas View: #{datetime:yyyy-MM-dd}
size_mb
Syntax: A long value that is greater than 0.
Description: Only applicable when format is set to json. The maximum file size in Amazon S3, specified in megabytes (MB). If a file reaches this maximum size while the sink function is streaming data to it, then the file closes and a new file with the same object name prefix is created. The sink function continues streaming data to the newly opened file. See File rollover policy for information about how to use this setting in conjunction with rollover_secs and inactivity_secs.

The maximum size of an Amazon S3 object is 5 TB. Even if you set size_mb to a value greater than 5,000,000, files close when they reach 5 TB in size.

Example in Canvas View: 1024
rollover_secs
Syntax: A long value that is greater than 0.
Description: Only applicable when format is set to json. The maximum number of seconds that a file in the S3 bucket can stay open. If a file has been open for this length of time while the sink function is streaming data to it, then the file closes and a new file with the same object name prefix is created. The sink function continues streaming data to the newly opened file. See File rollover policy for information about how to use this setting in conjunction with size_mb and inactivity_secs.
The rollover interval is limited to multiples of 60 seconds. If you set rollover_secs to a value that is not a multiple of 60, the sink function uses the nearest multiple of 60 as the rollover interval.
Example in Canvas View: 120
format
Syntax: string
Description: The format for data in the Amazon S3 destination. The format can be set to one of the following:
  • json for line-delimited JSON format.
  • parquet for Parquet format. See the Parquet format section on this page for information about limitations that apply when Parquet format is used.
These values are case-insensitive. For example, JSON and Parquet are also accepted.
Example: json

Optional arguments

inactivity_secs
Syntax: A long value that is greater than 0.
Description: Only applicable when format is set to json. The maximum number of seconds that an inactive file in the S3 bucket can stay open. If a file has been open for this length of time without receiving any streamed data, then the file is closed and a new file with the same object name prefix is created. When the sink function resumes streaming data to Amazon S3, the data is streamed to the newly opened file. See File rollover policy for information about how to use this setting in conjunction with size_mb and rollover_secs.
Default: 60
Example in Canvas View: 90
algorithm
Syntax: string
Description: The server-side encryption algorithm. Encryption keys are not stored or managed by the Splunk Data Stream Processor. The algorithm can be set to:
  • KMS for SSE-KMS. To use this algorithm, you must configure your connection to use an IAM user that has the required KMS permissions, and your key policy must have the kms:Decrypt permission. See IAM user permissions in the Connect to Data Sources and Destinations with the Splunk Data Stream Processor manual for more information.
  • AES-256 for SSE-S3.
For more information about these encryption algorithms, search for "Protecting data using server-side encryption" in the Amazon S3 documentation.
If you leave this parameter empty, the Splunk Data Stream Processor does not require the data to be encrypted in Amazon S3.
Default: Empty.
Example in Canvas View: KMS
key_id
Syntax: string
Description: Only applicable when algorithm is set to KMS. The key Amazon Resource Name (ARN) of the Customer Master Key (CMK). Specify the key ARN using this format: arn:aws:kms:<region>:<account-id>:key/<key-id>. Encryption keys are not stored or managed by DSP.
If you leave this parameter empty, the AWS default master key for the region is used.
Default: Empty.
Example in Canvas View: arn:aws:kms:us-east-1:123412341234:key/1234abcd-56ef78gh-90ij-1234567890kl
compression
Syntax: string
Description: The codec to use for file compression. If you leave this parameter empty, the files created in Amazon S3 are not compressed.
  • If format is set to json: The only supported value is gzip.
  • If format is set to parquet: The supported values are gzip or snappy.
Default: Empty.
Example in Canvas View: gzip
row_group_mb
Syntax: An integer between 1 and 512, inclusive.
Description: Only applicable when format is set to parquet. The size of each row group in MB.
Default: 128
Example in Canvas View: 256
allow_dropping_events
Syntax: Boolean
Description: Only applicable when format is set to parquet. Set this parameter to true to avoid pipeline failures by dropping records that contain fields with more than 3 levels of nesting. If this parameter is set to false, the pipeline fails when a field with more than 3 levels of nesting is detected.
See the Event payload limitations section on this page for more information.
Default: false
Example in Canvas View: Selected
parameters
Syntax: map<string, string>
Description: Leave this field empty.
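
The constraints in the argument reference above can be checked before a pipeline is activated. The following Python sketch is one way to do that under stated assumptions: validate_s3_sink_args and the shape of the args dictionary are hypothetical, and the checks only mirror the rules documented on this page. It does not confirm that the bucket exists.

```python
REQUIRED = ("connection_id", "bucket", "prefix", "size_mb", "rollover_secs", "format")
COMPRESSION_BY_FORMAT = {"json": ("", "gzip"), "parquet": ("", "gzip", "snappy")}


def validate_s3_sink_args(args):
    """Return a list of problems found in a dict of sink arguments (hypothetical helper)."""
    problems = [f"missing required argument: {name}" for name in REQUIRED if name not in args]
    fmt = str(args.get("format", "")).lower()  # format values are case-insensitive
    if fmt not in COMPRESSION_BY_FORMAT:
        problems.append("format must be json or parquet")
    elif args.get("compression", "") not in COMPRESSION_BY_FORMAT[fmt]:
        problems.append(f"unsupported compression for {fmt}")
    if "." in args.get("bucket", ""):
        problems.append("bucket name must not contain periods")
    if fmt == "json":
        # The size and time limits only apply to JSON output.
        for name in ("size_mb", "rollover_secs", "inactivity_secs"):
            if name in args and args[name] <= 0:
                problems.append(f"{name} must be greater than 0")
    if fmt == "parquet" and not 1 <= args.get("row_group_mb", 128) <= 512:
        problems.append("row_group_mb must be between 1 and 512")
    if args.get("algorithm", "") not in ("", "KMS", "AES-256"):
        problems.append("algorithm must be KMS, AES-256, or empty")
    return problems
```

An empty list means that none of the documented rules were violated.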

SPL2 examples

When working in the SPL View, you can write the function by using the syntax shown in the following examples.

1. Store data as JSON files

In this example, records from the DSP pipeline are stored in Amazon S3 as JSON files. The files all use the static prefix "my_prefix", and they are created based on the custom rollover policy specified in the size_mb and rollover_secs parameters. Additionally, the files are compressed using the Gzip codec.

...| into s3("my-amazon-s3-connection", "my_bucket", "my_prefix", 1024, 120, "json", 90, "", "", "gzip"); 

You can omit optional arguments only if you don't specify any other arguments that must be listed after them. This example includes empty quotation marks ( "" ) as placeholders for algorithm and key_id because compression is listed after them.

Alternatively, you can use named arguments to declare the arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example omits inactivity_secs, algorithm, and key_id:

...| into s3(connection_id: "my-amazon-s3-connection", format: "json", prefix: "my_prefix", bucket: "my_bucket", size_mb: 1024, rollover_secs: 120, compression: "gzip");

If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.
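
If you generate pipeline definitions from a script, the named-argument form shown above can be assembled programmatically. The following Python sketch is illustrative only: render_s3_sink is a hypothetical helper, it does not escape quotation marks inside string values, and it does not check the arguments against the function's requirements.

```python
def render_s3_sink(**kwargs) -> str:
    """Render an SPL2 sink expression that uses named arguments (hypothetical helper)."""

    def literal(value):
        # Check bool before int because bool is a subclass of int in Python.
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, int):
            return str(value)
        return '"' + str(value) + '"'

    rendered = ", ".join(f"{name}: {literal(value)}" for name, value in kwargs.items())
    return f"...| into s3({rendered});"
```

Because every argument is named, arguments that are left out of the call are simply omitted from the rendered expression and use their default values.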

2. Store data as Parquet files

In this next example, records from the DSP pipeline are stored in Amazon S3 as Parquet files. Date-time substitution is used to generate the prefixes for the files, and the file contents are split into row groups that are no larger than 256 MB. Additionally, the files are encrypted using the SSE-S3 algorithm and compressed using the Snappy codec. Any records that contain fields with more than 3 levels of nesting are dropped from the pipeline and not written to Amazon S3.

...| into s3("my-amazon-s3-connection", "my_bucket", "#{datetime:yyyy-MM-dd}", 0, 0, "parquet", 0, "AES-256", "", "snappy", 256, true);

In this example, 0 and empty quotation marks ( "" ) are placeholder values. You must include them in the SPL2 expression because size_mb and rollover_secs are required parameters even though they aren't applicable to Parquet format, and you can only omit optional parameters such as inactivity_secs and key_id if you do not specify any other parameters that must be listed after them.

Alternatively, you can use named arguments in any order and omit any optional arguments you don't want to declare. All unprovided arguments use their default values. The following example only declares the optional arguments algorithm and compression:

...| into s3(format: "parquet", size_mb: 0, rollover_secs: 0, connection_id: "my-amazon-s3-connection", prefix: "#{datetime:yyyy-MM-dd}", bucket: "my_bucket", algorithm: "AES-256", compression: "snappy");

If you want to use a mix of unnamed and named arguments in your functions, you need to list all unnamed arguments in the correct order before providing the named arguments.

Prefix parameter substitutions

The prefix parameter supports two types of substitution: event field and date-time substitutions. When JSON format is used, both types of substitutions are supported and can be used at the same time. When Parquet format is used, only date-time substitution is supported.

If you are using dynamic parameter substitution and concurrently writing to a large number of S3 buckets, checkpointing may block the processing of your pipelines. If this is happening, contact Splunk support for help with performance tuning your S3 sink.

Event field substitution

When using JSON format, you can use dot-delimited notation in a #{ } pattern to use the actual value of a field in the event as part of the prefix. For example, if you have the following event, then the parameter substitution notation #{attributes.account_id} sets the prefix to my_account and #{body.metadata.name} sets the prefix to my_metadata_name.

{
  "body": {
    "metadata": {
      "name": "my_metadata_name"
    }
  },
  "attributes": {
    "account_id": "my_account"
  }
}

If you define the prefix with a field that does not exist in the event, then the substitution value is set to unknown. Any leading or trailing whitespace inside #{ } is trimmed before substitution. An empty event field substitution pattern, such as #{}, is not allowed.

Do not use fields with a large number of unique values in the prefix substitution pattern. For example, the substitution pattern #{fieldname} where fieldname is a field that is substituted with an event ID or a timestamp will generate a large number of prefixes and may cause unpredictable results.
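
The event field rules described in this section can be modeled with a short Python sketch. This is an approximation for illustration, not the DSP implementation: substitute_event_fields is a hypothetical name, and this version does not handle #{datetime: } patterns.

```python
import re

_PATTERN = re.compile(r"#\{([^}]*)\}")


def substitute_event_fields(prefix: str, event: dict) -> str:
    """Replace each #{dotted.path} pattern with the matching value from the event."""

    def resolve(match):
        path = match.group(1).strip()  # leading and trailing whitespace is trimmed
        if not path:
            raise ValueError("an empty substitution pattern such as #{} is not allowed")
        value = event
        for key in path.split("."):
            if not isinstance(value, dict) or key not in value:
                return "unknown"  # the field does not exist in the event
            value = value[key]
        return str(value)

    return _PATTERN.sub(resolve, prefix)
```

With the sample event from this section, the pattern #{attributes.account_id} resolves to my_account, and a pattern that names a missing field resolves to unknown.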

Date-time format substitution

You can use a date-time format inside a #{datetime: } pattern to generate a prefix with date and time values. For example, the parameter substitution notation #{datetime:yyyy-MM} generates a prefix with a year and month value similar to 2020-03, and #{datetime:yyyy-MM-dd} generates a prefix with a year, month, and day value similar to 2020-03-31. Any leading or trailing whitespace inside #{datetime: } is trimmed before substitution. An empty date-time substitution pattern, such as #{datetime:}, is not allowed.

Combining both substitution types

When using JSON format, you can use both substitution types at the same time. For example, if you have the sample event shown in the Event field substitution section, the parameter substitution notation dsp-#{attributes.account_id}-#{datetime:yyyy-MM} sets the prefix to dsp-my_account-2020-03. As another example, the parameter substitution notation dsp/#{body.metadata.name}/io/#{datetime:yyyy/MM/dd} sets the prefix to dsp/my_metadata_name/io/2020/03/31.
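
To check what a combined pattern might produce before activating a pipeline, you can model both substitution types in a few lines of Python. The sketch below is an approximation under stated assumptions: build_prefix and format_datetime are hypothetical names, only the yyyy, MM, and dd tokens used in the examples on this page are handled, and the timestamp is passed in explicitly instead of being taken from the time that the S3 object is generated.

```python
import re
from datetime import datetime

# Only the date-time tokens that appear in the examples on this page (assumption).
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}
_TOKEN_RE = re.compile("|".join(_TOKENS))


def format_datetime(pattern: str, when: datetime) -> str:
    """Replace each supported token in the pattern with the matching part of the date."""
    return _TOKEN_RE.sub(lambda m: when.strftime(_TOKENS[m.group(0)]), pattern)


def build_prefix(prefix: str, event: dict, when: datetime) -> str:
    """Resolve both #{field.path} and #{datetime:format} patterns in a prefix."""

    def resolve(match):
        body = match.group(1).strip()  # whitespace inside #{ } is trimmed
        if not body:
            raise ValueError("an empty substitution pattern is not allowed")
        if body.startswith("datetime:"):
            fmt = body[len("datetime:"):].strip()
            if not fmt:
                raise ValueError("an empty date-time pattern is not allowed")
            return format_datetime(fmt, when)
        value = event
        for key in body.split("."):
            if not isinstance(value, dict) or key not in value:
                return "unknown"  # the field does not exist in the event
            value = value[key]
        return str(value)

    return re.sub(r"#\{([^}]*)\}", resolve, prefix)
```

Calling build_prefix with the sample event and a date of March 31, 2020 reproduces the two prefixes shown in this section.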

Parquet format

When format is set to parquet, some of the sink function configurations work differently:

  • prefix only supports date-time substitution. Event field substitutions are not supported.
  • The file rollover policy is determined by the checkpointing interval instead of the rollover_secs, size_mb, and inactivity_secs settings. Whenever the Splunk Data Stream Processor creates a checkpoint for the pipeline, which happens every 60 seconds by default, the current Parquet file closes and a new file with the same object key prefix is created. See Using activation checkpoints to activate your pipeline in the User Manual for more information about checkpoints.

See the following sections for additional Parquet-specific limitations and details.

Event payload limitations

When Parquet format is used, the body and attributes fields in the event payload cannot contain values that have more than three levels of nesting.

See Event schema for more information about the fields in the event payload.

The following example payload shows a body that is set to a map of a basic data type. This is a valid payload.

{
  "body": {
           "f1": "v1",
           "f2": "v2",
           "f3": "v3"
           },
  "sourcetype": "my-sourcetype",
  "host": "my-host",
  "attributes": {
                 "my-test-attr": "my-test-attr-val"
                }
 }

The following example payload shows a body that is set to a map of a three-dimensional array. This is an invalid payload, and by default the pipeline will fail.

{
  "body": { 
           "myarray": [
                           [
                                [
                                 "v11", "v12"
                                ]
                           ]
                      ] 
          },
  "sourcetype": "my-sourcetype",
  "host": "my-host",
  "attributes": {
                 "my-test-attr": "my-test-attr-val"
                }
 }

You can prevent pipeline failures by setting the allow_dropping_events parameter to true so that the function drops invalid records such as the previous example.
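
The nesting limit can be approximated with a recursive depth check. The following Python sketch is one interpretation, not the DSP implementation: it assumes that each map or array counts as one level, so the valid payload above has a depth of 1 and the invalid payload has a depth of 4. The names nesting_depth and filter_for_parquet are hypothetical.

```python
MAX_NESTING = 3  # limit described in this section


def nesting_depth(value) -> int:
    """Count nested containers, treating each map or array as one level (assumption)."""
    if isinstance(value, dict):
        return 1 + max((nesting_depth(v) for v in value.values()), default=0)
    if isinstance(value, (list, tuple)):
        return 1 + max((nesting_depth(v) for v in value), default=0)
    return 0  # basic data types add no nesting


def filter_for_parquet(records, allow_dropping_events=False):
    """Keep valid records; drop or fail on records that exceed the nesting limit."""
    kept = []
    for record in records:
        too_deep = any(
            nesting_depth(record.get(field)) > MAX_NESTING
            for field in ("body", "attributes")
        )
        if not too_deep:
            kept.append(record)
        elif not allow_dropping_events:
            # Models the default behavior, where the pipeline fails.
            raise ValueError("record has more than 3 levels of nesting")
    return kept
```

Running a check like this on sample events can help you decide whether to flatten fields upstream or to set allow_dropping_events to true.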

Data type limitations

When Parquet format is used, union-typed data becomes nested one level deeper under container fields. These container fields use generic names such as member0, member1, and so on. As a result, union-typed data becomes difficult to retrieve from the Parquet files.

To prevent important data from becoming obscured this way, extract relevant data from union-typed fields into top-level fields, and then cast the extracted data to the data type that you want the data to be associated with in the Parquet output. See Formatting DSP data for Parquet files in Amazon S3 in the Connect to Data Sources and Destinations with the Splunk Data Stream Processor manual for more information.

Defaults used for Parquet format

The following defaults are used for Parquet format:

  • Default Parquet Writer version: Parquet v1
  • Default compression codec: UNCOMPRESSED
  • Default Row group size: 128 x 1024 x 1024 bytes
  • Dictionary encoding: Enabled
  • Default dictionary page size: 1024 x 1024 bytes

File rollover policy

When you send data to Amazon S3, the data is stored in a file in the specified bucket. To prevent situations where too much data gets stored in a single file, the sink function uses a rollover policy to change the file that it sends data to at regular intervals. This rollover policy determines when an S3 file is closed, which causes a new file with the same object key prefix to be opened and streamed data to be sent to this new file instead of the closed one.

If format is set to parquet, files are closed whenever the Splunk Data Stream Processor creates a checkpoint for the pipeline. By default, the Splunk Data Stream Processor creates a checkpoint every 60 seconds. See Using activation checkpoints to activate your pipeline in the User Manual for more information about checkpoints.

If format is set to json, files are closed when any of the following conditions are met:

  • The file reaches the size limit specified in the size_mb argument.
  • The file has been receiving data for the amount of time specified in the rollover_secs argument.
  • The file has not received data for the amount of time specified in the inactivity_secs argument, which defaults to 60 seconds.

You will not see data in Amazon S3 until the file containing that data is closed.

You can customize the rollover policy for JSON files by configuring the size_mb, rollover_secs, and inactivity_secs arguments. If your pipeline becomes backlogged, increase the value of size_mb or rollover_secs.

Example: Custom rollover policy for JSON files

In this example, the sink function is configured as follows:

  • prefix is set to #{datetime:yyyy-MM-dd}, and the current date is May 25, 2021.
  • format is set to json.
  • size_mb is set to 500.
  • rollover_secs is set to 120.
  • inactivity_secs is set to 60.
  • compression is not set.

When you activate your pipeline, the sink function opens a file with a name like part-1237464e-b248-35d6-428d-9e7dda3f0118-0.json in the 2021-05-25 subfolder in your S3 bucket, and starts streaming data to that file. If the size of the file reaches 500 MB before 120 seconds elapse, or if 120 seconds elapse before the file reaches 500 MB in size, then the part-1237464e-b248-35d6-428d-9e7dda3f0118-0.json file is closed and a new file named part-1237464e-b248-35d6-428d-9e7dda3f0118-1.json is opened. The sink function then starts sending your data to this newly opened file instead.

If your pipeline is activated but there is no data streaming to S3, after 60 seconds the part-1237464e-b248-35d6-428d-9e7dda3f0118-1.json file is closed. When data resumes streaming to S3, a file named part-1237464e-b248-35d6-428d-9e7dda3f0118-2.json is opened and the data is stored in that file.
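
The three JSON rollover conditions can be summarized as a single check. The following Python sketch is a simplified model of the policy described in this section, not the DSP implementation: the function names are hypothetical, and the rounding of rollover_secs to a multiple of 60 is not modeled.

```python
S3_MAX_OBJECT_MB = 5_000_000  # the 5 TB Amazon S3 object limit, expressed in MB


def should_close_json_file(file_size_mb, open_secs, idle_secs,
                           size_mb, rollover_secs, inactivity_secs):
    """Return True if any rollover condition for a JSON file is met."""
    effective_size_mb = min(size_mb, S3_MAX_OBJECT_MB)  # files close at 5 TB regardless
    return (
        file_size_mb >= effective_size_mb   # size limit reached
        or open_secs >= rollover_secs       # file has been open long enough
        or idle_secs >= inactivity_secs     # file has not received data
    )


def part_file_name(writer_id: str, index: int, extension: str = ".json") -> str:
    """Build a file name in the part-<id>-<index> pattern shown in the example."""
    return f"part-{writer_id}-{index}{extension}"
```

With the settings from this example (size_mb of 500, rollover_secs of 120, and inactivity_secs of 60), a 100 MB file that has been open for 120 seconds closes, and the next file uses the same identifier with the index increased by one.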

Last modified on 14 June, 2022

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6

