Splunk® Data Stream Processor

Getting Data In

On April 3, 2023, Splunk Data Stream Processor will reach its end of sale, and will reach its end of life on February 28, 2025. If you are an existing DSP customer, please reach out to your account team for more information.
This documentation does not apply to the most recent version of Splunk® Data Stream Processor. For documentation on the most recent version, go to the latest release.

Use the Amazon S3 connector with Splunk DSP

Use the Amazon S3 connector to collect data from Amazon S3 buckets. This connector is based on the Amazon Simple Queue Service (SQS).

When new data is added to an Amazon S3 bucket, S3 sends a notification to SQS with information about the new content such as bucket_name, key, size, and so on. The Amazon S3 connector uses this information to download the new files from the Amazon S3 bucket, read and parse the file content, wrap the results into events, and then send the events into the Splunk Data Stream Processor (DSP) using the Collect Service.

Prerequisites

Before you can use the Amazon S3 connector, you must complete the following steps:

  • Configure Amazon S3 to notify SQS that new events were written to the Amazon S3 bucket, or couple the Amazon Simple Notification Service (SNS) to SQS to send the notifications. See Configure SQS and Configure SNS in the Splunk Add-on for AWS manual for more information.
  • Have an AWS account to use the Amazon S3 connector. If you don't have an AWS account, ask your AWS admin to create an account and provide the Access Key ID and Secret Access Key.

See Getting Started with Amazon SQS in the Amazon Web Services (AWS) documentation for more information on setting up and configuring AWS SQS.

Permissions for Amazon S3

Make sure your AWS account has at least read and write permissions for the queue and read permissions for the related Amazon S3 bucket. See the following list of permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ListQueues",
        "s3:GetObject"
      ],
      "Resource": "*"
    }
  ]
}

Parameters used in the Amazon S3 connector

In addition to the common configuration parameters, the Amazon S3 connector uses the following parameters:

  • sqs_queue_url: The full URL for the AWS SQS queue.
  • aws_credential: The AWS credentials used to access Amazon S3
    • access_key: The AWS access key credential information
      • aws_access_key_id: Your AWS access key.
      • aws_secret_access_key: Your AWS secret access key.

All credentials are transmitted securely by HTTPS and saved in the Collection Service with industry-standard encryption. They cannot be accessed outside of the current tenant.

  • filetype: The file type of the files sent to this job. If you don't enter a value for this parameter, it defaults to auto. File types have the following valid values:
    • auto: The file is uncompressed if necessary (only zip and gzip are supported) and the file type is automatically detected based on the following rules:
      • If the uncompressed file is plain text, the Content-Type of the original S3 object determines the file type handling:
        • "text/csv": A standard CSV file. The first line of the file must be the header. Each line is ingested as an event.
        • "application/json": A standard JSON file.
        • "text/plain": A plain text file. Each line is ingested as an event.
      • If the uncompressed file is plain text, and file type can't be determined by the Content-Type of the original S3 object, then the file extension determines the file handling:
        • .csv: A standard CSV file. The first line of the file must be the header. Each line is ingested as an event.
        • .txt: A plain text file. Each line is ingested as an event.
        • .json: A standard JSON file.
      • If the uncompressed file is plain text, and file type can't be determined by the Content-Type of the original S3 object or the file extension, then the file is parsed as a plain text file and each line is ingested as an event.
      • All other file types are ignored and the files are not ingested.
  • json
    • field: This parameter defines which field in a JSON file to be collected as events and is only applicable if you have set "filetype": "json". The field must be a first-level field, and the value of the field inside the target JSON files must be an array. The two parameters work together in the following ways:
      • "filetype": "auto", each JSON file is ingested as a single event.
      • "filetype": "json" and "field": "" or field is not set, each JSON file is ingested as a single event.
      • "filetype": "json" and "field": "Records", where Records is an array, each element inside Records is ingested as one event.
      • "filetype": "json" and "field": "Event", where Event is an object, nothing is ingested because the value of "Event" is an object, not an array.
      • "filetype": "json" and "field": "Event.Body", nothing is ingested because the field is more than one layer.

If the JSON files are generated by AWS, set `"field": "Records".

Choose the number of workers to assign to your job

There are many factors that can influence the data ingestion rates per worker for your scheduled jobs. Some of these factors are:

  • The average file size
  • File compression type and compression rates
  • The number of new files added to the S3 bucket between each job execution
  • The file formats
  • Event sizes
  • Download speed and bandwidth

Generally, a small number of larger files is faster than a large number of small files, compressed files are faster than uncompressed files, and CSV files are slower than plaintext and JSON files.

The following tables show the average ingestion rates per worker for some common scenarios.

A large number of large files compared to a small number of large files
Filetype Number of files File size before compression Compression Average ingestion rate per worker
txt 1,000 1 MiB none 5 MiB/s
txt 20 50 MiB none 30 MiB/s
An equal number of uncompressed files compared to compressed files
Filetype Number of files File size before compression Compression Average ingestion rate per worker
txt 20 50 MiB none 30 MiB/s
txt 20 50 MiB gz 70 MiB/s
An equal number of plaintext, JSON, and CSV files
Filetype Number of files File size before compression Compression Average ingestion rate per worker
txt 20 50 MiB gz 70 MiB/s
json 20 50 MiB gz 70 MiB/s
csv 20 50 MiB gz 40 MiB/s

The average ingestion rates are just examples and don't take into account external influences on your ingestion rate, such as download speeds, event sizes, and so on.

Limitations of the Amazon S3 connector

The Amazon S3 connector collects newly created files stored in S3 buckets in response to notifications received from SQS. The connector doesn't collect an event if it occurred before the SQS queue is set up or if notifications about the event are not received through SQS.

If a ZIP file in the Amazon S3 bucket contains more than one file, only the first file in the archive is read.

The Amazon S3 connector uses the mime_type Content Type header attribute and file extensions to determine if a file is supported. Don't customize the file extensions when uploading files to Amazon S3.

The connector doesn't support the following use case:

  • The file is a text/csv, text/json, or a text/plain formatted file, but the Content Type header attribute is customized.

Amazon S3 connector output

The following event attributes are added to the S3 connector events collected from Amazon S3:

  • accountID: The account ID of the AWS account. If the account ID can't be found, the attribute value is an empty string "".
  • lastModified: The last modified time of the Amazon S3 file.
  • etag: The etag of the Amazon S3 file.

A typical Amazon S3 connector event looks like this:

{
   "accountID": "123412341234",
   "etag": "9290629be720a884b5119e5b122c5c8d",
   "lastModified": 1562717968,
   "host": "aws-s3-connector - v1.0.0-beta1",
   "source": "https://s3.us-west-1.amazonaws.com/bucket/plain/long.log.gz",
   "index": "main",
   "sourcetype": "aws:s3:plaintext",
   "_time": 2019-07-11T01:00:33.336+00:00,
   "_raw": "helloworld2"
}

Create, modify, and delete a scheduled job using the Collect API

You can create, modify, and delete a scheduled job in the Amazon S3 connector using the Collect API.

Create a scheduled job

The following example creates a job, schedules it to run at 45 minutes past every hour, and assigns 2 workers:

curl -X POST "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/" \
    -H "Authorization: Bearer <accessToken>" \
    -H "Content-Type: application/json" \
    -d '{
            "name": "your connection name",
            "connectorID": "aws-s3",
            "schedule": "45 * * * *",
            "parameters": {
                "sqs_queue_url": "http(s)://sqs(-fips).<REGION>.amazonaws.com/<YOUR_ACCOUNT_NUMBER>/<YOUR_QUEUE_NAME>",
                "aws_credential": {
                    "access_key": {
                        "aws_access_key_id": "your AWS access key",
                        "aws_secret_access_key": "your AWS secret access key"
                    }
                }
            },
            "filetype": "json",
            "json": {
                "field": "Records"
            },
            "scalePolicy": {
                "static": {
                    "workers": 2
                }
            }
        }'

A typical response when you create a scheduled job using a POST request looks like this:

{
    "data": {
        "connectorID": "aws-s3",
        "createUserID": "your user ID",
        "createdAt": "2019-02-22T14:29:23.852Z",
        "id": "your job ID",
        "lastModifiedAt": "2019-02-22T14:29:23.852Z",
        "lastUpdateUserID": "last user who updated",
        "name": "your connection name",
        "schedule": "45 * * * *",
        "scheduled": true,
        "tenant": "default"
        "eventExtraFields": null,
        "parameters": {
            "aws_credential": {},
            "sqs_queue_url": "http(s)://sqs(-fips).{REGION}.amazonaws.com/{YOUR_ACCOUNT_NUMBER}/{YOUR_QUEUE_NAME}"
            "filetype": "json",
            "json": {
                "field": "Records"
            }
        },
        "scalePolicy": {
            "static": {
                "workers": 2
            }
        },
        
    }
}

Verify the job

After you create the scheduled job, you can find the job id in the POST response. The following example performs a GET request on the job id to verify the job was created and scheduled correctly:

curl -X GET "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/<jobId>" \
    -H "Authorization: Bearer <accessToken>" \
    -H "Content-Type: application/json"

A typical response for a GET request on a job id in a scheduled job looks like this:

{
    "data": {
        "connectorID": "aws-s3",
        "createUserID": "your user ID",
        "createdAt": "2019-02-22T14:29:23.852Z",
        "id": "your job ID",
        "lastModifiedAt": "2019-02-22T14:29:23.852Z",
        "lastUpdateUserID": "last user who updated",
        "name": "your connection name",
        "schedule": "45 * * * *",
        "scheduled": true,
        "tenant": "default"
        "eventExtraFields": null,
        "parameters": {
            "aws_credential": {},
            "sqs_queue_url": "http(s)://sqs(-fips).{REGION}.amazonaws.com/{YOUR_ACCOUNT_NUMBER}/{YOUR_QUEUE_NAME}"
            "filetype": "json",
            "json": {
                "field": "Records"
            }
        },
        "scalePolicy": {
            "static": {
                "workers": 2
            }
        },
        
    }
}

Modify a scheduled job

The following example modifies the scheduled job with the PATCH request to increase the number of workers to 4:

curl -X PATCH "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/<jobId>" \
    -H "Authorization: Bearer <accessToken>" \
    -H "Content-Type: application/merge-patch+json" \
    -d '{
            "scalePolicy": {
                "static": {
                    "workers": 4
                }
            }
        }'

A typical response for a PATCH request on a scheduled job looks like this:

{
  "data": {
    "id": "your job ID",
    "tenant": "default",
    "name": "your connection name",
    "connectorID": "aws-s3",
    "schedule": "45 * * * *",
    "parameters": {},
    "scalePolicy": {
      "static": {
        "workers": 4
      }
    },
    "createdAt": "2019-02-22T14:29:23.852Z",
    "lastModifiedAt": "2019-02-25T19:19:09.892Z"
  }
}

Note that the lastModifiedAt value is updated.

Delete a scheduled job

The following example deletes a scheduled job based on job id:

Make sure that no active pipelines are using the scheduled job you want to delete. If you delete a scheduled job with an active pipeline, your pipeline stops processing data.

curl -X DELETE "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/<jobId>" \
    -H "Authorization: Bearer <accessToken>" \
    -H "Content-Type: application/json"

When the connection is successfully deleted, you receive a "204 No content" response.

Create, modify, and delete a scheduled job using Global Connection Management

You can create, modify, and delete a scheduled job in the Amazon S3 connector with Global Connection Management. See Manage connections to external data sources.

Last modified on 16 March, 2020
Use the Amazon CloudWatch Metrics connector with Splunk DSP   Use the AWS Metadata connector with Splunk DSP

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.0.1


Was this topic useful?







You must be logged into splunk.com in order to post comments. Log in now.

Please try to keep this discussion focused on the content covered in this documentation topic. If you have a more general question about Splunk functionality or are experiencing a difficulty with Splunk, consider posting a question to Splunkbase Answers.

0 out of 1000 Characters