Splunk® Data Stream Processor

Connect to Data Sources and Destinations with DSP

On April 3, 2023, Splunk Data Stream Processor reached its end of sale, and will reach its end of life on February 28, 2025. If you are an existing DSP customer, please reach out to your account team for more information.

All DSP releases prior to DSP 1.4.0 use Gravity, a Kubernetes orchestrator, which has been announced end-of-life. We have replaced Gravity with an alternative component in DSP 1.4.0. Therefore, we will no longer provide support for versions of DSP prior to DSP 1.4.0 after July 1, 2023. We advise all of our customers to upgrade to DSP 1.4.0 in order to continue to receive full product support from Splunk.

Formatting data from Amazon Kinesis Data Streams for indexing in the Splunk platform

When you use the Amazon Kinesis Data Stream source function to receive data, the output records use a schema that can't be indexed meaningfully in the Splunk platform. If you send Amazon Kinesis records to an index without formatting the records first, you'll notice problems such as the following:

  • The records are indexed as empty events that have associated metadata but no payload.
  • Some of the metadata fields contain values that pertain to your use of the DSP pipeline and the Splunk HTTP Event Collector (HEC) rather than the actual log or event from Amazon Kinesis Data Streams. For example, the timestamp in the indexed record indicates the time when the event was ingested into DSP rather than the time when the event was actually generated.
  • In some cases, the value field in the record contains data values that are not human-readable, such as Gzip-compressed data.

Before sending Amazon Kinesis data from a DSP data pipeline to a Splunk index, make sure to format your records to meet the following criteria:

  • The payload of the record is stored in a top-level field named body.
  • Any important pieces of metadata, such as the timestamp or source type associated with the record, are stored in the following top-level fields: timestamp, source_type, host, and source.
  • The data values in the records are human-readable.
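
For example, a record that meets these criteria might look like the following. This is a minimal sketch based on the VPC flow log example later on this page, showing only the five top-level fields listed above:

{
    body: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
    timestamp: 1592931596000,
    source_type: "aws:cloudwatchlogs:vpcflow",
    host: "eni-0a8d610ca281edcb7-all",
    source: "984646522837_us-west-2_IFacetoLogGroup"
}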

For an example of how to build a custom pipeline that completes the necessary data processing, see the Example: Send data from Amazon Kinesis Data Streams to the Splunk platform using DSP section on this page.

Problems with indexing unprocessed data from Amazon Kinesis Data Streams

The Amazon Kinesis Data Stream source function outputs records that use the following schema:

{
  "key": string,
  "value": bytes,
  "stream": string,
  "shard": string,
  "sequence": string,
  "approxArrivalTimestamp": long,
  "accountId": string,
  "region": string
}

Amazon Kinesis records don't contain the body field, since their payloads are stored in the value field instead. As a result, the Splunk platform indexes these records as empty events.

Additionally, Amazon Kinesis records don't include the timestamp, host, source, and source_type fields, which are part of the standard DSP schemas. When you use the Send to Splunk HTTP Event Collector sink function to send data from a DSP pipeline to a Splunk index, the sink function maps these standard fields to analogous fields in the resulting indexed records. Since the Amazon Kinesis records don't contain values for these fields, these fields become populated with default metadata. For example, if you send an Amazon Kinesis record to a Splunk index without formatting the record first, the indexed record displays the following metadata:

  • The timestamp in the indexed record indicates the time when the event was ingested into DSP rather than the time when the event was actually generated.
  • The host field contains the Splunk HEC endpoint.
  • The source field contains the default source specified in your Splunk HEC token. If your Splunk HEC token doesn't specify a default source, then the source field contains the value http:<pipeline-name>, where <pipeline-name> is the name of your DSP pipeline.
  • The sourcetype field contains the default sourcetype specified in your Splunk HEC token. If your Splunk HEC token doesn't specify a default sourcetype, then the sourcetype field contains the value httpevent.
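
As a rough illustration, an unformatted Amazon Kinesis record sent through the sink function might be indexed with metadata similar to the following HEC event sketch. The endpoint name, pipeline name, and timestamp shown here are hypothetical, and the event payload is empty because the record has no body field:

{
    "time": 1592931758.265,
    "host": "your-hec-endpoint.example.com",
    "source": "http:YourPipelineName",
    "sourcetype": "httpevent",
    "event": ""
}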

For more information about how the Send to Splunk HTTP Event Collector sink function maps records to the Splunk HEC event schema and determines default metadata values, see Format event data in DSP for Splunk indexes.

Depending on the specific type of data that you're ingesting from Amazon Kinesis Data Streams, the payload might be formatted in a way that is not human-readable. For instance, Amazon Virtual Private Cloud (VPC) flow logs are Gzip-compressed. In such cases, you'll need to convert the payload into a human-readable format such as plain text strings so that you can actually understand and work with the data.
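
For example, the pipeline built later on this page makes Gzip-compressed VPC flow log payloads readable by decompressing and deserializing the value field with the following Eval expression:

    value = deserialize_json_object(gunzip(value))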

The following example describes how to build a custom pipeline that formats records from Amazon Kinesis Data Streams so that the data is human-readable and mapped to the appropriate top-level fields when you send the data to the Splunk platform for indexing.

Example: Send data from Amazon Kinesis Data Streams to the Splunk platform using DSP

You can use DSP to process Amazon Virtual Private Cloud (VPC) flow logs from Amazon Kinesis Data Streams and then send the data to a Splunk index. To ensure that the logs can be indexed meaningfully, format the log data so that relevant information is stored in the following fields:

  • body
  • source_type
  • source
  • host
  • timestamp

This example shows you how to do the following tasks:

  • Deserialize the value field in the Amazon Kinesis records so that it is converted from bytes to a map of key-value pairs. This conversion is a prerequisite for any data processing that you want to apply to the value field; converting the field from bytes to a more commonly supported data type makes it compatible with a wider range of streaming functions.
  • Extract relevant data from various fields into top-level host, source, source_type, and timestamp fields. This task also includes casting the data to the appropriate data types as necessary.
  • Expand a multivalue field into separate events so that you can process your data with greater precision.
  • Reduce noise in your data by filtering out irrelevant events and dropping any fields that you don't need to retain.
  • Send your transformed data to a Splunk index.

This example works with VPC flow logs specifically, but other data handled by Amazon Kinesis Data Streams, such as Amazon CloudWatch logs, can be processed using a similar approach.

The following instructions explain how to build the example pipeline from scratch. If you want to view the example pipeline in DSP without going through the steps to build it, you can import a copy of the pipeline using an SPL2 statement or a JSON file.

To import the pipeline using an SPL2 statement, do the following:

  1. In the Canvas View in DSP, click the SPL View toggle to switch to the SPL View.
  2. Copy the following SPL2 statement and paste it into the SPL2 Pipeline Builder.
    | from kinesis("YourKinesisConnectionID", "NameOfYourKinesisStream") | eval value=deserialize_json_object(gunzip(value)) | where map_get(value, "logGroup") IN ("NameOfYourVPCLogGroup1", "NameOfYourVPCLogGroup2") | eval owner=ucast(map_get(value, "owner"), "string", null), logGroup=ucast(map_get(value, "logGroup"), "string", null), source=concat(owner, "_", region, "_", logGroup),
    host=ucast(map_get(value, "logStream"), "string", null),
    messageType=ucast(map_get(value, "messageType"), "string", null) | eval source_type="NameOfYourSourcetype" | eval logEvents=ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null) | fields - value, region, owner, logGroup | mvexpand limit=0 logEvents | eval body=ucast(map_get(logEvents, "message"), "string", null),
    timestamp=ucast(map_get(logEvents, "timestamp"), "long", null) | where match_regex(body, /NODATA/)=false | into splunk_enterprise_indexes("YourHECConnectionID", "SelectedIndex", "SelectedDefaultIndex");
    
  3. Click Recreate Pipeline.

To import the pipeline using a JSON file, do the following:

  1. Download a copy of the pipeline by clicking this link: DSP_AmazonKinesisPipeline.zip
  2. Extract the JSON file from the downloaded ZIP archive to any location of your choice.
  3. In the Canvas View in DSP, click the pipeline options button (the ellipses icon) and then select Import pipeline.
  4. Select the DSP_AmazonKinesisPipeline.json file and then click Import.

Prerequisites

To process VPC flow logs from Amazon Kinesis Data Streams and send them to a Splunk index, you need to have the following:

  • A DSP connection to Amazon Kinesis Data Streams. See Create a DSP connection to Amazon Kinesis Data Streams.
  • A DSP connection to the Splunk platform through the Splunk HTTP Event Collector (HEC).

Best practices

Follow these guidelines to get a better experience building your VPC flow logs pipeline in DSP:

  • Rename your functions to keep track of their roles, as you might be using multiple instances of the same function in your pipeline.
  • Use the Fields function in your pipeline to drop any fields that you don't need in your data, so that these fields aren't indexed into the Splunk platform. Dropping unnecessary fields also reduces memory usage during preview and processing.
  • Extract metadata from various fields and consolidate it into a single field, so that you can drop the original fields that the metadata was extracted from. Consolidating your metadata allows you to retain the metadata while trimming down on the number of fields in your records.
  • When previewing the data, switch to the List view to see a clearer layout of the JSON format:
  1. In the Preview Results tab, on the navigation bar, click Table.
  2. Select List from the drop-down menu.

Step 1: Receive VPC flow logs from Amazon Kinesis Data Streams and format the payload

When you ingest VPC flow logs through the Amazon Kinesis Data Stream source function, the payloads of these logs are stored as Gzip-compressed values in a bytes field named value. You'll need to decompress the data so that the values become human-readable, and deserialize the value field from bytes to a map of key-value pairs so that the field can be used as input in a wider range of streaming functions.

Because Amazon Kinesis Data Streams is a transport mechanism, the incoming stream might include other types of data in addition to VPC flow logs. Filter out any records that don't contain VPC flow logs.

  1. In DSP, on the Pipelines page, click Create Pipeline.
  2. Select the Amazon Kinesis Data Stream data source.
  3. Configure the Amazon Kinesis Data Stream source function to start receiving data.
    1. On the View Configurations tab, set the Connection id field to the ID of your Amazon Kinesis Data Streams connection.
    2. In the Stream name field, enter the name of the Amazon Kinesis data stream that you want to read data from.
    3. (Optional) Click the Start Preview icon (Start Preview button) and check the Preview Results pane to confirm that your data is arriving in DSP as expected.
    4. The following is an example of what one of your records might look like in the List view.

      {   
          key: "15bf2d724aef70bd8f7e1b0a1037c7ad",
          value: "H4sIAAAAAAAAAL2X207bQBCGX2Xla7B2Dju7w10EAfWkooa7ClUudZGlkKDEtKoQ797xpgGakqpSs7mJ4l1n7S9z+P+5r27a5bK5bi9+3LbVUXUyuhh9ejeeTEZn4+qgmn+ftQtb1sTCEhATRVuezq/PFvO7W9t5ddpctf38ZHL+dr2Y9yf9om1u7IZ21h36huVKI5O0MdFX7w+b6dTuW959Xl4tutu+m89Ou2nfLpbV0cfqpF323awZVqvLfNr4Wzvrh737qvtih1IISEGIMQAzUULxXkWAMGD0RCyijFFIVAnRLgIRoD2z74y4b27s5SEoKoFE+7E/WP8Tdjy658DuRQQHEWuCWnwNMa6vAtVBHSuxd8kndeIAHbOCe3ra41dFNzo+Hp9fuPdvqoeD/4OjAnABa2DdQM1cwUdORofswCNxcTwugZej9QKdBRAzHbrEmorDhXKJSbFmrZHsymJlQEFVcl6SQx0ytzCblC+6VZZaSgo/Vh05DozF6eK+q84QDQWcRCyfl2nPRWddMzdMibF8R9F9hw5kaCkWNQDU0njoS+BJDR428FTV2oxIyJkpkKxnQigueAh7bCz0JOf2QeCL0+3Bq6yCuYpcDqJYz/QWSzRBDxCKM5awLNsiaMX3K4JWewmhuDRgCceyjc682JrOpIF9+fwsYVlekoYAkR9DB8lMi1nd8t1zD64lw660ITMOymdZae9Rvnfu2LYYW6yjr/VP9wmmB0F9FnbH2/Lyw/j1+HgnZEUsy99UnQY09HYcFTfTWMKzbJlgMT2XPEzFa4526liCUYEpnM0+uEHKHIcS42FqNW631YztLCtpp17l95Fuo13m6c6Q8wgEKf5DL7l8+AlBltTKBxIAAA==",
          stream: "VPCtoKinesisStream",
          shard: "shardId-000000000002",
          sequence: "49606703114497523631175608026491897379670462900935327778",
          approxArrivalTimestamp: 1592931758265,
          accountId: "984646522837",
          region: "us-west-2"
      }
      
  4. Decompress and deserialize the contents of the value field.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) next to the Amazon Kinesis Data Stream function, then select Eval from the function picker.
    2. On the View Configurations tab, in the Function field, enter the following expression:
      value = deserialize_json_object(gunzip(value))
      
    3. (Optional) Click the Start Preview icon (Start Preview button), then click the Eval function to confirm that data in the value field has been reformatted into JSON format.
    4. The following is an example of what one of your formatted records might look like in the List view.

      {  
          value: { 
              owner: "984646522837",
              subscriptionFilters: [ 
                  "Destination"
              ],
              logEvents: [ 
                  {  
                      id: "35523561641084722377646761787974676562750764971027464192",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  { 
                      id: "35523561641084722377646761787974676562750764971027464193",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  {
                      id: "35523561641084722377646761787974676562750764971027464194",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  { 
                      id: "35523561641084722377646761787974676562750764971027464195",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  }
              ],
              messageType: "DATA_MESSAGE",
              logGroup: "IFacetoLogGroup",
              logStream: "eni-0a8d610ca281edcb7-all"
          },
          key: "c60e2717694a08f5d2e5e6889196506a",
          stream: "VPCtoKinesisStream",
          shard: "shardId-000000000003",
          sequence: "49607591761037110041819595034591373398647257439043321906",
          approxArrivalTimestamp: 1592931641083,
          accountId: "984646522837",
          region: "us-west-2"
      }
      
  5. Filter the incoming data so that only records that contain VPC flow logs are included.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the Eval function, then select Where from the function picker.
    2. On the View Configurations tab, enter the following expression in the Predicate field, where <NameOfYourVPCLogGroup1> and <NameOfYourVPCLogGroup2> are logGroup names used in your VPC flow logs. You can specify a comma-separated list of as many VPC logGroup names as you need.
      map_get(value, "logGroup") IN ("<NameOfYourVPCLogGroup1>", "<NameOfYourVPCLogGroup2>")

      This expression keeps only the records where the value of the logGroup field, which is nested in the value map, matches your specified VPC logGroup names.

    3. (Optional) Click the Start Preview icon (Start Preview button), then click the Where function to confirm that the data has been filtered properly.

The next few sections demonstrate how you can extract log data into top-level fields, drop unnecessary fields, and expand grouped events into separate records. Transforming your records this way ensures that they can be properly indexed in the Splunk platform.

Step 2: Extract log data into fields that can be mapped to the Splunk HEC event schema

To ensure that the relevant data can be indexed properly, extract the data from the value field into top-level fields that can be mapped to the Splunk HEC event schema. In particular, you must extract the logEvents array from the value field to a top-level field.

The logEvents array in each record contains multiple events. During a later step in this example, you'll use the MV Expand function to expand these nested events into individual records. However, before you can do that, you must extract the logEvents array to a top-level field and cast it to a list of records so that the MV Expand function can accept logEvents as input.
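
In SPL2 terms, this prerequisite corresponds to the following clauses from the full statement shown earlier on this page. The Fields function from Step 3 sits between these clauses in the complete pipeline:

    | eval logEvents=ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
    | mvexpand limit=0 logEvents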

  1. Extract data from the nested fields in the value map into top-level fields named owner, logGroup, source, host, and messageType.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the Where function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following expression in the Function field of the newly added Eval function:
      owner = ucast(map_get(value, "owner"), "string", null),
      logGroup = ucast(map_get(value, "logGroup"), "string", null)
      

      These expressions extract the values of the owner and logGroup keys that are nested in the value field and cast them to string type. This transformation is necessary because the next step involves using the concat() function, which only accepts string values, to consolidate several values including the owner and logGroup values into a single top-level field.

    3. Click + Add. In the newly added Function field, enter the following expression:
      source = concat(owner, "_", region, "_", logGroup),
      host = ucast(map_get(value, "logStream"), "string", null),
      messageType = ucast(map_get(value, "messageType"), "string", null)
      

      These expressions extract metadata fields that are nested within the value field and assign them to newly created top-level fields named host and messageType. The source field is populated with a combination of data from the top-level fields named owner, region, and logGroup. Consolidating data from multiple fields into a single field lets you drop the original fields from your records to free up memory usage while still preserving the data from those fields.

    4. (Optional) Click the Start Preview icon (Start Preview button), then click the newly added Eval function to confirm that new top-level fields have been created.
  2. Create a top-level field named source_type and assign a meaningful value to it.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the last Eval function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following expression in the Function field, where <NameOfYourSourcetype> is the value that you want to assign to the source_type field:
      source_type = "<NameOfYourSourcetype>"

      If you want to ingest your data into the Amazon Web Services (AWS) app in the Splunk platform, change the value of the source_type field to a source type that the target AWS app recognizes. For example: source_type = "aws:cloudwatchlogs:vpcflow"

    3. (Optional) Click the Start Preview icon (Start Preview button), then click the newly added Eval function to confirm that the value of source_type has been changed.
  3. Extract the logEvents array to a top-level field and cast it to a list of records.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the last Eval function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following expression in the Function field for the newly added Eval function:
      logEvents = ucast(map_get(value, "logEvents"), "collection<map<string, any>>", null)
      
    3. (Optional) Click the Start Preview icon (Start Preview button), then click the newly added Eval function to confirm that your records now include logEvents as a top-level field.
    4. The following is an example of what one of your records might look like in the List view.

      {
          owner: "984646522837",
          logGroup: "IFacetoLogGroup",
          source: "984646522837_us-west-2_IFacetoLogGroup",
          host: "eni-0a8d610ca281edcb7-all",
          messageType: "DATA_MESSAGE",
          source_type: "aws:cloudwatchlogs:vpcflow",
          logEvents: [ 
              {  
                  id: "35523561641084722377646761787974676562750764971027464192",
                  message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
                  timestamp: 1592931596000
              },
              { 
                  id: "35523561641084722377646761787974676562750764971027464193",
                  message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK",
                  timestamp: 1592931596000
              },
              {
                  id: "35523561641084722377646761787974676562750764971027464194",
                  message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK",
                  timestamp: 1592931596000
              },
              { 
                  id: "35523561641084722377646761787974676562750764971027464195",
                  message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK",
                  timestamp: 1592931596000
              }
          ],
          value: { 
              owner: "984646522837",
              subscriptionFilters: [ 
                  "Destination"
              ],
              logEvents: [ 
                  {  
                      id: "35523561641084722377646761787974676562750764971027464192",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  { 
                      id: "35523561641084722377646761787974676562750764971027464193",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.56.101 9887 55232 6 6 582 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  {
                      id: "35523561641084722377646761787974676562750764971027464194",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.56.101 172.31.52.149 55232 9887 6 12 2244 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  },
                  { 
                      id: "35523561641084722377646761787974676562750764971027464195",
                      message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.52.149 172.31.28.163 9997 59720 6 80 3387 1592931596 1592931598 ACCEPT OK",
                      timestamp: 1592931596000
                  }
              ],
              messageType: "DATA_MESSAGE",
              logGroup: "IFacetoLogGroup",
              logStream: "eni-0a8d610ca281edcb7-all"
          },
          key: "c60e2717694a08f5d2e5e6889196506a",
          stream: "VPCtoKinesisStream",
          shard: "shardId-000000000003",
          sequence: "49607591761037110041819595034591373398647257439043321906",
          approxArrivalTimestamp: 1592931641083,
          accountId: "984646522837",
          region: "us-west-2"
      }
      

In addition to the fields that are part of the Amazon Kinesis Data Stream data schema by default, your records now contain the following top-level fields: owner, logGroup, source, host, messageType, source_type, and logEvents. For details about the Amazon Kinesis Data Stream data schema, see the Function output schema of the Amazon Kinesis Data Stream source function in the Function Reference.

Step 3: (Optional) Drop unnecessary fields from your records

To reduce memory usage during processing and data preview, exclude unnecessary fields from your records. Now that you've extracted the relevant information from the value, region, owner, and logGroup fields, drop them from your records.

Although this is an optional step, it is strongly recommended for more efficient preview and processing of your records.

  1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the last Eval function, then select Fields from the function picker.
  2. On the View Configurations tab, in the Field list field, enter value.
  3. Click + Add and then enter region in the new field that appears.
  4. Repeat the previous step as needed to add two more fields, and enter owner and logGroup in these fields.
  5. In the Operator field, enter a minus sign ( - ).
  6. (Optional) Click the Start Preview icon (Start Preview button), then click the newly added Fields function to confirm that the value, region, owner, and logGroup fields are dropped from your records.

Next, expand the grouped events in the logEvents field into separate records, and finish extracting pertinent data into top-level fields.

Step 4: Expand and process data from the logEvents field

Currently, the logEvents field of each record contains multiple events. Expand these nested events into individual records so that you can process them on an individual basis. Then, extract the pertinent data from the logEvents field into top-level fields that can be mapped to standard fields in the Splunk HEC event schema. Extracting your data this way ensures that the relevant data is indexed properly when you send it to a Splunk index.

Additionally, filter out any VPC flow logs that don't contain meaningful message text. For example, some logs contain "NODATA" as the message.

  1. Expand each record into multiple records based on the list of values in the logEvents field.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the Fields function, then select MV Expand from the function picker.
    2. On the View Configurations tab, in the Field field, enter logEvents. This setting configures the MV Expand function to create a separate record for each value in the logEvents field and carry all other fields into each newly created record.
    3. (Optional) Click the Start Preview icon (Start Preview button), then click the MV Expand function to confirm that the logEvents field no longer contains multiple events.
    4. The following is an example of what one of your expanded records might look like in the List view.

      {
          source: "984646522837_us-west-2_IFacetoLogGroup",
          host: "eni-0a8d610ca281edcb7-all",
          messageType: "DATA_MESSAGE",
          source_type = "aws:cloudwatchlogs:vpcflow",
          logEvents: { 
              id: "35523561641084722377646761787974676562750764971027464192",
              message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
              timestamp: 1592931596000
      
          },
          key: "c60e2717694a08f5d2e5e6889196506a",
          stream: "VPCtoKinesisStream",
          shard: "shardId-000000000003",
          sequence: "49607591761037110041819595034591373398647257439043321906",
          approxArrivalTimestamp: 1592931641083,
          accountId: "984646522837"
      }
      
  2. Extract the message text and timestamp from the contents of the logEvents field.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the MV Expand function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following expressions in the Function field of the newly added Eval function:
      body = ucast(map_get(logEvents, "message"), "string", null),
      timestamp = ucast(map_get(logEvents, "timestamp"), "long", null)

      The first expression extracts the value of the message key from the logEvents field, assigns it to the newly created body field, and then casts it to the string data type. This transformation enables the value of message from the VPC flow log to become the value of the body field in the Splunk HEC event.

      The second expression assigns the actual value of the timestamp key to the top-level field named timestamp. This data extraction ensures that the timestamp field contains the time when the event occurred rather than the time when the event was ingested by DSP.
    3. (Optional) Click the Start Preview icon (Start Preview button), then click on the newly added Eval function to confirm that the body and timestamp fields are now included in the records.
  3. To avoid processing messages that don't contain meaningful information, filter out any records where the value of the body field is NODATA.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the last Eval function, then select Where from the function picker.
    2. On the View Configurations tab, enter the following expression in the Predicate field for the newly added Where function:
      match_regex(body, /NODATA/) = false
      
    3. (Optional) Click the Start Preview icon (Start Preview button), then click the newly added Where function to confirm that NODATA messages have been filtered out.
    4. The following is an example of what one of your finalized records might look like in the List view.

      {
          body: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
          timestamp: 1592931596000,
          source: "984646522837_us-west-2_IFacetoLogGroup",
          host: "eni-0a8d610ca281edcb7-all",
          messageType: "DATA_MESSAGE",
          source_type = "aws:cloudwatchlogs:vpcflow",
          logEvents: { 
              id: "35523561641084722377646761787974676562750764971027464192",
              message: "2 984646522837 eni-0a8d610ca281edcb7 172.31.28.163 172.31.52.149 59720 9997 6 83 111826 1592931596 1592931598 ACCEPT OK",
              timestamp: 1592931596000
      
          },
          key: "c60e2717694a08f5d2e5e6889196506a",
          stream: "VPCtoKinesisStream",
          shard: "shardId-000000000003",
          sequence: "49607591761037110041819595034591373398647257439043321906",
          approxArrivalTimestamp: 1592931641083,
          accountId: "984646522837"
      }
      

You now have properly formatted and optimized data that is ready to be sent to a Splunk index.

Step 5: Send your transformed data to a Splunk index

Configure the Send to Splunk HTTP Event Collector sink function to send the transformed data from your DSP pipeline to a Splunk index.

  1. Add the Send to Splunk HTTP Event Collector sink function to the end of your pipeline and configure it.
    1. On the pipeline canvas, click the Connect a processing or a sink function icon (Add function or branch icon) after the last Where function, then select Send to Splunk HTTP Event Collector from the function picker.
    2. On the View Configurations tab, set the Connection id field to the ID of your Splunk platform connection.
    3. In the Index field, enter the name of the Splunk index that you want to send your data to. Make sure to enclose the index name in double quotation marks ( " ).
    4. In the Default index field, enter the name of a Splunk index that you want to send your data to if the target index specified during the previous step cannot be used for any reason.
    5. As an alternative to specifying literal index names in the Index and Default index fields, you can specify expressions that dynamically resolve to the index names. For more information, see Send data to Splunk HTTP Event Collector in the Function Reference.

  2. To save your pipeline, click Save.
  3. To start sending your data, click Activate. If you're activating your pipeline for the first time, don't select Skip Restore State or Allow Non-Restored State. See About DSP checkpoints and savepoints in the Use the Data Stream Processor for more details.
    Depending on the size and complexity of the pipeline, it might take some time for the pipeline to finish activating. Before continuing to the next step, wait for the status beside the pipeline name to update to "Activated". Additionally, make sure that all the functions in the pipeline display metrics indicating that data is flowing through the pipeline.
  4. To confirm that DSP is sending your transformed data to your Splunk index, open the Search & Reporting app in Splunk Web and search for your data. Use the following search criteria, where <SpecifiedIndex> is the index name that you specified in the Send to Splunk HTTP Event Collector sink function and <SpecifiedSourcetype> is the value that you assigned to the source_type field in your records:

    index=<SpecifiedIndex> sourcetype=<SpecifiedSourcetype>
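
    For example, assuming an index named main (a hypothetical name) and the source type value used earlier in this example, the search might look like this:

    index=main sourcetype="aws:cloudwatchlogs:vpcflow"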

You've successfully transformed your VPC flow logs and sent them to a Splunk index through DSP.

See also

Functions
Amazon Kinesis Data Stream
Casting
Conversion
Eval
Fields
Map
MV Expand
Send to Splunk HTTP Event Collector
String manipulation
Where
Related topics
Format event data in DSP for Splunk indexes
data types
Overview of the dashboards in the Splunk App for AWS