Deserialize and preview data from Amazon Kinesis
If you're creating a pipeline to ingest data from Kinesis using the Read from Amazon Kinesis source function, read the following to deserialize and preview your data:
- Kinesis sends your data in Base64 encoding. The read_kinesis source function decodes the Base64 encoding, so you don't need to use the base64_decode function in your pipeline.
- The data you send appears in the value field of your record, not the body field.
- In the Preview Results tab, your data appears to be Base64 encoded.
- Depending on your data source, you might need to deserialize your data from an encoding scheme other than Base64 to preview it in the Preview Results tab. You must specify how you want to deserialize your data in your pipeline definition. For example, if you send data in JSON format, use the deserialize_json_object function.
- If you send text and want to view it in the Preview Results tab, add an Eval function to your pipeline after your source function. Enter the following SPL2 expression in the configuration panel of your Eval function:
body=deserialize_json_object(value)
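For example, a minimal sketch of a pipeline that reads from Kinesis and deserializes JSON-formatted data so that it's readable in the Preview Results tab might look like the following in the SPL2 Builder. The connection ID and stream name shown here are placeholders that you replace with your own values:
| from read_kinesis("YourKinesisConnectionID", "NameOfYourKinesisStream")
| eval body=deserialize_json_object(value)
This fragment only covers reading and deserializing the data for preview; you still need to add a sink function before you can activate the pipeline.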
Example: Send data from Amazon Kinesis Data Streams to Splunk Enterprise using DSP
To properly send data from Amazon Kinesis Data Streams to Splunk Enterprise, you need to perform some transformations on your data using DSP. This example shows you how to do the following tasks:
- Deserialize Amazon Kinesis records in Base64 encoding to JSON format.
- Create top-level fields for your records to make them compatible with Splunk Enterprise HTTP Event Collector (HEC).
- Cast data to appropriate types and extract nested data where necessary.
- Send your transformed data to your Splunk Enterprise instance.
This example use case works with VPC (Virtual Private Cloud) flow logs specifically, but other Amazon Kinesis Data Streams such as CloudWatch logs can be processed using the same approach.
Prerequisites
To process VPC flow logs and send them to Splunk Enterprise, you need to have the following:
- A connection for the DSP Amazon Kinesis Connector with static credentials. See Create a connection for the DSP Amazon Kinesis Connector with static credentials.
- Access to a Splunk Enterprise instance that has the HEC enabled. You also need to know the HEC endpoint URL and HEC token associated with the instance. See Set up and use HTTP Event Collector in Splunk Web.
Best practices
Follow these guidelines to get a better experience building your VPC flow logs pipeline in DSP:
- Rename your functions to keep track of their roles as you might be using multiple instances of the same function in your pipeline. If you choose to rename your functions, stay in the Canvas Builder and avoid switching back and forth between the Canvas Builder and the SPL2 (Search Processing Language 2) Builder since the SPL2 Builder is a beta feature that might revert function name changes.
- Include Fields functions in your pipeline to trim fields you don't need in your data so that they aren't indexed into Splunk Enterprise. Dropping unnecessary fields also reduces memory usage during preview and processing.
- Extract metadata about your stream from your records and assign it to a single field to carry the metadata forward without having to keep too many separate fields. See the sketch after this list for an example of both practices.
- When previewing the data, switch to the List view for better layout of the JSON format:
- In the Preview Results tab, on the navigation bar, click Table.
- Select List from the drop-down menu.
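As a rough sketch of these two practices combined, using field names from the VPC flow logs example later in this topic, an Eval function can consolidate metadata into a single source field and a Fields function can then drop the fields you no longer need:
| eval source = concat(ucast(map_get(value, "owner"), "string", null), "_", region)
| fields - value region
Adapt the field names to your own records; the point is to keep the metadata you care about in one field and drop the rest before the data is indexed.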
Steps
- In DSP, click Build Pipeline, then select Read from Amazon Kinesis Stream as your source function. Configure the function with your Amazon Kinesis connection. See Amazon Kinesis Connector and Read from Amazon Kinesis Stream.
Click Start Preview. Your data looks something like this in the List view:
{
  key: "15bf2d724aef70bd8f7e1b0a1037c7ad",
  value: "H4sIAAAAAAAAAL2X207bQBCGX2Xla7B2Dju7w10EAfWkooa7ClUudZGlkKDEtKoQ797xpgGakqpSs7mJ4l1n7S9z+P+5r27a5bK5bi9+3LbVUXUyuhh9ejeeTEZn4+qgmn+ftQtb1sTCEhATRVuezq/PFvO7W9t5ddpctf38ZHL+dr2Y9yf9om1u7IZ21h36huVKI5O0MdFX7w+b6dTuW959Xl4tutu+m89Ou2nfLpbV0cfqpF323awZVqvLfNr4Wzvrh737qvtih1IISEGIMQAzUULxXkWAMGD0RCyijFFIVAnRLgIRoD2z74y4b27s5SEoKoFE+7E/WP8Tdjy658DuRQQHEWuCWnwNMa6vAtVBHSuxd8kndeIAHbOCe3ra41dFNzo+Hp9fuPdvqoeD/4OjAnABa2DdQM1cwUdORofswCNxcTwugZej9QKdBRAzHbrEmorDhXKJSbFmrZHsymJlQEFVcl6SQx0ytzCblC+6VZZaSgo/Vh05DozF6eK+q84QDQWcRCyfl2nPRWddMzdMibF8R9F9hw5kaCkWNQDU0njoS+BJDR428FTV2oxIyJkpkKxnQigueAh7bCz0JOf2QeCL0+3Bq6yCuYpcDqJYz/QWSzRBDxCKM5awLNsiaMX3K4JWewmhuDRgCceyjc682JrOpIF9+fwsYVlekoYAkR9DB8lMi1nd8t1zD64lw660ITMOymdZae9Rvnfu2LYYW6yjr/VP9wmmB0F9FnbH2/Lyw/j1+HgnZEUsy99UnQY09HYcFTfTWMKzbJlgMT2XPEzFa4526liCUYEpnM0+uEHKHIcS42FqNW631YztLCtpp17l95Fuo13m6c6Q8wgEKf5DL7l8+AlBltTKBxIAAA==",
  stream: "VPCtoDSPKinesisStream",
  shard: "shardId-000000000002",
  sequence: "49606703114497523631175608026491897379670462900935327778",
  approxArrivalTimestamp: 1592931758265,
  accountId: "984646522837",
  region: "us-west-2"
}
- The value field of your data from Amazon Kinesis arrives as gzip-compressed JSON, so you need to decompress it and reformat it into readable JSON format.
- On the pipeline canvas, click the + icon next to the Read from Amazon Kinesis Stream function, then select Eval from the function picker.
- On the View Configurations tab, enter the following SPL2 expression in the function field:
value = deserialize_json_object(gunzip(value))
- Click Start Preview, then click the Eval function to confirm that data in the value field has been reformatted into JSON format. Here's an example of what your data might look like in the List view:
{
  value: {
    owner: "984646522837",
    subscriptionFilters: [ "Destination" ],
    logEvents: [
      {
        id: "35523561641084722377646761787974676562750764971027464192",
        message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
        timestamp: 1592931596000
      },
      {
        id: "35523561641084722377646761787974676562750764971027464193",
        message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK",
        timestamp: 1592931596000
      },
      {
        id: "35523561641084722377646761787974676562750764971027464194",
        message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK",
        timestamp: 1592931596000
      },
      {
        id: "35523561641084722377646761787974676562750764971027464195",
        message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK",
        timestamp: 1592931596000
      }
    ],
    messageType: "DATA_MESSAGE",
    logGroup: "IFacetoDSPLogGroup",
    logStream: "eni-0a8d610ca281edcb7-all"
  },
  key: "c60e2717694a08f5d2e5e6889196506a",
  stream: "VPCtoDSPKinesisStream",
  shard: "shardId-000000000003",
  sequence: "49607591761037110041819595034591373398647257439043321906",
  approxArrivalTimestamp: 1592931641083,
  accountId: "984646522837",
  region: "us-west-2"
}
- Since Amazon Kinesis is a transport mechanism, there might be data other than VPC flow logs in the stream. Before doing further transformations, filter your data stream so that only VPC flow logs remain.
- On the pipeline canvas, click the + icon after the Eval function, then select Where from the function picker.
- On the View Configurations tab, enter the following SPL2 expression in the predicate field and replace the placeholders with your own values. You can separate multiple VPC logGroup names using commas:
map_get(value, "logGroup") IN ("NameOfYourVPClogGroup1", "NameOfYourVPClogGroup2")
This expression keeps only the records whose logGroup field matches one of your specified values.
- Click Start Preview, then click the Where function to confirm that the data has been filtered properly.
- Records sent in by Amazon Kinesis lack some of the fields required by Splunk Enterprise HEC, so you need to create top-level fields for your VPC flow logs that correspond to the Splunk Enterprise HEC event schema.
- On the pipeline canvas, click the + icon after the Where function, then select Eval from the function picker. You might want to rename your Eval functions to keep track of their roles in the pipeline.
- On the View Configurations tab, enter the following SPL2 expression in the function field for the newly added Eval function:
source = concat(ucast(map_get(value, "owner"), "string", null), "_", region, "_", ucast(map_get(value, "logGroup"), "string", null)), // required
host = map_get(value, "logStream"), // required
messageType = map_get(value, "messageType") // optional
These expressions extract the nested metadata fields such as owner, logGroup, logStream, and messageType from within the value field and assign them to newly created top-level fields. The host and source top-level fields are strictly required by the HEC event schema, but you can also create optional top-level fields to be indexed into Splunk Enterprise, such as messageType, to help you keep track of other details about your VPC flow logs. The source field is a composition of data from the owner field, the region field, and the original log group name. Assigning data from multiple fields to one field preserves that metadata and lets you drop the original fields from your records to free up memory usage.
- Click Start Preview, then click the newly added Eval function to confirm that the new top-level fields have been created.
- Another field required by the Splunk Enterprise HEC event format is source_type. Create a top-level source_type field and set a meaningful value for it.
- On the pipeline canvas, click the + icon after the last Eval function, then select Eval from the function picker.
- On the View Configurations tab, enter the following SPL2 expression in the function field and replace the placeholder with your own value:
source_type = "NameOfYourSourcetype"
If you want to ingest your data into the Amazon Web Services (AWS) app in Splunk Enterprise, set the value of the source_type field to match the one expected by the AWS app:
source_type = "aws:cloudwatchlogs:vpcflow"
- Click Start Preview, then click the newly added Eval function to confirm that the value of source_type has been set.
- When you preview your data, you can see that each record contains multiple events inside the logEvents field. Before you can flatten these nested events into individual records, you need to promote the logEvents array to a top-level field and cast it to a list of records so that the MV Expand function in the next step can accept logEvents as an input.
- On the pipeline canvas, click the + icon after the last Eval function, then select Eval from the function picker.
- On the View Configurations tab, enter the following SPL2 expression in the function field for the newly added Eval function:
logEvents = ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
- (Optional) To reduce memory usage during processing and during preview, drop as many fields as you can from the records.
Although this is an optional step, it is strongly recommended for more efficient preview and processing of your records.
- On the pipeline canvas, click the + icon after the last Eval function, then select Fields from the function picker.
- On the View Configurations tab, enter the name of the top-level field that you want to drop from the records in the field_list field. For example, now that the metadata from the value field has been extracted, enter value to drop this field.
- On the View Configurations tab, click + Add and enter another top-level field in the newly added field_list field. For example, enter region to drop this field since the data from region has been extracted and assigned to source. You can drop as many fields as you want from your records, but each field has to be in a separate field_list field.
- On the View Configurations tab, enter - in the operator (optional) field.
- On the Output Fields tab, click Update to see the list of top-level fields that remain after the drop.
- Click Start Preview, then click the newly added Fields function to confirm that the unwanted fields are dropped from your records.
- Flatten each record containing a logEvents list into multiple records.
- On the pipeline canvas, click the + icon after the Fields function, then select MV Expand from the function picker.
- On the View Configurations tab, enter logEvents in the field field and enter 0 in the limit field to flatten all values within logEvents into separate records. The mvexpand function flattens all the values within the named field and carries all other fields into each newly created record. The equivalent SPL2 fragment is shown after the example record below.
- Click Start Preview, then click the MV Expand function to confirm that the logEvents lists have been expanded. Here's an example of what each record now looks like in the List view:
{
  logEvents: {
    id: "35525397862143624190625712278351248861288097035836653577",
    message: "2 984646522837 eni-0ea3ad9fbb7a5cbe4 172.31.52.149 172.31.56.101 9887 34908 6 110 8852 1593013935 1593013945 ACCEPT OK",
    timestamp: 1593013935000
  },
  owner: "984646522837",
  approxArrivalTimestamp: 1593014012436,
  accountId: "984646522837",
  messageType: "DATA_MESSAGE",
  stream: "VPCtoDSPKinesisStream",
  host: "eni-0ea3ad9fbb7a5cbe4-all",
  source_type: "aws:cloudwatchlogs:vpcflow",
  source: "IFacetoDSPLogGroup",
  region: "us-west-2"
}
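If you're building this pipeline in the SPL2 Builder instead of the Canvas Builder, the MV Expand configuration described in this step corresponds to the following fragment, which also appears in the full pipeline at the end of this example:
| mvexpand limit=0 logEvents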
- Create a body field for the records and extract a timestamp from the records to properly index them into Splunk Enterprise.
- On the pipeline canvas, click the + icon after the MV Expand function, then select Eval from the function picker.
- On the View Configurations tab, enter the following SPL2 expressions in the function field for the newly added Eval function:
body = ucast(map_get(logEvents, "message"), "string", null), timestamp = map_get(logEvents, "timestamp")
The first expression extracts the message key from the record, assigns it to the newly created body field, and casts it to a string so that the value of message from the VPC flow log becomes the value of body in the Splunk Enterprise HEC event. The second expression assigns the actual value of the timestamp key to the top-level timestamp field to prevent DSP from using the time of record ingestion.
- Click Start Preview, then click the newly added Eval function to confirm that the body and timestamp top-level fields are now part of the schema.
- Filter out NODATA messages to avoid processing messages with no status.
- On the pipeline canvas, click the + icon after the last Eval function, then select Where from the function picker.
- On the View Configurations tab, enter the following SPL2 expression in the predicate field for the newly added Where function:
match_regex(body, /NODATA/) = false
- Click Start Preview, then click the newly added Where function to confirm that NODATA messages have been filtered out.
- Send your transformed data to Splunk Enterprise.
- On the pipeline canvas, click the + icon after the last Where function, then select Write to the Splunk platform with Batching and configure the function with your Splunk Enterprise HEC. See "Write to the Splunk platform with Batching" in the Sink functions (Data Destinations) topic.
- Click Activate Pipeline > Activate to start sending your data. If you're activating your pipeline for the first time, don't check Skip Restore State or Allow Non-Restored State. See Using activation checkpoints to activate your pipeline for more details.
- To confirm that DSP is sending your transformed data to your Splunk Enterprise instance, open the Search & Reporting app in your Splunk Enterprise instance and search for your data. Use the following search criteria:
index=<selected index> sourcetype=<name of your sourcetype>
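For example, assuming you chose the hypothetical index main and used the aws:cloudwatchlogs:vpcflow source type from earlier in this example, the search would look like this:
index=main sourcetype="aws:cloudwatchlogs:vpcflow"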
You've successfully transformed your VPC flow logs and sent them to Splunk Enterprise through DSP using the Canvas Builder. Alternatively, if you want to build your pipeline using the SPL2 Builder, here's what the full SPL2 expressions might look like in the end. Replace the placeholders with your own configurations in the source and sink functions as well as your own user-specified values in some streaming functions:
| from read_kinesis("YourKinesisConnectionID", "NameOfYourKinesisStream")
| eval value=deserialize_json_object(gunzip(value))
| where map_get(value, "logGroup") IN ("NameOfYourVPClogGroup1", "NameOfYourVPClogGroup2")
| eval source = concat(ucast(map_get(value, "owner"), "string", null), "_", region, "_", ucast(map_get(value, "logGroup"), "string", null)),
    host = map_get(value, "logStream"),
    messageType = map_get(value, "messageType")
| eval source_type="NameOfYourSourcetype"
| eval logEvents=ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
| fields - value region
| mvexpand limit=0 logEvents
| eval body=ucast(map_get(logEvents, "message"), "string", null), timestamp=map_get(logEvents, "timestamp")
| where match_regex(body, /NODATA/) = false
| into splunk_enterprise_indexes("YourHECConnectionID", "SelectedIndex", "SelectedDefaultIndex");
See also
- Create custom functions with the DSP SDK
- Deserialize and send Azure Event Hubs data from a DSP pipeline