Splunk® Data Stream Processor

Connect to Data Sources and Destinations with DSP

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach their end of support date. See the Splunk Software Support Policy for details. For information about upgrading to a supported version, see the Upgrade the Splunk Data Stream Processor topic.
This documentation does not apply to the most recent version of Splunk® Data Stream Processor. For documentation on the most recent version, go to the latest release.

Formatting data into the SignalFx metrics schema

When you create a data pipeline in the Splunk Data Stream Processor (DSP) to send data to a SignalFx endpoint using the Send Metrics Data to SignalFx sink function, you must make sure that the data is compatible with the SignalFx metrics schema. Search for "The SignalFx Data Model" in the SignalFx Developers Guide for more information.

The following diagram shows how your DSP metrics are transformed to be compatible with the SignalFx metrics schema.

[Diagram: the DSP metrics schema transformed into the SignalFx metrics schema.]

In this case, the specific dimensions and type values of the record override the defaultDimensions and defaultType values.
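
The override rule can be sketched in Python as follows. This is a minimal illustration, not DSP code: the resolve helper, the sample metric, and the sample default values are all hypothetical, and the sketch assumes that a record-level value replaces the default wholesale whenever it is present. Whether DSP merges or replaces dimension maps is not stated in this topic.

```python
def resolve(record_value, default_value):
    """Return the record-level value when present, otherwise the default."""
    return record_value if record_value is not None else default_value


# Hypothetical defaults, named after the defaultType and defaultDimensions
# values mentioned above.
default_type = "GAUGE"
default_dimensions = {"host": "example-host"}

# Hypothetical metric that sets its own type but not its own dimensions.
metric = {"name": "cpu.util", "value": 0.42, "type": "COUNTER", "dimensions": None}

effective_type = resolve(metric.get("type"), default_type)
effective_dimensions = resolve(metric.get("dimensions"), default_dimensions)

print(effective_type)
print(effective_dimensions)
```

In this sketch the metric keeps its own type, and falls back to the default dimensions because it does not define any.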

Example: Send metrics.log data from the Splunk universal forwarder to SignalFx

The Splunk universal forwarder logs metrics.log data by default. Using DSP, you can process and send metrics.log data to SignalFx to troubleshoot your inputs and review product behavior. See About metrics.log in the Splunk Enterprise Troubleshooting Manual for more details. This example shows you how to do the following tasks:

  • Split the incoming stream of metrics.log data into individual records.
  • Extract the timestamp from the event body and use the extracted value as the timestamp of the event itself.
  • Extract nested maps in your metrics.log data and assign the extracted data to top-level fields.
  • Expand multivalue fields and drop fields from your records to make them compatible with the SignalFx metrics schema.
  • Send your transformed data to SignalFx.

Prerequisites

Best practices

Follow these guidelines to get a better experience building your SignalFx metrics pipeline in DSP:

  • Rename your functions to keep track of their roles because you might use multiple instances of the same function in your pipeline.
  • When previewing the data, switch to the List view for a better layout of the JSON format:
  1. In the Preview Results tab, on the navigation bar, click Table.
  2. Select List from the drop-down menu.

Steps

Log data can be converted to metrics in many different ways. The data extraction approaches in steps 7 and 8 are examples specific to metrics.log data from one particular Splunk universal forwarder, not best practices that apply to all scenarios. Adjust your pipeline according to the format of your own records for accurate data transformation.
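
To make the record format that these steps assume concrete, the following Python sketch applies the regular expression from step 7 to the body of the sample metrics.log event shown in this topic. It is an illustration only: Python's re module stands in for the Parse with regex function, the named groups use Python's (?P<name>...) syntax, and the MAX_MATCH constant is a hypothetical stand-in for the max_match setting.

```python
import re
from itertools import islice

# Sample event body, taken from the preview output shown in this topic.
body = (
    "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - "
    "destHost=forwarders.playground.scp.splunk.com, destIp=52.88.24.27, "
    "destPort=9997, eventType=connect_try, publisher=tcpout, "
    "sourcePort=9991, statusee=TcpOutputProcessor"
)

# Same pattern as step 7, rewritten with Python named-group syntax.
PATTERN = re.compile(r"(?P<keys>\S+)=(?P<values>[^,]+),?")

# Hypothetical cap on the number of matches, mirroring max_match.
MAX_MATCH = 20

matches = list(islice(PATTERN.finditer(body), MAX_MATCH))
keys = [m.group("keys") for m in matches]
values = [m.group("values") for m in matches]

print(keys)
print(values)
```

Because the values group stops at the first comma, a value that itself contains a comma would be cut short. That is one reason to adapt the pattern to the format of your own records.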

  1. In DSP, click Build Pipeline.
  2. Select Splunk DSP Firehose as your source function.
  3. Data from multiple log files comes into DSP when you use Splunk DSP Firehose as the data source, so you need to filter the stream to keep only metrics.log data.
    1. On the pipeline canvas, click the + icon next to the Splunk DSP Firehose function, then select Where from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression in the predicate field:
      source_type="splunkd" AND match_wildcard(source, "*metrics.log")
      This expression keeps only records that have a source_type of splunkd and a source that ends with metrics.log.
    3. Click Start Preview, then click the Where function to confirm that your records have been filtered properly.
  4. The Splunk universal forwarder sends data in 64-kilobyte blocks. You need to split and stitch your events together before you can perform further transformations on them.
    1. On the pipeline canvas, click the + icon to the right of the Where function, then select Apply line break from the function picker.
    2. On the View Configurations tab, click the Break type drop-down menu, then select Advanced.
    3. On the View Configurations tab, enter the following regular expression in the Regex field:
      ([\r\n]+)
      This expression breaks data into an event for each line delimited by return (\r) or newline (\n) characters.
    4. Click Start Preview, then click the Apply line break function to confirm that your data stream has been split properly.
  5. When DSP receives data from the universal forwarder, the timestamp used is the time when the event is ingested, not the actual time at which the event is recorded. You need to reassign the actual time to the timestamp field.
    1. On the pipeline canvas, click the + icon to the right of the Apply line break function, then select Apply Timestamp Extraction from the function picker.
    2. On the View Configurations tab, make sure that Auto is selected.
    3. Click Start Preview, then click the Apply Timestamp Extraction function to make sure the timestamps are extracted and assigned correctly to the timestamp field in your records.
  6. Before you can extract key-value pairs from the body field to work with them, you need to cast body to type string.
    1. On the pipeline canvas, click the + icon to the right of the Apply Timestamp Extraction function, then select Eval from the function picker.
    2. On the View Configurations tab, enter the following SPL2 expression:
      body=cast(body, "string")
    3. On the Output Fields tab, click Update to confirm that the body field is now of type string.
  7. Extract key-value pairs from the body field to create maps for the dimensions field.
    1. On the pipeline canvas, click the + icon to the right of the Eval function, then select Parse with regex from the function picker.
    2. On the View Configurations tab, enter body in the field field.
    3. On the View Configurations tab, enter the following SPL2 expression in the pattern field:
      /(?<keys>\S+)=(?<values>[^,]+),?/
      All the strings appearing before an equal sign ( = ) are extracted as a list and assigned to a newly created keys field. All the strings appearing after an equal sign ( = ), up to the next comma, are extracted as a list and assigned to a newly created values field.
    4. On the View Configurations tab, enter a max match limit in the max_match field. See Rex in the Functions Reference manual for more details.
    5. Click Start Preview, then click the Parse with regex function to confirm that the new keys and values fields are created and populated correctly with the keys and values extracted from the body field.
  8. Extract dimensions data points from the body field to create the dimensions map.
    1. On the pipeline canvas, click the + icon to the right of the Parse with regex function, then select Eval from the function picker.
    2. On the View Configurations tab, click + Add four times to create four more blank fields.
    3. On the View Configurations tab, enter each of the following SPL2 expressions in a separate function field in this order:
      Each SPL2 expression is followed by its description.

      SPL2 expression: range=mvrange(0, length(keys))
      Description: Get the list of indices of the elements in the keys list, to use as iterators over the values list.

      SPL2 expression: dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL)
      Description: Iterate through the values list and keep only the indices of elements that don't parse as double. These elements are non-numeric values, which means they can be used as dimension data points of the records in this case.
      Note: Treating non-numeric values as dimensions might not be accurate for all data types, so adjust how you filter your dimensions based on the format of your own records.

      SPL2 expression: dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x)))
      Description: Create a list of dimension maps using the iterators in dimensionsRange and assign it to a new dimensions field.

      SPL2 expression: dimensions=map_merge(dimensions)
      Description: Merge the list of dimension maps into a single map.

      SPL2 expression: dimensions = ucast(dimensions, "map<string, string>", null)
      Description: Cast the dimensions field to type map<string, string>.
    4. On the Output Fields tab, click Update.
    5. Click Start Preview to confirm that the dimensions field is properly created. Here's an example of what your record might look like in the List view:
      {
          range: [
              0,
              1,
              2,
              3,
              4,
              5,
              6
          ],
          dimensionsRange: [
              0,
              1,
              3,
              4,
              6
          ],
          dimensions: {
              destIp: "52.88.24.27",
              destHost: "forwarders.playground.scp.splunk.com",
              publisher: "tcpout",
              eventType: "connect_try",
              statusee: "TcpOutputProcessor"
          },
          body: "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - destHost=forwarders.playground.scp.splunk.com, destIp=52.88.24.27, destPort=9997, eventType=connect_try, publisher=tcpout, sourcePort=9991, statusee=TcpOutputProcessor",
          nanos: 0,
          kind: "event",
          host: "C02X60K4JGH5",
          source_type: "splunkd",
          attributes: {
              arbitrary_fields: { 
                  _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
                  _channel: "2910"
              },
              spl_forwarder_channel_info: { 
                  forwarder_src_ip: "C02X60K4JGH5",
                  forwarder_src_port: 9991,
                  forwarder_channel_id: 3
              },
              _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb",
              _splunk_connection_id: "forwarders:all",
              spl_fwd_type: "uf",
              spl_flags: 565,
              index: "_internal",
              spl_stmid: {
                  offset: 0,
                  suboffset: 0,
                  strmid: 0
              }
          },
          _rule: {
              _rule_id: "GROK_TIMESTAMP",
              _rule_description: "Grok based timestamp patterns",
              _metadata: {
                  _pattern: "%{DATESTAMP}"
              }
          },
          id: "",
          source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
          timestamp: 1597422342046,
          keys: [
              "destHost",
              "destIp",
              "destPort",
              "eventType",
              "publisher",
              "sourcePort",
              "statusee"
          ],
          values: [
              "forwarders.playground.scp.splunk.com",
              "52.88.24.27",
              "9997",
              "connect_try",
              "tcpout",
              "9991",
              "TcpOutputProcessor"
          ]
      }
      
  9. Now that the dimensions data are extracted from the body field and assigned to a new field, you can transform the body field into a list of metrics name and value maps.
    1. On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Eval from the function picker.
    2. On the View Configurations tab, click + Add to create another blank field.
    3. On the View Configurations tab, enter each of the following SPL2 expressions in a separate function field in this order. Each SPL2 expression is followed by its description.

      SPL2 expression: metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL)
      Description: Iterate through the values list and keep only the indices of elements that parse as double. These elements are numeric values, which means they can be used as metrics data points of the records in this case.
      Note: Treating numeric values as metrics might not be accurate for all data types, so adjust how you filter your metrics based on the format of your own records.

      SPL2 expression: body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
      Description: Create a list of all metrics name and value maps using the iterators in metricsRange and assign it to the body field.
    4. On the Output Fields tab, click Update.
    5. Click Start Preview to confirm that the body field is properly transformed. Here's an example of what your record might look like in the List view:
      {
          metricsRange: [ 
              2,
              5
          ],
          body: [
              {
                  value: "9997",
                  name: "destPort"
              },
              {
                  value: "9991",
                  name: "sourcePort"
              }
          ],
          range: [
              0,
              1,
              2,
              3,
              4,
              5,
              6
          ],
          dimensionsRange: [
              0,
              1,
              3,
              4,
              6
          ],
          dimensions: {
              destIp: "52.88.24.27",
              destHost: "forwarders.playground.scp.splunk.com",
              publisher: "tcpout",
              eventType: "connect_try",
              statusee: "TcpOutputProcessor"
          },
          nanos: 0,
          kind: "event",
          host: "C02X60K4JGH5",
          source_type: "splunkd",
          attributes: {
              arbitrary_fields: {
                  _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
                  _channel: "2910"
              },
              spl_forwarder_channel_info: { 
                  forwarder_src_ip: "C02X60K4JGH5",
                  forwarder_src_port: 9991,
                  forwarder_channel_id: 3
              },
              _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb",
              _splunk_connection_id: "forwarders:all",
              spl_fwd_type: "uf",
              spl_flags: 565,
              index: "_internal",
              spl_stmid: {
                  offset: 0,
                  suboffset: 0,
                  strmid: 0
              }
          },
          _rule: {
              _rule_id: "GROK_TIMESTAMP",
              _rule_description: "Grok based timestamp patterns",
              _metadata: {
                  _pattern: "%{DATESTAMP}"
              }
          },
          id: "",
          source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
          timestamp: 1597422342046,
          keys: [
              "destHost",
              "destIp",
              "destPort",
              "eventType",
              "publisher",
              "sourcePort",
              "statusee"
          ],
          values: [
              "forwarders.playground.scp.splunk.com",
              "52.88.24.27",
              "9997",
              "connect_try",
              "tcpout",
              "9991",
              "TcpOutputProcessor"
          ]
      }
      
  10. Keep only fields compatible with the SignalFx metrics schema and drop all other fields from your records.
    1. On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Fields from the function picker.
    2. On the View Configurations tab, enter body in the field_list field.
    3. On the View Configurations tab, click + Add twice to add two new fields, then enter dimensions and timestamp in each of the newly created fields.
    4. On the View Configurations tab, enter + in the operator (optional) field. Adding the plus ( + ) operator indicates that the listed fields are the only fields to be kept in the records.
    5. On the Output Fields tab, click Update to confirm that the unwanted fields are dropped from your records.
  11. Flatten each record containing multiple nested records into individual records.
    1. On the pipeline canvas, click the + icon to the right of the Fields function, then select MV Expand from the function picker.
    2. On the View Configurations tab, enter body in the field field and 0 in the limit field. The mvexpand function flattens all the values within the named field and carries all other fields into each newly created record.
    3. Click Start Preview, then click the MV Expand function to confirm that the nested records are expanded properly. The body field of each record now contains only one map of metrics name and value. Here's an example of what your record might look like in the List view:
      {
          body: {
              value: "9991",
              name: "sourcePort"
          },
          dimensions: {
              destIp: "52.88.24.27",
              destHost: "forwarders.playground.scp.splunk.com",
              publisher: "tcpout",
              eventType: "connect_try",
              statusee: "TcpOutputProcessor"
          },
          timestamp: 1597422342046
      }
  12. Send your transformed data to SignalFx.
    1. On the pipeline canvas, click the + icon to the right of the MV Expand function, then select Send Metrics Data to SignalFx from the function picker.
    2. On the View Configurations tab, configure the function with your SignalFx connection.
    3. On the View Configurations tab, enter the following SPL2 expressions and input in the corresponding fields:
      Field                          SPL2 expression/input
      metric_name                    map_get(body, "name")
      metric_value                   cast(map_get(body, "value"), "double")
      metric_type                    "GAUGE"
      metric_timestamp (optional)    timestamp
      metric_dimensions (optional)   dimensions
  13. To confirm that your data was sent to SignalFx successfully, see Planning and Creating Charts in the SignalFx Infrastructure Monitoring manual for details on how to view your data.
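
The classification logic in steps 8 and 9 can be sketched in Python as follows. This is an illustration under stated assumptions, not DSP code: the parse_double helper here is a hypothetical stand-in built on Python's float(), which accepts some strings (for example "nan" or "inf") that the DSP parse_double function might not treat identically. The keys and values lists are copied from the preview output in step 8.

```python
def parse_double(text):
    """Hypothetical stand-in for parse_double: a float, or None on failure."""
    try:
        return float(text)
    except ValueError:
        return None


keys = ["destHost", "destIp", "destPort", "eventType",
        "publisher", "sourcePort", "statusee"]
values = ["forwarders.playground.scp.splunk.com", "52.88.24.27", "9997",
          "connect_try", "tcpout", "9991", "TcpOutputProcessor"]

# range=mvrange(0, length(keys))
index_range = list(range(len(keys)))

# Indices whose values do NOT parse as a number become dimensions.
dimensions_range = [i for i in index_range if parse_double(values[i]) is None]

# Indices whose values DO parse as a number become metrics.
metrics_range = [i for i in index_range if parse_double(values[i]) is not None]

# Equivalent of for_each + create_map + map_merge: one merged map.
dimensions = {keys[i]: values[i] for i in dimensions_range}

# Equivalent of the for_each that rebuilds body as a list of name/value maps.
body = [{"name": keys[i], "value": values[i]} for i in metrics_range]

print(dimensions_range, metrics_range)
print(dimensions)
print(body)
```

Note that the IP address "52.88.24.27" lands in the dimensions because it does not parse as a single number, while the port numbers are classified as metrics. Whether that split is meaningful depends on your data, which is why the steps above call this approach an example rather than a best practice.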

You've successfully transformed your metrics.log data and sent it to SignalFx through DSP using the Canvas Builder. Alternatively, if you want to build your pipeline using the SPL2 Builder, here are the full SPL2 expressions for the pipeline. Replace the YOUR_SIGNALFX_CONNECTION_ID placeholder in the sink function with your own SignalFx connection ID:

| from splunk_firehose()
| where source_type="splunkd" AND match_wildcard(source, "*metrics.log")
| apply_line_breaking line_breaker="([\\r\\n]+)" truncate=10000 linebreak_type="advanced"
| apply_timestamp_extraction fallback_to_auto=false extraction_type="auto"
| eval body=cast(body, "string")
| rex max_match=20 field=body /(?<keys>\S+)=(?<values>[^,]+),?/
| eval range=mvrange(0, length(keys)),
    dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL),
    dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x))),
    dimensions=map_merge(dimensions),
    dimensions=ucast(dimensions, "map<string, string>", null)
| eval metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL),
    body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
| fields + body, dimensions, timestamp
| mvexpand limit=0 body
| into into_signalfx("YOUR_SIGNALFX_CONNECTION_ID", map_get(body, "name"), cast(map_get(body, "value"), "double"), "GAUGE", timestamp, dimensions);
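
The last three stages of the pipeline (fields, mvexpand, and the sink mapping) can be sketched in Python as follows. This is an illustration only, not DSP or SignalFx code: the record is a trimmed copy of the preview output from step 9, and the keys of each dictionary in datapoints simply mirror the sink function field names from step 12. They are not the SignalFx API payload format.

```python
# Trimmed record after step 9: body is a list of metric name/value maps.
record = {
    "body": [
        {"name": "destPort", "value": "9997"},
        {"name": "sourcePort", "value": "9991"},
    ],
    "dimensions": {
        "destIp": "52.88.24.27",
        "destHost": "forwarders.playground.scp.splunk.com",
        "publisher": "tcpout",
        "eventType": "connect_try",
        "statusee": "TcpOutputProcessor",
    },
    "timestamp": 1597422342046,
    "host": "C02X60K4JGH5",
    "source_type": "splunkd",
}

# fields + body, dimensions, timestamp: keep only the listed fields.
KEPT_FIELDS = ("body", "dimensions", "timestamp")
kept = {name: record[name] for name in KEPT_FIELDS}

# mvexpand body: one output record per element of body,
# with every other field copied into each new record.
expanded = [{**kept, "body": metric} for metric in kept["body"]]

# Sink mapping from step 12, with the value cast from string to double.
datapoints = [
    {
        "metric_name": r["body"]["name"],
        "metric_value": float(r["body"]["value"]),
        "metric_type": "GAUGE",
        "metric_timestamp": r["timestamp"],
        "metric_dimensions": r["dimensions"],
    }
    for r in expanded
]

for datapoint in datapoints:
    print(datapoint)
```

Each input record with two metrics becomes two datapoints that share the same timestamp and dimensions.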

See also

Functions
Evals
Fields
Map
Mvexpand
List
Related topics
Process data from a universal forwarder in DSP
Working with nested data
Last modified on 21 June, 2021

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02

