Splunk® Data Stream Processor

Connect to Data Sources and Destinations with DSP

DSP 1.2.1 is impacted by the CVE-2021-44228 and CVE-2021-45046 security vulnerabilities from Apache Log4j. To fix these vulnerabilities, you must upgrade to DSP 1.2.4. See Upgrade the Splunk Data Stream Processor to 1.2.4 for upgrade instructions.

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach its end of support date. See the Splunk Software Support Policy for details.
This documentation does not apply to the most recent version of Splunk® Data Stream Processor. For documentation on the most recent version, go to the latest release.

Formatting data into the Splunk Infrastructure Monitoring metrics schema

Before sending your data from a DSP pipeline to Splunk Infrastructure Monitoring, make sure to format your records so that all the relevant data is stored in the fields that will be mapped to the Splunk Infrastructure Monitoring metrics schema.

Splunk Infrastructure Monitoring supports the following schema for metric datapoints:

{
  "<metric type>":
    [ {
      "metric": string,
      "value": number,
      "timestamp": int64,
      "dimensions": object
       } ]
}

See the /datapoint section of Send Metrics and Events in the Observability API Reference for more information.

When configuring the Send to Splunk Infrastructure Monitoring sink function, you specify which fields from your DSP records to map to the metric, value, timestamp, and dimensions fields in the Splunk Infrastructure Monitoring metric datapoints. All other fields from your DSP records are dropped.

See Send data to Splunk Infrastructure Monitoring in the Function Reference for more information about configuring the sink function.

Example: Send metrics.log data from the Splunk universal forwarder to Splunk Infrastructure Monitoring

The Splunk universal forwarder logs metrics.log data by default. Using the , you can process and send metrics.log data to Splunk Infrastructure Monitoring to troubleshoot your inputs and review product behavior. See About metrics.log in the Splunk Enterprise Troubleshooting Manual for more details.

This example shows you how to do the following tasks:

  • Process the incoming stream of metrics.log data, which is unparsed. You'll need to split the data stream into discrete events with timestamps that indicate when the event was originally generated.
  • Extract dimensions from nested fields and assign them to top-level fields.
  • Format the records so that each one contains only the relevant fields for a single metric.
  • Send your transformed data to Splunk Infrastructure Monitoring, making sure to map the relevant DSP record fields to the fields that are retained in Splunk Infrastructure Monitoring.

Log data can be converted to metrics in many different ways. The data extraction and conversion approaches described on this page are examples that apply to the metrics.log data from a particular Splunk universal forwarder, not best practices that apply to all scenarios. Adjust your pipeline according to the format of your own records for accurate data transformation.

Prerequisites

Before you can use the to transform data from a forwarder and send it to Splunk Infrastructure Monitoring, you must have the following:

Best practices

Follow these guidelines to get a better experience building your Splunk Infrastructure Monitoring metrics pipeline in the :

  • Rename your functions to keep track of their roles because you might use multiple instances of the same function in your pipeline.
  • When previewing the data, switch to the List view to see a clearer layout of the JSON format:
  1. In the Preview Results tab, on the navigation bar, click Table.
  2. Select List from the drop-down menu.

Step 1: Receive and parse metrics.log data from the universal forwarder

Splunk universal forwarders send data to the in 64-kilobyte blocks of unparsed data, and might be configured to send other types of data in addition to metrics.log data. To make sure that your pipeline only includes metrics.log data, and that each record contains complete information for each metric event, start by configuring your pipeline to do the following:

  • Filter out any data that does not pertain to metrics.log.
  • Split and merge events so that each event is contained in a single record.
  • Change the timestamp of the record so that it reflects the time when the event was originally generated instead of the time when the event was ingested into the .

Steps

  1. In DSP, click Build Pipeline.
  2. Select Forwarders Service as your source function.
  3. Filter the incoming data so that only metrics.log data is included.
    1. On the Canvas View, click the + icon next to the Forwarders Service function, then select Where from the function picker.
    2. On the View Configurations tab, enter the following expression in the predicate field:
      source_type="splunkd" AND match_wildcard(source, "*metrics.log")
      This expression keeps records where the value of the source_type field is splunkd, and the value of the source field is metrics.log.
    3. (Optional) Click the Start Preview button, then click the Where function to confirm that your records have been filtered properly.
  4. Split the 64-kilobyte blocks of data from the universal forwarder into discrete events.
    1. On the Canvas View, click the + icon to the right of the Where function, then select Apply Line Break from the function picker.
    2. On the View Configurations tab, set the Break type to Advanced.
    3. In the Regex field, enter the following regular expression:
      ([\r\n]+)
      This expression breaks data into an event for each line delimited by return (\r) or newline (\n) characters.
    4. (Optional) Click the Start Preview button, then click the Apply Line Break function to confirm that your data stream is being split properly.
  5. Extract the timestamp value nested in the body field and use it as the value of the the top-level timestamp field.
    1. On the pipeline canvas, click the + icon to the right of the Apply Line Break function, then select Apply Timestamp Extraction from the function picker.
    2. On the View Configurations tab, make sure that Auto is selected.
    3. (Optional) Click the Start Preview button, then click the Apply Timestamp Extraction function to make sure the timestamps are extracted and assigned correctly to the timestamp field in your records.

Expand this section to see an example of what one of your filtered and processed records might look like in the List view.

Notice how dimensions such as destHost and destIp, metric names such as destPort and sourcePort, and metric values such as 9997 and 9991 are all stored together as one entity in the body field.

{
    body: "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - destHost=forwarders.scs.splunk.com, destIp=52.88.24.27, destPort=9997, eventType=connect_try, publisher=tcpout, sourcePort=9991, statusee=TcpOutputProcessor",
    nanos: 0,
    kind: "event",
    host: "C02X60K4JGH5",
    source_type: "splunkd",
    attributes: {
        arbitrary_fields: { 
            _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
            _channel: "2910"
        },
        spl_forwarder_channel_info: { 
            forwarder_src_ip: "C02X60K4JGH5",
            forwarder_src_port: 9991,
            forwarder_channel_id: 3
        },
        _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb",
        _splunk_connection_id: "forwarders:all",
        spl_fwd_type: "uf",
        spl_flags: 565,
        index: "_internal",
        spl_stmid: {
            offset: 0,
            suboffset: 0,
            strmid: 0
        }
    },
    _rule: {
        _rule_id: "GROK_TIMESTAMP",
        _rule_description: "Grok based timestamp patterns",
        _metadata: {
            _pattern: "%{DATESTAMP}"
        }
    },
    id: "",
    source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
    timestamp: 1597422342046
}

The next few sections demonstrate how you can extract dimensions to a different field, and format metric names and values to be contained as separate entities. Transforming your records this way ensures that your data can be consistently mapped to the appropriate parts of the Splunk Infrastructure Monitoring metrics schema.

Step 2: Extract dimensions into top-level fields

Extract all the dimensions from the body field to a top-level field of their own, so that you can easily map that field to the dimensions field in the Splunk Infrastructure Monitoring metrics schema. This extraction process involves configuring your pipeline to do the following:

  • Cast the body field from the union data type to the string data type so that the field can be used as input in streaming functions. See data types in the User Manual for more information about the union data type.
  • Extract the keys and values from the key-value pairs in the body field into separate fields.
  • Identify all the extracted values that are dimension data points rather than metric values.
  • Create a top-level field named dimensions containing the key-value pairs for the dimension data points.

Steps

  1. Cast the body field to type string.
    1. On the pipeline canvas, click the + icon to the right of the Apply Timestamp Extraction function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following expression:
      body=cast(body, "string")
    3. In the Output Fields panel, click Update and review the list of fields to confirm that the body field is now of type string.
  2. Extract the key-value pairs from the body field into separate fields named keys and values.
    1. On the pipeline canvas, click the + icon to the right of the Eval function, then select Parse with Regex from the function picker.
    2. On the View Configurations tab, enter body in the field field.
    3. In the pattern field, enter the following expression:
      /(?<keys>\S+)=(?<values>[^,]+),?/
      This expression extracts all the strings that appear before an equal sign ( = ) and assigns them to a newly created field named keys. This expression also extracts all the strings that appear after an equal sign ( = ) and assigns them to a newly created field named values.
    4. In the max_match field, enter 20.
    5. (Optional) Click the Start Preview button, then click the Parse with Regex function to confirm that the new keys and values fields are created and populated correctly with the keys and values extracted from the body field.
  3. Identify dimension data points and create a dimensions field containing the key-value pairs for those data points.
    1. On the pipeline canvas, click the + icon to the right of the Parse with Regex function, then select Eval from the function picker.
    2. On the View Configurations tab, click + Add four times to create four more blank fields.
    3. Enter each of the following expressions in a separate function field in this order:
      Expression Description
      range=mvrange(0, length(keys))
      Get a list of indices corresponding to elements in the keys list to use as iterators for the values list.
      dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL)
      Iterate through the values list and filter for the indices of elements that don't parse as double. These elements are non-numeric values, which means they can be used as the dimension data points of the records in this case.

      The extraction of dimensions data based on non-numeric values might not be accurate for all data types. Make sure to filter your dimensions based on the format of your records.

      dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x)))
      Create a list of dimension maps using the iterators in dimensionsRange, and assign this list of maps to a new dimensions field.
      dimensions=map_merge(dimensions)
      Merge the list of dimension maps into a single map.
      dimensions = ucast(dimensions, "map<string, string>", null)
      Cast the dimensions field to type map<string, string>.
    4. (Optional) Click the Start Preview button to confirm that the dimensions field is properly created.

Expand this section to see an example of what your record might look like in the List view after the dimensions field is created.

{
    range: [
        0,
        1,
        2,
        3,
        4,
        5,
        6
    ],
    dimensionsRange: [
        0,
        1,
        3,
        4,
        6
    ],
    dimensions: {
        destIp: "52.88.24.27",
        destHost: "forwarders.scs.splunk.com",
        publisher: "tcpout",
        eventType: "connect_try",
        statusee: "TcpOutputProcessor"
    },
    body: "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - destHost=forwarders.scs.splunk.com, destIp=52.88.24.27, destPort=9997, eventType=connect_try, publisher=tcpout, sourcePort=9991, statusee=TcpOutputProcessor",
    nanos: 0,
    kind: "event",
    host: "C02X60K4JGH5",
    source_type: "splunkd",
    attributes: {
        arbitrary_fields: { 
            _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
            _channel: "2910"
        },
        spl_forwarder_channel_info: { 
            forwarder_src_ip: "C02X60K4JGH5",
            forwarder_src_port: 9991,
            forwarder_channel_id: 3
        },
        _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb",
        _splunk_connection_id: "forwarders:all",
        spl_fwd_type: "uf",
        spl_flags: 565,
        index: "_internal",
        spl_stmid: {
            offset: 0,
            suboffset: 0,
            strmid: 0
        }
    },
    _rule: {
        _rule_id: "GROK_TIMESTAMP",
        _rule_description: "Grok based timestamp patterns",
        _metadata: {
            _pattern: "%{DATESTAMP}"
        }
    },
    id: "",
    source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
    timestamp: 1597422342046,
    keys: [
        "destHost",
        "destIp",
        "destPort",
        "eventType",
        "publisher",
        "sourcePort",
        "statusee"
    ],
    values: [
        "forwarders.scs.splunk.com",
        "52.88.24.27",
        "9997",
        "connect_try",
        "tcpout",
        "9991",
        "TcpOutputProcessor"
    ]
}

Step 3: Format the records to contain only the relevant fields for a single metric

Now that the dimension data points are extracted from the body field and assigned to a new field, you can transform the body field so that it only contains a list of maps for metric names and values. Transforming the body field this way gets rid of redundant data and lets you easily map the names and values of each metric to the corresponding fields in the Splunk Infrastructure Monitoring metrics schema. To further trim down on redundant data, drop any unnecessary fields from your records.

The resulting body field contains the name and value for more than one metric. You must split the records so that each one contains the key-value pairs for the name and value of a single metric only.

  1. Transform the body field into a list of metrics name and value maps.
    1. On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Eval from the function picker.
    2. On the View Configurations tab, click + Add to create another blank field.
    3. Enter the following expressions in the function fields in this order:
      Expression Description
      metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL)
      Iterate through list values and filter out indices of elements that parse as double. These elements are numeric values, which mean they can be used as metrics data points of the records in this case.

      The extraction of metrics data based on numeric values might or might not be accurate for all data types, so you filter your metrics based on the format of your records.

      body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
      Create a list of all metrics name and value maps using the iterators in metricsRange and assign it to the body field.
    4. (Optional) Click the Start Preview button to confirm that the body field is properly transformed.
  2. Expand this section to see an example of what one of your records might look like in the List view after the body field is transformed.

    {
        metricsRange: [ 
            2,
            5
        ],
        body: [
            {
                value: "9997",
                name: "destPort"
            },
            {
                value: "9991",
                name: "sourcePort"
            }
        ],
        range: [
            0,
            1,
            2,
            3,
            4,
            5,
            6
        ],
        dimensionsRange: [
            0,
            1,
            3,
            4,
            6
        ],
        dimensions: {
            destIp: "52.88.24.27",
            destHost: "forwarders.scs.splunk.com",
            publisher: "tcpout",
            eventType: "connect_try",
            statusee: "TcpOutputProcessor"
        },
        nanos: 0,
        kind: "event",
        host: "C02X60K4JGH5",
        source_type: "splunkd",
        attributes: {
            arbitrary_fields: {
                _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
                _channel: "2910"
            },
            spl_forwarder_channel_info: { 
                forwarder_src_ip: "C02X60K4JGH5",
                forwarder_src_port: 9991,
                forwarder_channel_id: 3
            },
            _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb",
            _splunk_connection_id: "forwarders:all",
            spl_fwd_type: "uf",
            spl_flags: 565,
            index: "_internal",
            spl_stmid: {
                offset: 0,
                suboffset: 0,
                strmid: 0
            }
        },
        _rule: {
            _rule_id: "GROK_TIMESTAMP",
            _rule_description: "Grok based timestamp patterns",
            _metadata: {
                _pattern: "%{DATESTAMP}"
            }
        },
        id: "",
        source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
        timestamp: 1597422342046,
        keys: [
            "destHost",
            "destIp",
            "destPort",
            "eventType",
            "publisher",
            "sourcePort",
            "statusee"
        ],
        values: [
            "forwarders.scs.splunk.com",
            "52.88.24.27",
            "9997",
            "connect_try",
            "tcpout",
            "9991",
            "TcpOutputProcessor"
        ]
    }
    
  3. Keep the fields that you'll map to the name, value, dimensions, and timestamp fields in the Splunk Infrastructure Monitoring metrics schema, and drop all other fields from the record.
    1. On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Fields from the function picker.
    2. On the View Configurations tab, enter body in the field_list field.
    3. Click + Add twice to add two new fields, then enter dimensions and timestamp in each of the newly created fields.
    4. Enter + in the operator field. The plus ( + ) operator configures the function to keep the fields that you specified in the Field list field and drop the rest.
    5. In the Output Fields panel, click Update and review the list of fields to confirm that the unwanted fields are dropped from your records.
  4. Flatten each record that contains multiple nested records into individual records.
    1. On the pipeline canvas, click the + icon to the right of the Fields function, then select MV Expand from the function picker.
    2. On the View Configurations tab, enter body in the field field, and then enter 0 in the limit field. The MV Expand function flattens all the values within the named field and carries all other fields into each newly created record.
    3. (Optional) Click the Start Preview button, then click the MV Expand function to confirm that the nested records are expanded properly. The body field of each record now contains only one map of metrics name and value.
  5. Expand this section to see an example of what one of your records might look like in the List view after the record is flattened and unnecessary fields have been dropped.

    {
        body: {
            value: "9991",
            name: "sourcePort"
        },
        dimensions: {
            destIp: "52.88.24.27",
            destHost: "forwarders.scs.splunk.com",
            publisher: "tcpout",
            eventType: "connect_try",
            statusee: "TcpOutputProcessor"
        },
        timestamp: 1597422342046
    }

Step 4: Send your transformed data to Splunk Infrastructure Monitoring

Configure the Send to Splunk Infrastructure Monitoring sink function to map the relevant fields from your DSP records to the name, value, dimension, and timestamp fields that are supported in the Splunk Infrastructure Monitoring metrics schema.

Steps

  1. Add the Send to Splunk Infrastructure Monitoring sink function to the end of your pipeline and configure it.
    1. On the pipeline canvas, click the + icon to the right of the MV Expand function, then select Send to Splunk Infrastructure Monitoring from the function picker.
    2. On the View Configurations tab, set the connection_id field to the ID of your Splunk Observability connection.
    3. Enter the following expressions and input in the corresponding fields:
      Field Supported data type Expression or input
      metric_name String
      map_get(body, "name")
      metric_value Double
      cast(map_get(body, "value"), "double")
      metric_type String "GAUGE"
      metric_timestamp (optional) Long timestamp
      metric_dimensions (optional) map<string, string> dimensions
  2. To save your pipeline, click Save.
  3. To start sending your data, click Activate Pipeline and then click Activate. If you're activating your pipeline for the first time, don't select Skip Restore State or Allow Non-Restored State. See Using activation checkpoints to activate your pipeline in the Use the manual for more details.
    Depending on the size and complexity of the pipeline, it might take some time for the pipeline to finish activating. Before continuing to the next step, wait for the status beside the pipeline name to update to "Activated". Additionally, make sure that all the functions in the pipeline display metrics indicating that data is flowing through the pipeline.
  4. To see if your data is successfully sent to Splunk Infrastructure Monitoring, see Planning and Creating Charts in the Splunk Infrastructure Monitoring documentation for details on how to view your data.

You've successfully transformed your metrics.log data and sent it to Splunk Infrastructure Monitoring through the .

As an alternative to building your pipeline using the Canvas View, you can use the SPL View instead. The following is the full SPL2 expression for the pipeline described on this page. Replace the "YOUR_OBSERVABILITY_CONNECTION_ID" placeholder value with the connection ID of your Splunk Observability connection.

| from forwarders("forwarders:all") | where source_type="splunkd" AND match_wildcard(source, "*metrics.log") | apply_line_breaking line_breaker="([\\r\\n]+)" truncate=10000 linebreak_type="advanced" | apply_timestamp_extraction fallback_to_auto=false extraction_type="auto" | eval body=cast(body, "string") | rex max_match=20 field=body /(?<keys>\S+)=(?<values>[^,]+),?/ | eval range=mvrange(0, length(keys)), dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL), dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x))), dimensions=map_merge(dimensions), dimensions=ucast(dimensions, "map<string, string>", null) | eval metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL), body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)}) | fields + body, dimensions, timestamp | mvexpand limit=0 body | into into_signalfx("YOUR_OBSERVABILITY_CONNECTION_ID", map_get(body, "name"), cast(map_get(body, "value"), "double"), "GAUGE", timestamp, dimensions);

See also

Functions
Apply Line Break
Apply Timestamp Extraction
Eval
Fields
Forwarders Service
List
Map
MV Expand
Parse with Regex
Send to Splunk Infrastructure Monitoring
Related topics
Process data from a universal forwarder in DSP
Working with nested data
Last modified on 22 March, 2022
Create a DSP connection to Splunk Observability   HTTP Event Collector and the

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5


Was this topic useful?







You must be logged into splunk.com in order to post comments. Log in now.

Please try to keep this discussion focused on the content covered in this documentation topic. If you have a more general question about Splunk functionality or are experiencing a difficulty with Splunk, consider posting a question to Splunkbase Answers.

0 out of 1000 Characters