Use the Amazon S3 connector with Splunk DSP
Use the Amazon S3 connector to collect data from Amazon S3 buckets. This connector is based on the Amazon Simple Queue Service (SQS).
When new data is added to an Amazon S3 bucket, S3 sends a notification to SQS with information about the new content, such as `bucket_name`, `key`, `size`, and so on. The Amazon S3 connector uses this information to download the new files from the Amazon S3 bucket, read and parse the file content, wrap the results into events, and then send the events into the Splunk Data Stream Processor (DSP) using the Collect Service.
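For reference, the message that S3 publishes to SQS when an object is created is a JSON document similar to the following abbreviated sketch. The bucket and object values here are placeholders; see the Amazon S3 event notification documentation for the complete message structure:

    {
        "Records": [
            {
                "eventVersion": "2.1",
                "eventSource": "aws:s3",
                "eventName": "ObjectCreated:Put",
                "s3": {
                    "bucket": {
                        "name": "my-dsp-bucket",
                        "arn": "arn:aws:s3:::my-dsp-bucket"
                    },
                    "object": {
                        "key": "logs/app-2019-07-11.log.gz",
                        "size": 1048576,
                        "eTag": "9290629be720a884b5119e5b122c5c8d"
                    }
                }
            }
        ]
    }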
Prerequisites
Before you can use the Amazon S3 connector, you must complete the following steps:
- Configure Amazon S3 to notify SQS that new events were written to the Amazon S3 bucket, or couple the Amazon Simple Notification Service (SNS) to SQS to send the notifications. See Configure SQS and Configure SNS in the Splunk Add-on for AWS manual for more information.
- Have an AWS account to use the Amazon S3 connector. If you don't have an AWS account, ask your AWS admin to create an account and provide the Access Key ID and Secret Access Key.
See Getting Started with Amazon SQS in the Amazon Web Services (AWS) documentation for more information on setting up and configuring AWS SQS.
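As a minimal sketch, one way to couple an existing bucket to an existing queue is with the AWS CLI. The bucket name, queue ARN, and region below are placeholders, and this assumes the queue's access policy already allows Amazon S3 to send messages to it:

    aws s3api put-bucket-notification-configuration \
        --bucket my-dsp-bucket \
        --notification-configuration '{
            "QueueConfigurations": [
                {
                    "QueueArn": "arn:aws:sqs:us-west-1:123412341234:my-dsp-queue",
                    "Events": ["s3:ObjectCreated:*"]
                }
            ]
        }'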
Permissions for Amazon S3
Make sure your AWS account has at least read and write permissions for the queue and read permissions for the related Amazon S3 bucket. See the following list of permissions:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:ListQueues", "s3:GetObject" ], "Resource": "*" } ] }
Parameters used in the Amazon S3 connector
In addition to the common configuration parameters, the Amazon S3 connector uses the following parameters:
- `sqs_queue_url`: The full URL for the AWS SQS queue.
- `aws_credential`: The AWS credentials used to access Amazon S3.
  - `access_key`: The AWS access key credential information.
    - `aws_access_key_id`: Your AWS access key.
    - `aws_secret_access_key`: Your AWS secret access key.
All credentials are transmitted securely over HTTPS and saved in the Collect Service with industry-standard encryption. They can't be accessed outside of the current tenant.
- `filetype`: The file type of the files sent to this job. If you don't enter a value for this parameter, it defaults to `auto`. File types have the following valid values:
  - `auto`: The file is uncompressed if necessary (only zip and gzip are supported) and the file type is automatically detected based on the following rules:
    - If the uncompressed file is plain text, the Content-Type of the original S3 object determines the file type handling:
      - `text/csv`: A standard CSV file. The first line of the file must be the header. Each line is ingested as an event.
      - `application/json`: A standard JSON file.
      - `text/plain`: A plain text file. Each line is ingested as an event.
    - If the uncompressed file is plain text and the file type can't be determined by the Content-Type of the original S3 object, then the file extension determines the file handling:
      - .csv: A standard CSV file. The first line of the file must be the header. Each line is ingested as an event.
      - .txt: A plain text file. Each line is ingested as an event.
      - .json: A standard JSON file.
    - If the uncompressed file is plain text and the file type can't be determined by the Content-Type of the original S3 object or the file extension, then the file is parsed as a plain text file and each line is ingested as an event.
    - All other file types are ignored and the files are not ingested.
- `json`
  - `field`: This parameter defines which field in a JSON file is collected as events, and is only applicable if you have set `"filetype": "json"`. The field must be a first-level field, and the value of the field inside the target JSON files must be an array. The two parameters work together in the following ways:
    - If `"filetype": "auto"`, each JSON file is ingested as a single event.
    - If `"filetype": "json"` and `"field": ""` or `field` is not set, each JSON file is ingested as a single event.
    - If `"filetype": "json"` and `"field": "Records"`, where `Records` is an array, each element inside `Records` is ingested as one event.
    - If `"filetype": "json"` and `"field": "Event"`, where `Event` is an object, nothing is ingested because the value of `Event` is an object, not an array.
    - If `"filetype": "json"` and `"field": "Event.Body"`, nothing is ingested because the field is more than one layer.

If the JSON files are generated by AWS, set `"field": "Records"`.
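Putting these together, the connector-specific `parameters` object for a typical configuration might look like the following sketch. The queue URL is a placeholder, and the full request body is shown in the Collect API example later in this topic:

    {
        "sqs_queue_url": "https://sqs.us-west-1.amazonaws.com/123412341234/my-dsp-queue",
        "aws_credential": {
            "access_key": {
                "aws_access_key_id": "your AWS access key",
                "aws_secret_access_key": "your AWS secret access key"
            }
        },
        "filetype": "json",
        "json": {
            "field": "Records"
        }
    }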
Choose the number of workers to assign to your job
There are many factors that can influence the data ingestion rates per worker for your scheduled jobs. Some of these factors are:
- The average file size
- File compression type and compression rates
- The number of new files added to the S3 bucket between each job execution
- The file formats
- Event sizes
- Download speed and bandwidth
Generally, a small number of large files ingests faster than a large number of small files, compressed files ingest faster than uncompressed files, and CSV files ingest more slowly than plain text and JSON files.
The following tables show the average ingestion rates per worker for some common scenarios.
Filetype | Number of files | File size before compression | Compression | Average ingestion rate per worker |
---|---|---|---|---|
txt | 1,000 | 1 MiB | none | 5 MiB/s |
txt | 20 | 50 MiB | none | 30 MiB/s |
Filetype | Number of files | File size before compression | Compression | Average ingestion rate per worker |
---|---|---|---|---|
txt | 20 | 50 MiB | none | 30 MiB/s |
txt | 20 | 50 MiB | gz | 70 MiB/s |
Filetype | Number of files | File size before compression | Compression | Average ingestion rate per worker |
---|---|---|---|---|
txt | 20 | 50 MiB | gz | 70 MiB/s |
json | 20 | 50 MiB | gz | 70 MiB/s |
csv | 20 | 50 MiB | gz | 40 MiB/s |
The average ingestion rates are just examples and don't take into account external influences on your ingestion rate, such as download speeds, event sizes, and so on.
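As a rough sizing sketch using the example rates above: if your bucket receives about 500 GiB of gzip-compressed plain text files per hour, that is roughly 142 MiB of original data per second, so at an average of 70 MiB/s per worker you would schedule the job with at least 3 workers to keep up and leave some headroom. Measure your own ingestion rates and adjust the worker count accordingly.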
Limitations of the Amazon S3 connector
The Amazon S3 connector collects newly created files stored in S3 buckets in response to notifications received from SQS. The connector doesn't collect an event if it occurred before the SQS queue was set up, or if notifications about the event are not received through SQS.
If a ZIP file in the Amazon S3 bucket contains more than one file, only the first file in the archive is read.
The Amazon S3 connector uses the `mime_type` Content-Type header attribute and file extensions to determine if a file is supported. Don't customize the file extensions when uploading files to Amazon S3.
The connector doesn't support the following use case:
- The file is a `text/csv`, `text/json`, or `text/plain` formatted file, but the Content-Type header attribute is customized.
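To make sure the connector can detect the file type, upload files with a Content-Type that matches the actual format. The following sketch uses the AWS CLI; the bucket and key names are placeholders:

    aws s3 cp events.json s3://my-dsp-bucket/incoming/events.json \
        --content-type application/json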
Amazon S3 connector output
The following event attributes are added to the S3 connector events collected from Amazon S3:
- `accountID`: The account ID of the AWS account. If the account ID can't be found, the attribute value is an empty string `""`.
- `lastModified`: The last modified time of the Amazon S3 file.
- `etag`: The etag of the Amazon S3 file.
A typical Amazon S3 connector event looks like this:
{ "accountID": "123412341234", "etag": "9290629be720a884b5119e5b122c5c8d", "lastModified": 1562717968, "host": "aws-s3-connector - v1.0.0-beta1", "source": "https://s3.us-west-1.amazonaws.com/bucket/plain/long.log.gz", "index": "main", "sourcetype": "aws:s3:plaintext", "_time": 2019-07-11T01:00:33.336+00:00, "_raw": "helloworld2" }
Create, modify, and delete a scheduled job using the Collect API
You can create, modify, and delete a scheduled job in the Amazon S3 connector using the Collect API.
Create a scheduled job
The following example creates a job, schedules it to run at 45 minutes past every hour, and assigns 2 workers:
    curl -X POST "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/" \
        -H "Authorization: Bearer <accessToken>" \
        -H "Content-Type: application/json" \
        -d '{
            "name": "your connection name",
            "connectorID": "aws-s3",
            "schedule": "45 * * * *",
            "parameters": {
                "sqs_queue_url": "http(s)://sqs(-fips).<REGION>.amazonaws.com/<YOUR_ACCOUNT_NUMBER>/<YOUR_QUEUE_NAME>",
                "aws_credential": {
                    "access_key": {
                        "aws_access_key_id": "your AWS access key",
                        "aws_secret_access_key": "your AWS secret access key"
                    }
                },
                "filetype": "json",
                "json": {
                    "field": "Records"
                }
            },
            "scalePolicy": {
                "static": {
                    "workers": 2
                }
            }
        }'
A typical response when you create a scheduled job using a POST request looks like this:
{ "data": { "connectorID": "aws-s3", "createUserID": "your user ID", "createdAt": "2019-02-22T14:29:23.852Z", "id": "your job ID", "lastModifiedAt": "2019-02-22T14:29:23.852Z", "lastUpdateUserID": "last user who updated", "name": "your connection name", "schedule": "45 * * * *", "scheduled": true, "tenant": "default" "eventExtraFields": null, "parameters": { "aws_credential": {}, "sqs_queue_url": "http(s)://sqs(-fips).{REGION}.amazonaws.com/{YOUR_ACCOUNT_NUMBER}/{YOUR_QUEUE_NAME}" "filetype": "json", "json": { "field": "Records" } }, "scalePolicy": { "static": { "workers": 2 } }, } }
Verify the job
After you create the scheduled job, you can find the job `id` in the POST response. The following example performs a GET request on the job `id` to verify the job was created and scheduled correctly:
    curl -X GET "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/<jobId>" \
        -H "Authorization: Bearer <accessToken>" \
        -H "Content-Type: application/json"
A typical response for a GET request on a job `id` in a scheduled job looks like this:
{ "data": { "connectorID": "aws-s3", "createUserID": "your user ID", "createdAt": "2019-02-22T14:29:23.852Z", "id": "your job ID", "lastModifiedAt": "2019-02-22T14:29:23.852Z", "lastUpdateUserID": "last user who updated", "name": "your connection name", "schedule": "45 * * * *", "scheduled": true, "tenant": "default" "eventExtraFields": null, "parameters": { "aws_credential": {}, "sqs_queue_url": "http(s)://sqs(-fips).{REGION}.amazonaws.com/{YOUR_ACCOUNT_NUMBER}/{YOUR_QUEUE_NAME}" "filetype": "json", "json": { "field": "Records" } }, "scalePolicy": { "static": { "workers": 2 } }, } }
Modify a scheduled job
The following example uses a PATCH request to modify the scheduled job, increasing the number of workers to 4:
    curl -X PATCH "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/<jobId>" \
        -H "Authorization: Bearer <accessToken>" \
        -H "Content-Type: application/merge-patch+json" \
        -d '{
            "scalePolicy": {
                "static": {
                    "workers": 4
                }
            }
        }'
A typical response for a PATCH request on a scheduled job looks like this:
{ "data": { "id": "your job ID", "tenant": "default", "name": "your connection name", "connectorID": "aws-s3", "schedule": "45 * * * *", "parameters": {}, "scalePolicy": { "static": { "workers": 4 } }, "createdAt": "2019-02-22T14:29:23.852Z", "lastModifiedAt": "2019-02-25T19:19:09.892Z" } }
Note that the `lastModifiedAt` value is updated.
Delete a scheduled job
The following example deletes a scheduled job based on the job `id`:
Make sure that no active pipelines are using the scheduled job you want to delete. If you delete a scheduled job with an active pipeline, your pipeline stops processing data.
    curl -X DELETE "https://<DSP_HOST>:31000/default/collect/v1beta1/jobs/<jobId>" \
        -H "Authorization: Bearer <accessToken>" \
        -H "Content-Type: application/json"
When the scheduled job is successfully deleted, you receive a "204 No Content" response.
Create, modify, and delete a scheduled job using Global Connection Management
You can create, modify, and delete a scheduled job in the Amazon S3 connector with Global Connection Management. See Manage connections to external data sources.
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.0.1