Splunk® Data Stream Processor

Connect to Data Sources and Destinations with DSP


Deserialize data from Amazon Kinesis Data Streams in DSP

When you create a data pipeline in the Splunk Data Stream Processor (DSP) to ingest data from Amazon Kinesis Data Streams using the Amazon Kinesis Data Stream source function, the ingested data appears Base64-encoded. Deserialize the data into strings so that you can use it as input for other pipeline functions and view it as human-readable strings during data preview.

In DSP, Kinesis data is handled as follows:

  • Kinesis sends your data in Base64 encoding. The Amazon Kinesis Data Stream source function decodes the Base64 encoding, so you don't need to use the base64_decode function in your pipeline.
  • The data you send appears in the value field of your record, not in the body field.
  • In the Preview Results tab, the value field is displayed in Base64 encoding.
  • Depending on your data source, you might need to deserialize your data from an encoding scheme other than Base64 to preview it in the Preview Results tab. Your pipeline definition must specify how to deserialize your data. For example, if you send data in JSON format, use the deserialize_json_object function.
  • If you send text and want to view it in the Preview Results tab, add an Eval function to your pipeline after your source function. Enter the following SPL2 expression in the configuration panel of your Eval function: body=to_string(value)
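The following minimal Python sketch runs locally, not in DSP, and illustrates the points above under the assumption that the value field holds the raw bytes your producer sent: the preview renders those bytes as Base64, while a text conversion or a JSON parse recovers readable data. The helper names preview_repr, value_as_text, and value_as_json are hypothetical and only approximate what the preview and the DSP functions do.

```python
import base64
import json


def preview_repr(value: bytes) -> str:
    """Mimic how a bytes field is rendered in preview: as Base64 text."""
    return base64.b64encode(value).decode("ascii")


def value_as_text(value: bytes) -> str:
    """Hypothetical stand-in for converting a bytes value to a UTF-8 string."""
    return value.decode("utf-8")


def value_as_json(value: bytes) -> dict:
    """Hypothetical stand-in for deserialize_json_object: parse UTF-8 JSON bytes."""
    return json.loads(value.decode("utf-8"))


if __name__ == "__main__":
    # The producer sent this JSON; it arrives as raw bytes in the value field.
    value = b'{"status": "ok", "count": 3}'
    print(preview_repr(value))   # Base64 text, similar to what the preview shows
    print(value_as_json(value))  # {'status': 'ok', 'count': 3}
```

The point of the sketch is that Base64 is only a display form of the bytes: decoding it gives back exactly what the producer sent, and the remaining work is choosing the right deserialization for your payload.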

Example: Send data from Amazon Kinesis Data Streams to Splunk Enterprise using DSP

To send data from Amazon Kinesis Data Streams into Splunk Enterprise correctly, you need to transform your data using DSP. This example shows you how to do the following tasks:

  • Decompress and deserialize Amazon Kinesis records, which appear Base64-encoded in the preview, into JSON format.
  • Create top-level fields for your records to make them compatible with Splunk Enterprise HTTP Event Collector (HEC).
  • Cast data to appropriate types and extract nested data where necessary.
  • Send your transformed data to your Splunk Enterprise instance.

This example works specifically with VPC (Virtual Private Cloud) flow logs, but you can process other data from Amazon Kinesis Data Streams, such as CloudWatch logs, using the same approach.

Prerequisites

To process VPC flow logs and send them to Splunk Enterprise, you need to have the following:

Best practices

Follow these guidelines to get a better experience building your VPC flow logs pipeline in DSP:

  • Rename your functions to keep track of their roles as you might be using multiple instances of the same function in your pipeline.
  • Include Fields functions in your pipeline to trim some fields you don't need in your data so that they aren't indexed into Splunk Enterprise. Dropping unnecessary fields also reduces memory usage during preview and processing.
  • Extract metadata about your stream from your records and combine it into a single field, so that you can carry the metadata forward without keeping many separate fields.
  • When previewing the data, switch to the List view for better layout of the JSON format:
    1. In the Preview Results tab, on the navigation bar, click Table.
    2. Select List from the drop-down menu.

Steps

  1. From the home page, click Build Pipeline, then select Amazon Kinesis Data Stream as your data source. Configure the source function with your Amazon Kinesis connection. See Get data from Amazon Kinesis Data Stream.
    Click Start Preview. Your data looks something like this in the List view:
    {   
        key: "15bf2d724aef70bd8f7e1b0a1037c7ad",
        value: "H4sIAAAAAAAAAL2X207bQBCGX2Xla7B2Dju7w10EAfWkooa7ClUudZGlkKDEtKoQ797xpgGakqpSs7mJ4l1n7S9z+P+5r27a5bK5bi9+3LbVUXUyuhh9ejeeTEZn4+qgmn+ftQtb1sTCEhATRVuezq/PFvO7W9t5ddpctf38ZHL+dr2Y9yf9om1u7IZ21h36huVKI5O0MdFX7w+b6dTuW959Xl4tutu+m89Ou2nfLpbV0cfqpF323awZVqvLfNr4Wzvrh737qvtih1IISEGIMQAzUULxXkWAMGD0RCyijFFIVAnRLgIRoD2z74y4b27s5SEoKoFE+7E/WP8Tdjy658DuRQQHEWuCWnwNMa6vAtVBHSuxd8kndeIAHbOCe3ra41dFNzo+Hp9fuPdvqoeD/4OjAnABa2DdQM1cwUdORofswCNxcTwugZej9QKdBRAzHbrEmorDhXKJSbFmrZHsymJlQEFVcl6SQx0ytzCblC+6VZZaSgo/Vh05DozF6eK+q84QDQWcRCyfl2nPRWddMzdMibF8R9F9hw5kaCkWNQDU0njoS+BJDR428FTV2oxIyJkpkKxnQigueAh7bCz0JOf2QeCL0+3Bq6yCuYpcDqJYz/QWSzRBDxCKM5awLNsiaMX3K4JWewmhuDRgCceyjc682JrOpIF9+fwsYVlekoYAkR9DB8lMi1nd8t1zD64lw660ITMOymdZae9Rvnfu2LYYW6yjr/VP9wmmB0F9FnbH2/Lyw/j1+HgnZEUsy99UnQY09HYcFTfTWMKzbJlgMT2XPEzFa4526liCUYEpnM0+uEHKHIcS42FqNW631YztLCtpp17l95Fuo13m6c6Q8wgEKf5DL7l8+AlBltTKBxIAAA==",
        stream: "VPCtoKinesisStream",
        shard: "shardId-000000000002",
        sequence: "49606703114497523631175608026491897379670462900935327778",
        approxArrivalTimestamp: 1592931758265,
        accountId: "984646522837",
        region: "us-west-2"
    }
    
  2. The value field of your Amazon Kinesis records contains gzip-compressed data, which the preview displays in Base64 encoding, so you need to decompress the value field and deserialize it into readable JSON format.
    1. On the pipeline canvas, click the + icon next to the Amazon Kinesis Data Stream function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the function field:
      value = deserialize_json_object(gunzip(value))
      
    3. Click Start Preview, then click the Eval function to confirm that data in the value field has been reformatted into JSON format. Here's an example of what your data might look like in the List view:
      {  
          value: { 
              owner: "984646522837",
              subscriptionFilters: [ 
                  "Destination"
              ],
              logEvents: [ 
                  {  
                      id: "35523561641084722377646761787974676562750764971027464192",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  { 
                      id: "35523561641084722377646761787974676562750764971027464193",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  {
                      id: "35523561641084722377646761787974676562750764971027464194",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  { 
                      id: "35523561641084722377646761787974676562750764971027464195",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  }
              ],
              messageType: "DATA_MESSAGE",
              logGroup: "IFacetoLogGroup",
              logStream: "eni-0a8d610ca281edcb7-all"
          },
          key: "c60e2717694a08f5d2e5e6889196506a",
          stream: "VPCtoKinesisStream",
          shard: "shardId-000000000003",
          sequence: "49607591761037110041819595034591373398647257439043321906",
          approxArrivalTimestamp: 1592931641083,
          accountId: "984646522837",
          region: "us-west-2"
      }
      
  3. Because Amazon Kinesis is a transport mechanism, your stream might contain data other than VPC flow logs. Before doing further transformations, filter your data stream so that only VPC flow logs remain.
    1. On the pipeline canvas, click the + icon after the Eval function, then select Where from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the predicate field and replace the placeholders with your own log group names. To keep multiple VPC log groups, separate their names with commas:
      map_get(value, "logGroup") IN ("NameOfYourVPClogGroup1", "NameOfYourVPClogGroup2")

      This expression keeps only the records whose logGroup fields match your specified values.

    3. Click Start Preview, then click the Where function to confirm that the data has been filtered properly.
  4. Records sent by Amazon Kinesis lack some fields that the Splunk Enterprise HEC event schema requires, so you need to create top-level fields for your VPC flow logs that correspond to that schema.
    1. On the pipeline canvas, click the + icon after the Where function, then select Eval from the function picker. You might want to rename your Eval functions to keep track of their roles in the pipeline.
    2. On the View Configurations tab, enter the following SPL2 expression in the function field for the newly added Eval function:
      source = concat(ucast(map_get(value, "owner"), "string", null), "_", region, "_", ucast(map_get(value, "logGroup"), "string", null)), //required
      host = map_get(value, "logStream"),          //required
      messageType = map_get(value, "messageType") //optional
      

      These expressions extract the nested owner, logGroup, logStream, and messageType fields from the value field and assign them to newly created top-level fields. The host and source top-level fields are required by the HEC event schema, but you can also create optional top-level fields, such as messageType, to index additional details about your VPC flow logs into Splunk Enterprise.

      The source field combines the owner, region, and logGroup values. Assigning data from multiple fields to a single field preserves this metadata and lets you drop the original fields from your records to reduce memory usage.

    3. Click Start Preview, then click the newly added Eval function to confirm that new top-level fields have been created.
  5. The Splunk Enterprise HEC event format also requires a source_type field. Create a top-level source_type field and set a meaningful value for it.
    1. On the pipeline canvas, click the + icon after the last Eval function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the function field and replace the placeholder with your own specified value:
      source_type = "NameOfYourSourcetype"

      If you want to use your data with the Splunk App for AWS in Splunk Enterprise, set the source_type field to the source type that the app expects: source_type = "aws:cloudwatchlogs:vpcflow"

    3. Click Start Preview, then click the newly added Eval function to confirm that the value of source_type has been changed.
  6. When you preview your data, you can see that each record contains multiple events inside the logEvents field. Before you can flatten these nested events into individual records, you need to promote the logEvents array to a top-level field and cast it to a list of records so that the MV Expand function in step 8 can accept logEvents as an input.
    1. On the pipeline canvas, click the + icon after the last Eval function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the function field for the newly added Eval function:
      logEvents = ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
      
  7. (Optional) To reduce memory usage during processing and during preview, drop as many fields as you can from the records.

    Although this is an optional step, it is strongly recommended for more efficient preview and processing of your records.

    1. On the pipeline canvas, click the + icon after the last Eval function, then select Fields from the function picker.
    2. On the View Configurations tab, in the field_list field, enter the name of a top-level field that you want to drop from the records. For example, now that the metadata from the value field has been extracted, enter value to drop this field.
    3. On the View Configurations tab, click + Add and enter another top-level field in the newly added field_list field. For example, enter region to drop this field, since its value is already part of source. You can drop as many fields as you want from your records, but each field must be in a separate field_list field.
    4. On the View Configurations tab, enter - in the operator (optional) field.
    5. On the Output Fields tab, click Update to see the list of top-level fields after dropping.
    6. Click Start Preview, then click the newly added Fields function to confirm that the unwanted fields are dropped from your records.
  8. Flatten a record containing a logEvents list into multiple records.
    1. On the pipeline canvas, click the + icon after the Fields function, then select MV Expand from the function picker.
    2. On the View Configurations tab, enter logEvents into the field field and enter 0 for the limit field to flatten all values within logEvents into separate records. The mvexpand function flattens all the values within the named field and carries all other fields into each newly created record.
    3. Click Start Preview, then click the MV Expand function to confirm that the logEvents lists have been expanded. Here's an example of what each record now looks like in the List view:
      { 
          logEvents: {
              id: "35525397862143624190625712278351248861288097035836653577",
              message: "2 984646522837 eni-0ea3ad9fbb7a5cbe4 172.31.52.149 172.31.56.101 9887 34908 6 110 8852 1593013935 1593013945 ACCEPT OK",
              timestamp: 1593013935000
          },
          approxArrivalTimestamp: 1593014012436,
          accountId: "984646522837",
          messageType: "DATA_MESSAGE",
          stream: "VPCtoKinesisStream",
          host: "eni-0ea3ad9fbb7a5cbe4-all",
          source_type: "aws:cloudwatchlogs:vpcflow",
          source: "984646522837_us-west-2_IFacetoLogGroup"
      }
      
  9. Create a body field for the records and extract a timestamp from the records to properly index them into Splunk Enterprise.
    1. On the pipeline canvas, click the + icon after the MV Expand function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the function field for the newly added Eval function:
      body = ucast(map_get(logEvents, "message"), "string", null),
      timestamp = map_get(logEvents, "timestamp")
      The first expression extracts the value of the message key from logEvents, casts it to a string, and assigns it to the newly created body field, so the VPC flow log message becomes the body of the Splunk Enterprise HEC event. The second expression assigns the original value of the timestamp key to the top-level timestamp field so that DSP doesn't use the time of record ingestion.
    3. Click Start Preview, then click the newly added Eval function to confirm that your records now contain the body and timestamp top-level fields.
  10. Filter out NODATA messages, which VPC flow logs emit when there was no network traffic to or from the network interface during the aggregation interval.
    1. On the pipeline canvas, click the + icon after the last Eval function, then select Where from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the predicate field for the newly added Where function:
      match_regex(body, /NODATA/) = false
      
    3. Click Start Preview, then click the newly added Where function to confirm that NODATA messages have been filtered out.
  11. Send your transformed data to Splunk Enterprise.
    1. On the pipeline canvas, click the + icon after the last Where function, then select Send to a Splunk Index with Batching and configure the function with your Splunk Enterprise HEC. See Send data to a Splunk Index with Batching.
    2. Click Activate Pipeline > Activate to start sending your data. If you're activating your pipeline for the first time, don't check Skip Restore State or Allow Non-Restored State. See Using activation checkpoints to activate your pipeline for more details.
    3. To confirm that DSP is sending your transformed data to your Splunk Enterprise instance, open the Search & Reporting app in your Splunk Enterprise instance and search for your data. Use the following search criteria:

      index=<selected index> sourcetype=<name of your sourcetype>
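To check locally what the Eval and Where functions in steps 2 and 3 should produce, here is a minimal Python sketch. It assumes, as the sample records suggest, that the value field holds gzip-compressed JSON sent by a CloudWatch Logs subscription. The names decode_value and keep_record are hypothetical, and the log group names are the same placeholders used in step 3.

```python
import gzip
import json

# Placeholder log group names from step 3; replace them with your own.
VPC_LOG_GROUPS = {"NameOfYourVPClogGroup1", "NameOfYourVPClogGroup2"}


def decode_value(value: bytes) -> dict:
    """Local analogue of deserialize_json_object(gunzip(value))."""
    return json.loads(gzip.decompress(value))


def keep_record(decoded: dict) -> bool:
    """Local analogue of map_get(value, "logGroup") IN (...)."""
    return decoded.get("logGroup") in VPC_LOG_GROUPS


if __name__ == "__main__":
    payload = {
        "messageType": "DATA_MESSAGE",
        "logGroup": "NameOfYourVPClogGroup1",
        "logStream": "eni-0a8d610ca281edcb7-all",
        "logEvents": [],
    }
    # Simulate the value field: gzip-compressed JSON bytes.
    value = gzip.compress(json.dumps(payload).encode("utf-8"))
    decoded = decode_value(value)
    print(decoded["logGroup"], keep_record(decoded))  # NameOfYourVPClogGroup1 True
```

Because the preview shows the value field in Base64, you could also pass a previewed value through base64.b64decode before calling decode_value when you want to inspect a real record offline.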

You've successfully transformed your VPC flow logs and sent them to Splunk Enterprise through DSP using the Canvas Builder. Alternatively, if you want to build your pipeline using the SPL2 Builder, here's what the full SPL2 pipeline might look like. Replace the placeholders with your own connection IDs, stream name, and index names in the source and sink functions, and with your own values, such as log group names and source type, in the streaming functions:

| from kinesis("YourKinesisConnectionID", "NameOfYourKinesisStream")
| eval value=deserialize_json_object(gunzip(value))
| where map_get(value, "logGroup") IN ("NameOfYourVPClogGroup1", "NameOfYourVPClogGroup2")
| eval source = concat(ucast(map_get(value, "owner"), "string", null), "_", region, "_", ucast(map_get(value, "logGroup"), "string", null)), host = map_get(value, "logStream"), messageType = map_get(value, "messageType")
| eval source_type="NameOfYourSourcetype"
| eval logEvents=ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
| fields - value region
| mvexpand limit=0 logEvents
| eval body=ucast(map_get(logEvents, "message"), "string", null), timestamp=map_get(logEvents, "timestamp")
| where match_regex(body, /NODATA/) = false
| into splunk_enterprise_indexes("YourHECConnectionID", "SelectedIndex", "SelectedDefaultIndex");
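The remaining transformations in the SPL2 pipeline above can be approximated in a short Python sketch, which can help you reason about what each record looks like before you activate the pipeline. This is a local model, not DSP code: records are plain dictionaries, the value field is assumed to be already decompressed and deserialized, ucast is reduced to str(), and transform is a hypothetical name. It applies, in order, the source, host, and messageType eval, the source_type eval, the fields - value region step, the mvexpand on logEvents, the body and timestamp eval, and the NODATA filter.

```python
import re
from typing import Any, Dict, List

# Same pattern as match_regex(body, /NODATA/) in the final Where function.
NODATA = re.compile(r"NODATA")


def transform(record: Dict[str, Any], source_type: str) -> List[Dict[str, Any]]:
    """Model the pipeline after the logGroup filter for one decoded record."""
    value = record["value"]
    out = dict(record)  # shallow copy so the input record is left unchanged

    # eval source/host/messageType: promote nested metadata to top-level fields.
    out["source"] = "_".join(
        [str(value["owner"]), str(record["region"]), str(value["logGroup"])]
    )
    out["host"] = value["logStream"]
    out["messageType"] = value["messageType"]

    # eval source_type: set the source type required by HEC.
    out["source_type"] = source_type

    # eval logEvents: promote the nested list of events.
    log_events = value.get("logEvents") or []

    # fields - value region: drop fields whose data now lives elsewhere.
    out.pop("value", None)
    out.pop("region", None)

    results = []
    # mvexpand limit=0 logEvents: one output record per event, other fields copied.
    for event in log_events:
        expanded = dict(out, logEvents=event)
        # eval body/timestamp: use the flow log line and its own event time.
        expanded["body"] = str(event["message"])
        expanded["timestamp"] = event["timestamp"]
        # where match_regex(body, /NODATA/) = false
        if NODATA.search(expanded["body"]) is None:
            results.append(expanded)
    return results


if __name__ == "__main__":
    record = {
        "key": "c60e2717694a08f5d2e5e6889196506a",
        "region": "us-west-2",
        "value": {
            "owner": "984646522837",
            "logGroup": "IFacetoLogGroup",
            "logStream": "eni-0a8d610ca281edcb7-all",
            "messageType": "DATA_MESSAGE",
            "logEvents": [
                {
                    "id": "35523561641084722377646761787974676562750764971027464192",
                    "message": "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 "
                    "172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
                    "timestamp": 1592931596000,
                },
                {
                    # Hypothetical NODATA line, made up for illustration.
                    "id": "hypothetical-nodata-event",
                    "message": "2 984646522837 eni-0a8d610ca281edcb7 - - - - - - - "
                    "1592931596 1592931598 - NODATA",
                    "timestamp": 1592931596000,
                },
            ],
        },
    }
    events = transform(record, "aws:cloudwatchlogs:vpcflow")
    print(len(events), events[0]["source"], events[0]["host"])
    # 1 984646522837_us-west-2_IFacetoLogGroup eni-0a8d610ca281edcb7-all
```

Modeling mvexpand as a copy of the shared fields into each event record shows why dropping value and region before the expansion matters: every field left on the record is duplicated once per log event.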

See also

Functions
Casting
Evals
Fields
MV Expand
Related topics
Format event data in DSP for Splunk indexes
Overview of the dashboards in the Splunk App for AWS
Last modified on 10 December, 2020

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1

