All DSP releases prior to DSP 1.4.0 use Gravity, a Kubernetes orchestrator, which has been announced end-of-life. We have replaced Gravity with an alternative component in DSP 1.4.0. Therefore, we will no longer provide support for versions of DSP prior to DSP 1.4.0 after July 1, 2023. We advise all of our customers to upgrade to DSP 1.4.0 in order to continue to receive full product support from Splunk.
Formatting data from Amazon Kinesis Data Streams for indexing in the Splunk platform
When you use the Amazon Kinesis Data Stream source function to receive data, the output records use a schema that can't be indexed meaningfully in the Splunk platform. If you send Amazon Kinesis records to an index without formatting the records first, you'll notice problems such as the following:
- The records are indexed as empty events that have associated metadata but no payload.
- Some of the metadata fields contain values that pertain to your use of the DSP pipeline and the Splunk HTTP Event Collector (HEC) rather than the actual log or event from Amazon Kinesis Data Streams. For example, the timestamp in the indexed record indicates the time when the event was ingested into DSP rather than the time when the event was actually generated.
- In some cases, the value field in the record contains data values that are not human-readable, such as Gzip-compressed data.
Before sending Amazon Kinesis data from a DSP data pipeline to a Splunk index, make sure to format your records to meet the following criteria:
- The payload of the record is stored in a top-level field named body.
- Any important pieces of metadata, such as the timestamp or source type associated with the record, are stored in the following top-level fields: timestamp, source_type, host, and source.
- The data values in the records are human-readable.
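The three criteria above can be expressed as a small check. The following Python sketch is only an illustration that models a record as a dictionary; the helper name formatting_problems and the sample records are hypothetical and are not part of DSP.

```python
# Hypothetical helper for illustration only; it is not part of DSP.
REQUIRED_METADATA = ("timestamp", "source_type", "host", "source")


def formatting_problems(record):
    """Return a list of problems; an empty list means the record meets the criteria."""
    problems = []
    body = record.get("body")
    if body is None:
        problems.append("missing top-level field: body")
    elif isinstance(body, (bytes, bytearray)):
        # Raw bytes, such as Gzip-compressed data, are not human-readable.
        problems.append("body is raw bytes, not human-readable text")
    for name in REQUIRED_METADATA:
        if record.get(name) is None:
            problems.append("missing top-level field: " + name)
    return problems


# An unformatted record: the payload sits in "value" and there is no "body".
raw_record = {"key": "k1", "value": b"\x1f\x8b\x08", "region": "us-west-2"}

# A formatted record that satisfies all three criteria.
formatted_record = {
    "body": "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 "
            "59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
    "timestamp": 1592931596000,
    "source_type": "aws:cloudwatchlogs:vpcflow",
    "host": "eni-0a8d610ca281edcb7-all",
    "source": "984646522837_us-west-2_IFacetoLogGroup",
}

print(formatting_problems(raw_record))
print(formatting_problems(formatted_record))
```

A check like this is only a mental model of the target record shape; the actual formatting is done by the pipeline functions described on this page.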
For an example of how to build a custom pipeline that completes the necessary data processing, see the "Example: Send data from Amazon Kinesis Data Streams to the Splunk platform using DSP" section on this page.
Problems with indexing unprocessed data from Amazon Kinesis Data Streams
The Amazon Kinesis Data Stream source function outputs records that use the following schema:
{ "key": string, "value": bytes, "stream": string, "shard": string, "sequence": string, "approxArrivalTimestamp": long, "accountId": string, "region": string }
Amazon Kinesis records don't contain the body field, since their payloads are stored in the value field instead. As a result, the Splunk platform indexes these records as empty events.
Additionally, Amazon Kinesis records don't include the timestamp, host, source, and source_type fields, which are part of the standard DSP schemas. When you use the Send to Splunk HTTP Event Collector sink function to send data from a DSP pipeline to a Splunk index, the sink function maps these standard fields to analogous fields in the resulting indexed records. Since the Amazon Kinesis records don't contain values for these fields, these fields become populated with default metadata. For example, if you send an Amazon Kinesis record to a Splunk index without formatting the record first, the indexed record displays the following metadata:
- The timestamp in the indexed record indicates the time when the event was ingested into DSP rather than the time when the event was actually generated.
- The host field contains the Splunk HEC endpoint.
- The source field contains the default source specified in your Splunk HEC token. If your Splunk HEC token doesn't specify a default source, then the source field contains the value http:<pipeline-name>, where <pipeline-name> is the name of your DSP pipeline.
- The sourcetype field contains the default sourcetype specified in your Splunk HEC token. If your Splunk HEC token doesn't specify a default sourcetype, then the sourcetype field contains the value httpevent.
For more information about how the Send to Splunk HTTP Event Collector sink function maps records to the Splunk HEC event schema and determines default metadata values, see Format event data in DSP for Splunk indexes.
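The fallback rules described above can be modeled in a few lines. The following Python sketch is an assumption-laden illustration of those rules only, not the sink function's implementation; the function names, the parameters (hec_endpoint, pipeline_name, ingest_time_ms, token_defaults), and the example endpoint are all hypothetical.

```python
def first_set(*candidates):
    """Return the first candidate that is not None, or None if all are unset."""
    for candidate in candidates:
        if candidate is not None:
            return candidate
    return None


def indexed_metadata(record, hec_endpoint, pipeline_name, ingest_time_ms,
                     token_defaults=None):
    """Model which metadata values end up on the indexed event (hypothetical helper)."""
    token_defaults = token_defaults or {}
    return {
        # Without a timestamp field, the ingestion time is used instead.
        "timestamp": first_set(record.get("timestamp"), ingest_time_ms),
        # Without a host field, the HEC endpoint is used.
        "host": first_set(record.get("host"), hec_endpoint),
        # Without a source field: token default, else http:<pipeline-name>.
        "source": first_set(record.get("source"),
                            token_defaults.get("source"),
                            "http:" + pipeline_name),
        # Without a source_type field: token default, else httpevent.
        "sourcetype": first_set(record.get("source_type"),
                                token_defaults.get("sourcetype"),
                                "httpevent"),
    }


# An unformatted Kinesis record has none of the standard metadata fields.
unformatted = {"key": "k1", "value": b"\x1f\x8b\x08", "region": "us-west-2"}
defaults = indexed_metadata(
    unformatted,
    hec_endpoint="hec.example.com:8088",   # hypothetical endpoint
    pipeline_name="my-pipeline",           # hypothetical pipeline name
    ingest_time_ms=1592931641083,
)
print(defaults)
```

In this model, every value on the indexed event describes the pipeline or the HEC token rather than the original log, which is why the records need formatting first.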
Depending on the specific type of data that you're ingesting from Amazon Kinesis Data Streams, the payload might be formatted in a way that is not human-readable. For instance, Amazon Virtual Private Cloud (VPC) flow logs are Gzip-compressed. In such cases, you'll need to convert the payload into a human-readable format such as plain text strings so that you can actually understand and work with the data.
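To make the conversion concrete, the following Python sketch decompresses a Gzip payload and parses it as JSON. It is a rough stand-in, outside DSP, for what decompressing and deserializing the value field achieves; the helper name decode_payload and the sample payload are hypothetical.

```python
import gzip
import json


def decode_payload(value):
    """Gunzip the raw bytes, then parse the resulting text as a JSON object."""
    return json.loads(gzip.decompress(value).decode("utf-8"))


# Build a small Gzip-compressed payload to stand in for the Kinesis value field.
sample = {
    "logGroup": "IFacetoLogGroup",
    "logEvents": [
        {
            "message": "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 "
                       "172.31.52.149 59720 9997 6 83 111826 1592931596 "
                       "1592931598 ACCEPT OK",
            "timestamp": 1592931596000,
        }
    ],
}
compressed = gzip.compress(json.dumps(sample).encode("utf-8"))

# The compressed bytes are opaque; the decoded map is readable and queryable.
decoded = decode_payload(compressed)
print(decoded["logGroup"])
```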
The following example describes how to build a custom pipeline that formats records from Amazon Kinesis Data Streams so that the data is human-readable and mapped to the appropriate top-level fields when you send the data to the Splunk platform for indexing.
Example: Send data from Amazon Kinesis Data Streams to the Splunk platform using DSP
You can use DSP to process Amazon Virtual Private Cloud (VPC) flow logs from Amazon Kinesis Data Streams and then send the data to a Splunk index. To ensure that the logs can be indexed meaningfully, format the log data so that relevant information is stored in the following fields:
- body
- source_type
- source
- host
- timestamp
This example shows you how to do the following tasks:
- Deserialize the value field in the Amazon Kinesis records so that it is converted from bytes to a map of key-value pairs. This conversion is a prerequisite for any data processing that you want to apply to the value field; converting the field from bytes to a more commonly supported data type makes it compatible with a wider range of streaming functions.
- Extract relevant data from various fields into top-level host, source, source_type, and timestamp fields. This task also includes casting the data to the appropriate data types as necessary.
- Expand a multivalue field into separate events so that you can process your data with greater precision.
- Reduce noise in your data by filtering out irrelevant events and dropping any fields that you don't need to retain.
- Send your transformed data to a Splunk index.
This example works with VPC flow logs specifically, but other data handled by Amazon Kinesis Data Streams such as Amazon CloudWatch logs can be processed using a similar approach.
The following instructions explain how to build the example pipeline from scratch. If you want to view the example pipeline in DSP without going through the steps to build it, you can import a copy of the pipeline using an SPL2 statement or a JSON file.
To import the pipeline using an SPL2 statement, do the following:
- In the Canvas View in DSP, click the SPL View toggle to switch to the SPL View.
- Copy the following SPL2 statement and paste it into the SPL2 Pipeline Builder.
| from kinesis("YourKinesisConnectionID", "NameOfYourKinesisStream") | eval value=deserialize_json_object(gunzip(value)) | where map_get(value, "logGroup") IN ("NameOfYourVPCLogGroup1", "NameOfYourVPCLogGroup2") | eval owner=ucast(map_get(value, "owner"), "string", null), logGroup=ucast(map_get(value, "logGroup"), "string", null), source=concat(owner, "_", region, "_", logGroup), host=ucast(map_get(value, "logStream"), "string", null), messageType=ucast(map_get(value, "messageType"), "string", null) | eval source_type="NameOfYourSourcetype" | eval logEvents=ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null) | fields - value, region, owner, logGroup | mvexpand limit=0 logEvents | eval body=ucast(map_get(logEvents, "message"), "string", null), timestamp=ucast(map_get(logEvents, "timestamp"), "long", null) | where match_regex(body, /NODATA/)=false | into splunk_enterprise_indexes("YourHECConnectionID", "SelectedIndex", "SelectedDefaultIndex");
- Click Recreate Pipeline.
To import the pipeline using a JSON file, do the following:
- Download a copy of the pipeline by clicking this link: DSP_AmazonKinesisPipeline.zip
- Extract the JSON file from the downloaded ZIP archive to any location of your choice.
- In the Canvas View in DSP, click the pipeline options button and then select Import pipeline.
- Select the DSP_AmazonKinesisPipeline.json file and then click Import.
Prerequisites
To process VPC flow logs from Amazon Kinesis Data Streams and send them to a Splunk index, you need to have the following:
- A connection that uses the Connector for Amazon Kinesis Data Streams Source. See Create a DSP connection to Amazon Kinesis Data Streams.
- A Splunk platform connection. See Create a DSP connection to a Splunk index.
Best practices
Follow these guidelines to get a better experience building your VPC flow logs pipeline in DSP:
- Rename your functions to keep track of their roles, as you might be using multiple instances of the same function in your pipeline.
- Use the Fields function in your pipeline to drop any fields that you don't need in your data, so that these fields aren't indexed into the Splunk platform. Dropping unnecessary fields also reduces memory usage during preview and processing.
- Extract metadata from various fields and consolidate it into a single field, so that you can drop the original fields that the metadata was extracted from. Consolidating your metadata allows you to retain the metadata while trimming down on the number of fields in your records.
- When previewing the data, switch to the List view to see a clearer layout of the JSON format:
- In the Preview Results tab, on the navigation bar, click Table.
- Select List from the drop-down menu.
Step 1: Receive VPC flow logs from Amazon Kinesis Data Streams and format the payload
When you ingest VPC flow logs through the Amazon Kinesis Data Stream source function, the payloads of these logs are stored as Gzip-compressed values in a bytes field named value. You'll need to decompress the data so that the values become human-readable, and deserialize the value field from bytes to a map of key-value pairs so that the field can be used as input in a wider range of streaming functions.
Because Amazon Kinesis Data Streams is a transport mechanism, the incoming stream might include other types of data in addition to VPC flow logs. Filter out any records that don't contain VPC flow logs.
- In DSP, on the Pipelines page, click Create Pipeline.
- Select the Amazon Kinesis Data Stream data source.
- Configure the Amazon Kinesis Data Stream source function to start receiving data.
- On the View Configurations tab, set the Connection id field to the ID of your Amazon Kinesis Data Streams connection.
- In the Stream name field, enter the name of the Amazon Kinesis data stream that you want to read data from.
- (Optional) Click the Start Preview icon and check the Preview Results pane to confirm that your data is arriving in DSP as expected.
The following example shows what one of your records might look like in the List view:
{ key: "15bf2d724aef70bd8f7e1b0a1037c7ad", value: "H4sIAAAAAAAAAL2X207bQBCGX2Xla7B2Dju7w10EAfWkooa7ClUudZGlkKDEtKoQ797xpgGakqpSs7mJ4l1n7S9z+P+5r27a5bK5bi9+3LbVUXUyuhh9ejeeTEZn4+qgmn+ftQtb1sTCEhATRVuezq/PFvO7W9t5ddpctf38ZHL+dr2Y9yf9om1u7IZ21h36huVKI5O0MdFX7w+b6dTuW959Xl4tutu+m89Ou2nfLpbV0cfqpF323awZVqvLfNr4Wzvrh737qvtih1IISEGIMQAzUULxXkWAMGD0RCyijFFIVAnRLgIRoD2z74y4b27s5SEoKoFE+7E/WP8Tdjy658DuRQQHEWuCWnwNMa6vAtVBHSuxd8kndeIAHbOCe3ra41dFNzo+Hp9fuPdvqoeD/4OjAnABa2DdQM1cwUdORofswCNxcTwugZej9QKdBRAzHbrEmorDhXKJSbFmrZHsymJlQEFVcl6SQx0ytzCblC+6VZZaSgo/Vh05DozF6eK+q84QDQWcRCyfl2nPRWddMzdMibF8R9F9hw5kaCkWNQDU0njoS+BJDR428FTV2oxIyJkpkKxnQigueAh7bCz0JOf2QeCL0+3Bq6yCuYpcDqJYz/QWSzRBDxCKM5awLNsiaMX3K4JWewmhuDRgCceyjc682JrOpIF9+fwsYVlekoYAkR9DB8lMi1nd8t1zD64lw660ITMOymdZae9Rvnfu2LYYW6yjr/VP9wmmB0F9FnbH2/Lyw/j1+HgnZEUsy99UnQY09HYcFTfTWMKzbJlgMT2XPEzFa4526liCUYEpnM0+uEHKHIcS42FqNW631YztLCtpp17l95Fuo13m6c6Q8wgEKf5DL7l8+AlBltTKBxIAAA==", stream: "VPCtoKinesisStream", shard: "shardId-000000000002", sequence: "49606703114497523631175608026491897379670462900935327778", approxArrivalTimestamp: 1592931758265, accountId: "984646522837", region: "us-west-2" }
- Decompress and deserialize the contents of the value field.
- On the pipeline canvas, click the Connect a processing or a sink function icon next to the Amazon Kinesis Data Stream function, then select Eval from the function picker.
- On the View Configurations tab, in the Function field, enter the following expression:
value = deserialize_json_object(gunzip(value))
- (Optional) Click the Start Preview icon, then click the Eval function to confirm that data in the value field has been reformatted into JSON format.
The following example shows what one of your formatted records might look like in the List view:
{ value: { owner: "984646522837", subscriptionFilters: [ "Destination" ], logEvents: [ { id: "35523561641084722377646761787974676562750764971027464192", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464193", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464194", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464195", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 } ], messageType: "DATA_MESSAGE", logGroup: "IFacetoLogGroup", logStream: "eni-0a8d610ca281edcb7-all" }, key: "c60e2717694a08f5d2e5e6889196506a", stream: "VPCtoKinesisStream", shard: "shardId-000000000003", sequence: "49607591761037110041819595034591373398647257439043321906", approxArrivalTimestamp: 1592931641083, accountId: "984646522837", region: "us-west-2" }
- Filter the incoming data so that only records that contain VPC flow logs are included.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the Eval function, then select Where from the function picker.
- On the View Configurations tab, enter the following expression in the Predicate field, where <NameOfYourVPCLogGroup1> and <NameOfYourVPCLogGroup2> are the logGroup names used in your VPC flow logs. You can specify a comma-separated list of one or more VPC logGroup names.
map_get(value, "logGroup") IN ("<NameOfYourVPCLogGroup1>", "<NameOfYourVPCLogGroup2>")
This expression keeps only the records where the value of the logGroup field, which is nested in the value map, matches one of your specified VPC logGroup names.
- (Optional) Click the Start Preview icon, then click the Where function to confirm that the data has been filtered properly.
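The effect of the Where predicate can be pictured with a short Python sketch that keeps only the records whose nested logGroup value is in an allowed set. This models the filtering logic outside DSP; the helper name keep_vpc_flow_logs and the sample records are hypothetical.

```python
def keep_vpc_flow_logs(records, allowed_log_groups):
    """Keep records whose value map has a logGroup in the allowed set."""
    allowed = set(allowed_log_groups)
    kept = []
    for record in records:
        value = record.get("value")
        # Records without a deserialized value map cannot match any log group.
        log_group = value.get("logGroup") if isinstance(value, dict) else None
        if log_group in allowed:
            kept.append(record)
    return kept


records = [
    {"key": "a", "value": {"logGroup": "IFacetoLogGroup"}},
    {"key": "b", "value": {"logGroup": "SomeOtherLogGroup"}},
    {"key": "c", "value": None},
]
filtered = keep_vpc_flow_logs(records, ["IFacetoLogGroup"])
print([record["key"] for record in filtered])
```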
The next few sections demonstrate how you can extract log data into top-level fields, drop unnecessary fields, and expand grouped events into separate records. Transforming your records this way ensures that they can be properly indexed in the Splunk platform.
Step 2: Extract log data into fields that can be mapped to the Splunk HEC event schema
To ensure that the relevant data can be indexed properly, extract the data from the value field into top-level fields that can be mapped to the Splunk HEC event schema. In particular, you must extract the logEvents array from the value field to a top-level field.
The logEvents array in each record contains multiple events. During a later step in this example, you'll use the MV Expand function to expand these nested events into individual records. However, before you can do that, you must extract the logEvents array to a top-level field and cast it to a list of records so that the MV Expand function can accept logEvents as input.
- Extract data from the nested fields in the value map into top-level fields named owner, logGroup, source, host, and messageType.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the Where function, then select Eval from the function picker.
- On the View Configurations tab, enter the following expression in the Function field of the newly added Eval function:
owner = ucast(map_get(value, "owner"), "string", null), logGroup = ucast(map_get(value, "logGroup"), "string", null)
These expressions extract the values of the owner and logGroup keys that are nested in the value field and cast them to string type. This transformation is necessary because the next step involves using the concat() function, which only accepts string values, to consolidate several values including the owner and logGroup values into a single top-level field.
- Click + Add. In the newly added Function field, enter the following expression:
source = concat(owner, "_", region, "_", logGroup), host = ucast(map_get(value, "logStream"), "string", null), messageType = ucast(map_get(value, "messageType"), "string", null)
These expressions extract metadata fields that are nested within the value field and assign them to newly created top-level fields named host and messageType. The source field is populated with a combination of data from the top-level fields named owner, region, and logGroup. Consolidating data from multiple fields into a single field lets you drop the original fields from your records to reduce memory usage while still preserving the data from those fields.
- (Optional) Click the Start Preview icon, then click the newly added Eval function to confirm that new top-level fields have been created.
- Create a top-level field named source_type and assign a meaningful value to it.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the last Eval function, then select Eval from the function picker.
- On the View Configurations tab, enter the following expression in the Function field, where <NameOfYourSourcetype> is the value that you want to assign to the source_type field:
source_type = "<NameOfYourSourcetype>"
If you want to ingest your data into the Amazon Web Services (AWS) app in the Splunk platform, change the value of the source_type field to match the target AWS app. For example:
source_type = "aws:cloudwatchlogs:vpcflow"
- (Optional) Click the Start Preview icon, then click the newly added Eval function to confirm that the value of source_type has been changed.
- Extract the logEvents array to a top-level field and cast it to a list of records.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the last Eval function, then select Eval from the function picker.
- On the View Configurations tab, enter the following expression in the Function field for the newly added Eval function:
logEvents = ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
- (Optional) Click the Start Preview icon, then click the newly added Eval function to confirm that your records now include logEvents as a top-level field.
The following example shows what one of your records might look like in the List view:
{ owner: "984646522837", logGroup: "IFacetoLogGroup", source: "984646522837_us-west-2_IFacetoLogGroup", host: "eni-0a8d610ca281edcb7-all", messageType: "DATA_MESSAGE", source_type: "aws:cloudwatchlogs:vpcflow", logEvents: [ { id: "35523561641084722377646761787974676562750764971027464192", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464193", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464194", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464195", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 } ], value: { owner: "984646522837", subscriptionFilters: [ "Destination" ], logEvents: [ { id: "35523561641084722377646761787974676562750764971027464192", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464193", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464194", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, { id: "35523561641084722377646761787974676562750764971027464195", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 } ], messageType: "DATA_MESSAGE", logGroup: "IFacetoLogGroup", logStream: "eni-0a8d610ca281edcb7-all" }, key: "c60e2717694a08f5d2e5e6889196506a", stream: "VPCtoKinesisStream", shard: "shardId-000000000003", sequence: "49607591761037110041819595034591373398647257439043321906", approxArrivalTimestamp: 1592931641083, accountId: "984646522837", region: "us-west-2" }
In addition to the fields that are part of the Amazon Kinesis Data Stream data schema by default, your records now contain the following top-level fields: owner, logGroup, source, host, messageType, source_type, and logEvents. For details about the Amazon Kinesis Data Stream data schema, see the Function output schema of the Amazon Kinesis Data Stream source function in the Function Reference.
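The field extraction in this step can be approximated in Python as follows. This is a sketch of the same data reshaping outside DSP, not the behavior of the Eval function itself: the helpers as_string and as_list_of_maps only loosely imitate a cast that falls back to null, and leaving source empty when any part is missing is an assumption of this sketch. All function names are hypothetical.

```python
def as_string(candidate):
    """Return the value if it is a string, otherwise None (loosely like a cast with a null default)."""
    return candidate if isinstance(candidate, str) else None


def as_list_of_maps(candidate):
    """Return the value if it is a list of dictionaries, otherwise None."""
    if isinstance(candidate, list) and all(isinstance(item, dict) for item in candidate):
        return candidate
    return None


def extract_top_level_fields(record, source_type):
    """Copy nested metadata from the value map into top-level fields."""
    value = record.get("value")
    if not isinstance(value, dict):
        value = {}
    out = dict(record)
    out["owner"] = as_string(value.get("owner"))
    out["logGroup"] = as_string(value.get("logGroup"))
    parts = [out["owner"], record.get("region"), out["logGroup"]]
    # Assumption of this sketch: source stays empty if any part is missing.
    out["source"] = "_".join(parts) if all(isinstance(p, str) for p in parts) else None
    out["host"] = as_string(value.get("logStream"))
    out["messageType"] = as_string(value.get("messageType"))
    out["source_type"] = source_type
    out["logEvents"] = as_list_of_maps(value.get("logEvents"))
    return out


record = {
    "region": "us-west-2",
    "value": {
        "owner": "984646522837",
        "logGroup": "IFacetoLogGroup",
        "logStream": "eni-0a8d610ca281edcb7-all",
        "messageType": "DATA_MESSAGE",
        "logEvents": [{"id": "1", "message": "example message", "timestamp": 1592931596000}],
    },
}
extracted = extract_top_level_fields(record, "aws:cloudwatchlogs:vpcflow")
print(extracted["source"], extracted["host"])
```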
Step 3: (Optional) Drop unnecessary fields from your records
To reduce memory usage during processing and data preview, exclude unnecessary fields from your records. Now that you've extracted the relevant information from the value, region, owner, and logGroup fields, drop them from your records.
Although this is an optional step, it is strongly recommended for more efficient preview and processing of your records.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the last Eval function, then select Fields from the function picker.
- On the View Configurations tab, in the Field list field, enter value.
- Click + Add and then enter region in the new field that appears.
- Repeat the previous step as needed to add two more fields, and enter owner and logGroup in these fields.
- In the Operator field, enter a minus sign ( - ).
- (Optional) Click the Start Preview icon, then click the newly added Fields function to confirm that the value, region, owner, and logGroup fields are dropped from your records.
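As a rough picture of what dropping fields does to each record, the following Python sketch removes a set of named keys from a dictionary. The helper name drop_fields and the sample record are hypothetical; the Fields function itself is configured as described in the steps above.

```python
def drop_fields(record, names):
    """Return a copy of the record without the named top-level fields."""
    to_drop = set(names)
    return {key: item for key, item in record.items() if key not in to_drop}


record = {
    "source": "984646522837_us-west-2_IFacetoLogGroup",
    "host": "eni-0a8d610ca281edcb7-all",
    "owner": "984646522837",
    "logGroup": "IFacetoLogGroup",
    "region": "us-west-2",
    "value": {"owner": "984646522837"},
}
trimmed = drop_fields(record, ["value", "region", "owner", "logGroup"])
print(sorted(trimmed))
```

The owner, region, and logGroup data is still available after the drop because it was consolidated into the source field earlier.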
Next, expand the grouped events in the logEvents field into separate records, and finish extracting pertinent data into top-level fields.
Step 4: Expand and process data from the logEvents field
Currently, the logEvents field of each record contains multiple events. Expand these nested events into individual records so that you can process them on an individual basis. Then, extract the pertinent data from the logEvents field into top-level fields that can be mapped to standard fields in the Splunk HEC event schema. Extracting your data this way ensures that the relevant data is indexed properly when you send it to a Splunk index.
Additionally, filter out any VPC flow logs that don't contain meaningful message text. For example, some logs contain "NODATA" as the message.
- Expand each record into multiple records based on the list of values in the logEvents field.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the Fields function, then select MV Expand from the function picker.
- On the View Configurations tab, in the Field field, enter logEvents. This setting configures the MV Expand function to create a separate record for each value in the logEvents field and carry all other fields into each newly created record.
- (Optional) Click the Start Preview icon, then click the MV Expand function to confirm that the logEvents field no longer contains multiple events.
The following example shows what one of your expanded records might look like in the List view:
{ source: "984646522837_us-west-2_IFacetoLogGroup", host: "eni-0a8d610ca281edcb7-all", messageType: "DATA_MESSAGE", source_type: "aws:cloudwatchlogs:vpcflow", logEvents: { id: "35523561641084722377646761787974676562750764971027464192", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, key: "c60e2717694a08f5d2e5e6889196506a", stream: "VPCtoKinesisStream", shard: "shardId-000000000003", sequence: "49607591761037110041819595034591373398647257439043321906", approxArrivalTimestamp: 1592931641083, accountId: "984646522837" }
- Extract the message text and timestamp from the contents of the logEvents field.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the MV Expand function, then select Eval from the function picker.
- On the View Configurations tab, enter the following expressions in the Function field of the newly added Eval function:
body = ucast(map_get(logEvents, "message"), "string", null), timestamp = ucast(map_get(logEvents, "timestamp"), "long", null)
The first expression extracts the value of the message key from the logEvents field, casts it to the string data type, and assigns it to the newly created body field. This transformation enables the value of message from the VPC flow log to become the value of the body field in the Splunk HEC event.
The second expression assigns the actual value of the timestamp key to the top-level field named timestamp. This data extraction ensures that the timestamp field contains the time when the event occurred rather than the time when the event was ingested by DSP.
- (Optional) Click the Start Preview icon, then click the newly added Eval function to confirm that the body and timestamp fields are now included in the records.
- To avoid processing messages that don't contain meaningful information, filter out any records where the value of the body field contains NODATA.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the last Eval function, then select Where from the function picker.
- On the View Configurations tab, enter the following expression in the Predicate field for the newly added Where function:
match_regex(body, /NODATA/) = false
- (Optional) Click the Start Preview icon, then click the newly added Where function to confirm that NODATA messages have been filtered out.
The following example shows what one of your finalized records might look like in the List view:
{ body: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000, source: "984646522837_us-west-2_IFacetoLogGroup", host: "eni-0a8d610ca281edcb7-all", messageType: "DATA_MESSAGE", source_type: "aws:cloudwatchlogs:vpcflow", logEvents: { id: "35523561641084722377646761787974676562750764971027464192", message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK", timestamp: 1592931596000 }, key: "c60e2717694a08f5d2e5e6889196506a", stream: "VPCtoKinesisStream", shard: "shardId-000000000003", sequence: "49607591761037110041819595034591373398647257439043321906", approxArrivalTimestamp: 1592931641083, accountId: "984646522837" }
You now have properly formatted and optimized data that is ready to be sent to a Splunk index.
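The three operations in this step (expand, extract, filter) can be sketched end to end in Python. This models the data flow only and is not how DSP implements MV Expand, Eval, or Where; the function names are hypothetical, the sample NODATA message is made up for illustration, and dropping records that have no string body is an assumption of this sketch.

```python
import re

NODATA_PATTERN = re.compile(r"NODATA")


def expand_log_events(record):
    """Create one record per entry in logEvents, carrying over all other fields."""
    events = record.get("logEvents") or []
    return [{**record, "logEvents": event} for event in events]


def extract_body_and_timestamp(record):
    """Promote the message and timestamp of the single nested event to top-level fields."""
    event = record["logEvents"]
    message = event.get("message")
    timestamp = event.get("timestamp")
    return {
        **record,
        "body": message if isinstance(message, str) else None,
        "timestamp": timestamp if isinstance(timestamp, int) else None,
    }


def has_meaningful_body(record):
    """Keep records whose body is text that does not contain NODATA."""
    body = record.get("body")
    # Assumption of this sketch: records without a string body are dropped too.
    return isinstance(body, str) and NODATA_PATTERN.search(body) is None


def process(record):
    expanded = expand_log_events(record)
    extracted = [extract_body_and_timestamp(item) for item in expanded]
    return [item for item in extracted if has_meaningful_body(item)]


grouped = {
    "host": "eni-0a8d610ca281edcb7-all",
    "logEvents": [
        {"message": "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 "
                    "59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
         "timestamp": 1592931596000},
        # Made-up example of a log entry without traffic data.
        {"message": "2 984646522837 eni-0a8d610ca281edcb7 - - - - - - - "
                    "1592931596 1592931598 - NODATA",
         "timestamp": 1592931596000},
    ],
}
final_records = process(grouped)
print(len(final_records), final_records[0]["timestamp"])
```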
Step 5: Send your transformed data to a Splunk index
Configure the Send to Splunk HTTP Event Collector sink function to send the transformed data from your DSP pipeline to a Splunk index.
- Add the Send to Splunk HTTP Event Collector sink function to the end of your pipeline and configure it.
- On the pipeline canvas, click the Connect a processing or a sink function icon after the last Where function, then select Send to Splunk HTTP Event Collector from the function picker.
- On the View Configurations tab, set the Connection id field to the ID of your Splunk platform connection.
- In the Index field, enter the name of the Splunk index that you want to send your data to. Make sure to enclose the index name in double quotation marks ( " ).
- In the Default index field, enter the name of a Splunk index that you want to send your data to if the target index specified during the previous step cannot be used for any reason.
As an alternative to specifying literal index names in the Index and Default index fields, you can specify expressions that dynamically resolve to the index names. For more information, see Send data to Splunk HTTP Event Collector in the Function Reference.
- To save your pipeline, click Save.
- To start sending your data, click Activate. If you're activating your pipeline for the first time, don't select Skip Restore State or Allow Non-Restored State. See About DSP checkpoints and savepoints in the Use the Data Stream Processor manual for more details.
Depending on the size and complexity of the pipeline, it might take some time for the pipeline to finish activating. Before continuing to the next step, wait for the status beside the pipeline name to update to "Activated". Additionally, make sure that all the functions in the pipeline display metrics indicating that data is flowing through the pipeline.
- To confirm that DSP is sending your transformed data to your Splunk index, open the Search & Reporting app in Splunk Web and search for your data. Use the following search criteria, where <SpecifiedIndex> is the index name that you specified in the Send to Splunk HTTP Event Collector sink function and <SpecifiedSourcetype> is the value that you assigned to the source_type field in your records:
index=<SpecifiedIndex> sourcetype=<SpecifiedSourcetype>
You've successfully transformed your VPC flow logs and sent them to a Splunk index through DSP.
See also
- Create a DSP connection to Amazon Kinesis Data Streams
- Connecting Amazon S3 to your DSP pipeline as a data source
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6