Formatting data into the Splunk Infrastructure Monitoring metrics schema
Before sending your data from a DSP pipeline to Splunk Infrastructure Monitoring, make sure to format your records so that all the relevant data is stored in the fields that will be mapped to the Splunk Infrastructure Monitoring metrics schema.
Splunk Infrastructure Monitoring supports the following schema for metric datapoints:
{ "<metric type>": [ { "metric": string, "value": number, "timestamp": int64, "dimensions": object } ] }
See the /datapoint section of Send Metrics and Events in the Observability API Reference for more information.
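As a rough illustration of that schema, the following Python sketch assembles a payload holding one datapoint and serializes it to JSON. The build_payload helper, the lowercase "gauge" key, and the sample values are assumptions made for this illustration; they are not part of DSP or the Splunk Infrastructure Monitoring API.

```python
import json


def build_payload(metric_type, metric, value, timestamp_ms, dimensions):
    """Return a dict shaped like the metric datapoint schema shown above.

    build_payload is a hypothetical helper used only for illustration.
    """
    datapoint = {
        "metric": metric,           # string
        "value": value,             # number
        "timestamp": timestamp_ms,  # int64 (assumed here to be epoch milliseconds)
        "dimensions": dimensions,   # object mapping dimension names to values
    }
    return {metric_type: [datapoint]}


# Sample values borrowed from the metrics.log example later on this page.
payload = build_payload(
    "gauge", "destPort", 9997.0, 1597422342046, {"publisher": "tcpout"}
)
print(json.dumps(payload, indent=2))
```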
When configuring the Send to Splunk Infrastructure Monitoring sink function, you specify which fields from your DSP records to map to the metric, value, timestamp, and dimensions fields in the Splunk Infrastructure Monitoring metric datapoints. All other fields from your DSP records are dropped.
See Send data to Splunk Infrastructure Monitoring in the Function Reference for more information about configuring the sink function.
Example: Send metrics.log data from the Splunk universal forwarder to Splunk Infrastructure Monitoring
The Splunk universal forwarder logs metrics.log data by default. Using the Data Stream Processor, you can process and send metrics.log data to Splunk Infrastructure Monitoring to troubleshoot your inputs and review product behavior. See About metrics.log in the Splunk Enterprise Troubleshooting Manual for more details.
This example shows you how to do the following tasks:
- Process the incoming stream of metrics.log data, which is unparsed. You'll need to split the data stream into discrete events with timestamps that indicate when the event was originally generated.
- Extract dimensions from nested fields and assign them to top-level fields.
- Format the records so that each one contains only the relevant fields for a single metric.
- Send your transformed data to Splunk Infrastructure Monitoring, making sure to map the relevant DSP record fields to the fields that are retained in Splunk Infrastructure Monitoring.
Log data can be converted to metrics in many different ways. The data extraction and conversion approaches described on this page are examples that apply to the metrics.log data from a particular Splunk universal forwarder, not best practices that apply to all scenarios. Adjust your pipeline according to the format of your own records for accurate data transformation.
Prerequisites
Before you can use the Data Stream Processor to transform data from a forwarder and send it to Splunk Infrastructure Monitoring, you must have the following:
- A Splunk universal forwarder that has been configured to send data to DSP. See Connect a Splunk forwarder to the Data Stream Processor.
- A Splunk Observability connection. See Create a DSP connection to Splunk Observability.
Best practices
Follow these guidelines to get a better experience building your Splunk Infrastructure Monitoring metrics pipeline in the Data Stream Processor:
- Rename your functions to keep track of their roles because you might use multiple instances of the same function in your pipeline.
- When previewing the data, switch to the List view to see a clearer layout of the JSON format:
  - In the Preview Results tab, on the navigation bar, click Table.
  - Select List from the drop-down menu.
Step 1: Receive and parse metrics.log data from the universal forwarder
Splunk universal forwarders send data to the Data Stream Processor in 64-kilobyte blocks of unparsed data, and might be configured to send other types of data in addition to metrics.log data. To make sure that your pipeline only includes metrics.log data, and that each record contains complete information for each metric event, start by configuring your pipeline to do the following:
- Filter out any data that does not pertain to metrics.log.
- Split and merge events so that each event is contained in a single record.
- Change the timestamp of the record so that it reflects the time when the event was originally generated instead of the time when the event was ingested into the Data Stream Processor.
Steps
- In DSP, click Build Pipeline.
- Select Forwarders Service as your source function.
- Filter the incoming data so that only metrics.log data is included.
  - On the Canvas View, click the + icon next to the Forwarders Service function, then select Where from the function picker.
  - On the View Configurations tab, enter the following expression in the predicate field:
    source_type="splunkd" AND match_wildcard(source, "*metrics.log")
    This expression keeps records where the value of the source_type field is splunkd, and the value of the source field ends with metrics.log.
  - (Optional) Click the Start Preview button, then click the Where function to confirm that your records have been filtered properly.
- Split the 64-kilobyte blocks of data from the universal forwarder into discrete events.
  - On the Canvas View, click the + icon to the right of the Where function, then select Apply Line Break from the function picker.
  - On the View Configurations tab, set the Break type to Advanced.
  - In the Regex field, enter the following regular expression:
    ([\r\n]+)
    This expression breaks data into an event for each line delimited by return (\r) or newline (\n) characters.
  - (Optional) Click the Start Preview button, then click the Apply Line Break function to confirm that your data stream is being split properly.
- Extract the timestamp value nested in the body field and use it as the value of the top-level timestamp field.
  - On the pipeline canvas, click the + icon to the right of the Apply Line Break function, then select Apply Timestamp Extraction from the function picker.
  - On the View Configurations tab, make sure that Auto is selected.
  - (Optional) Click the Start Preview button, then click the Apply Timestamp Extraction function to make sure the timestamps are extracted and assigned correctly to the timestamp field in your records.
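The filtering and line-breaking steps above can be approximated outside of DSP with a short Python sketch. It uses fnmatch to imitate the wildcard predicate and the same regular expression to split a block of text into events. The keep_record and split_events helpers, the sample paths, and the sample bodies are hypothetical, and the sketch assumes that the wildcard match is case-sensitive; it does not describe how the Where or Apply Line Break functions are implemented.

```python
import fnmatch
import re


def keep_record(record):
    """Imitate the predicate: source_type is splunkd and source ends with metrics.log."""
    return (
        record.get("source_type") == "splunkd"
        and fnmatch.fnmatchcase(record.get("source", ""), "*metrics.log")
    )


def split_events(block):
    """Split raw text on runs of \\r or \\n and skip empty pieces."""
    return [line for line in re.split(r"[\r\n]+", block) if line]


# Hypothetical incoming records; only the first one comes from metrics.log.
records = [
    {
        "source_type": "splunkd",
        "source": "/opt/splunkforwarder/var/log/splunk/metrics.log",
        "body": "event one\r\nevent two\nevent three\n",
    },
    {
        "source_type": "splunkd",
        "source": "/opt/splunkforwarder/var/log/splunk/splunkd.log",
        "body": "unrelated\n",
    },
]

events = [
    event
    for record in records
    if keep_record(record)
    for event in split_events(record["body"])
]
print(events)  # ['event one', 'event two', 'event three']
```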
The following example shows what one of your filtered and processed records might look like in the List view.
Notice how dimensions such as destHost and destIp, metric names such as destPort and sourcePort, and metric values such as 9997 and 9991 are all stored together as one entity in the body field.
{ body: "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - destHost=forwarders.scs.splunk.com, destIp=52.88.24.27, destPort=9997, eventType=connect_try, publisher=tcpout, sourcePort=9991, statusee=TcpOutputProcessor", nanos: 0, kind: "event", host: "C02X60K4JGH5", source_type: "splunkd", attributes: { arbitrary_fields: { _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", _channel: "2910" }, spl_forwarder_channel_info: { forwarder_src_ip: "C02X60K4JGH5", forwarder_src_port: 9991, forwarder_channel_id: 3 }, _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb", _splunk_connection_id: "forwarders:all", spl_fwd_type: "uf", spl_flags: 565, index: "_internal", spl_stmid: { offset: 0, suboffset: 0, strmid: 0 } }, _rule: { _rule_id: "GROK_TIMESTAMP", _rule_description: "Grok based timestamp patterns", _metadata: { _pattern: "%{DATESTAMP}" } }, id: "", source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", timestamp: 1597422342046 }
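In this sample record, the timestamp value 1597422342046 is an epoch time in milliseconds, and it corresponds to the date and time at the start of the body field when that wall-clock time is read as UTC. The following Python sketch reproduces the value that way. Leaving out the -0400 offset is an assumption made only to match the sample record; the sketch does not describe how the Apply Timestamp Extraction function works internally.

```python
from datetime import datetime, timedelta, timezone

body = (
    "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - "
    "destHost=forwarders.scs.splunk.com"
)

# Take the leading date and time tokens. The "-0400" token is deliberately
# not applied so that the result matches the sample record (an assumption).
date_part, time_part = body.split(" ")[:2]
parsed = datetime.strptime(f"{date_part} {time_part}", "%m-%d-%Y %H:%M:%S.%f")
parsed = parsed.replace(tzinfo=timezone.utc)

# Convert to whole milliseconds since the Unix epoch using integer arithmetic.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
timestamp_ms = (parsed - epoch) // timedelta(milliseconds=1)
print(timestamp_ms)  # 1597422342046
```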
The next few sections demonstrate how you can extract dimensions to a different field, and format metric names and values to be contained as separate entities. Transforming your records this way ensures that your data can be consistently mapped to the appropriate parts of the Splunk Infrastructure Monitoring metrics schema.
Step 2: Extract dimensions into top-level fields
Extract all the dimensions from the body field to a top-level field of their own, so that you can easily map that field to the dimensions field in the Splunk Infrastructure Monitoring metrics schema. This extraction process involves configuring your pipeline to do the following:
- Cast the body field from the union data type to the string data type so that the field can be used as input in streaming functions. See data types in the User Manual for more information about the union data type.
- Extract the keys and values from the key-value pairs in the body field into separate fields.
- Identify all the extracted values that are dimension data points rather than metric values.
- Create a top-level field named dimensions containing the key-value pairs for the dimension data points.
Steps
- Cast the body field to type string.
  - On the pipeline canvas, click the + icon to the right of the Apply Timestamp Extraction function, then select Eval from the function picker.
  - On the View Configurations tab, enter the following expression:
    body=cast(body, "string")
  - In the Output Fields panel, click Update and review the list of fields to confirm that the body field is now of type string.
- Extract the key-value pairs from the body field into separate fields named keys and values.
  - On the pipeline canvas, click the + icon to the right of the Eval function, then select Parse with Regex from the function picker.
  - On the View Configurations tab, enter body in the field field.
  - In the pattern field, enter the following expression:
    /(?<keys>\S+)=(?<values>[^,]+),?/
    This expression extracts all the strings that appear before an equal sign ( = ) and assigns them to a newly created field named keys. This expression also extracts all the strings that appear after an equal sign ( = ) and assigns them to a newly created field named values.
  - In the max_match field, enter 20.
  - (Optional) Click the Start Preview button, then click the Parse with Regex function to confirm that the new keys and values fields are created and populated correctly with the keys and values extracted from the body field.
- Identify dimension data points and create a dimensions field containing the key-value pairs for those data points.
  - On the pipeline canvas, click the + icon to the right of the Parse with Regex function, then select Eval from the function picker.
  - On the View Configurations tab, click + Add four times to create four more blank fields.
  - Enter each of the following expressions in a separate function field in this order:
    Expression: range=mvrange(0, length(keys))
    Description: Get a list of indices corresponding to elements in the keys list to use as iterators for the values list.
    Expression: dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL)
    Description: Iterate through the values list and filter for the indices of elements that don't parse as double. These elements are non-numeric values, which means they can be used as the dimension data points of the records in this case. The extraction of dimensions data based on non-numeric values might not be accurate for all data types. Make sure to filter your dimensions based on the format of your records.
    Expression: dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x)))
    Description: Create a list of dimension maps using the iterators in dimensionsRange, and assign this list of maps to a new dimensions field.
    Expression: dimensions=map_merge(dimensions)
    Description: Merge the list of dimension maps into a single map.
    Expression: dimensions = ucast(dimensions, "map<string, string>", null)
    Description: Cast the dimensions field to type map<string, string>.
  - (Optional) Click the Start Preview button to confirm that the dimensions field is properly created.
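To see what the key-value extraction pattern from the Parse with Regex step captures, you can try an equivalent pattern in Python against the sample body value. This is a minimal sketch: Python writes named groups as (?P<name>...) rather than (?<name>...), and the MAX_MATCH constant only imitates the max_match setting by truncating the list of matches. Neither detail describes how the Parse with Regex function is implemented.

```python
import re

body = (
    "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - "
    "destHost=forwarders.scs.splunk.com, destIp=52.88.24.27, destPort=9997, "
    "eventType=connect_try, publisher=tcpout, sourcePort=9991, "
    "statusee=TcpOutputProcessor"
)

# Same pattern as in the pipeline, written with Python's named-group syntax.
PATTERN = re.compile(r"(?P<keys>\S+)=(?P<values>[^,]+),?")
MAX_MATCH = 20  # stand-in for the max_match setting

matches = list(PATTERN.finditer(body))[:MAX_MATCH]
keys = [m.group("keys") for m in matches]
values = [m.group("values") for m in matches]

print(keys)
print(values)
```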
The following example shows what your record might look like in the List view after the dimensions field is created.
{ range: [ 0, 1, 2, 3, 4, 5, 6 ], dimensionsRange: [ 0, 1, 3, 4, 6 ], dimensions: { destIp: "52.88.24.27", destHost: "forwarders.scs.splunk.com", publisher: "tcpout", eventType: "connect_try", statusee: "TcpOutputProcessor" }, body: "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - destHost=forwarders.scs.splunk.com, destIp=52.88.24.27, destPort=9997, eventType=connect_try, publisher=tcpout, sourcePort=9991, statusee=TcpOutputProcessor", nanos: 0, kind: "event", host: "C02X60K4JGH5", source_type: "splunkd", attributes: { arbitrary_fields: { _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", _channel: "2910" }, spl_forwarder_channel_info: { forwarder_src_ip: "C02X60K4JGH5", forwarder_src_port: 9991, forwarder_channel_id: 3 }, _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb", _splunk_connection_id: "forwarders:all", spl_fwd_type: "uf", spl_flags: 565, index: "_internal", spl_stmid: { offset: 0, suboffset: 0, strmid: 0 } }, _rule: { _rule_id: "GROK_TIMESTAMP", _rule_description: "Grok based timestamp patterns", _metadata: { _pattern: "%{DATESTAMP}" } }, id: "", source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", timestamp: 1597422342046, keys: [ "destHost", "destIp", "destPort", "eventType", "publisher", "sourcePort", "statusee" ], values: [ "forwarders.scs.splunk.com", "52.88.24.27", "9997", "connect_try", "tcpout", "9991", "TcpOutputProcessor" ] }
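The index-based filtering that produces dimensionsRange and dimensions in this record can be traced with a small Python sketch. The parses_as_double helper is a hypothetical stand-in for the parse_double check and relies on Python's float(), which accepts some strings (for example "nan" or "1e5") that might be treated differently in DSP, so take it as an approximation.

```python
def parses_as_double(text):
    """Approximate the parse_double(...) IS NOT NULL check with Python's float()."""
    try:
        float(text)
        return True
    except ValueError:
        return False


keys = ["destHost", "destIp", "destPort", "eventType", "publisher",
        "sourcePort", "statusee"]
values = ["forwarders.scs.splunk.com", "52.88.24.27", "9997", "connect_try",
          "tcpout", "9991", "TcpOutputProcessor"]

# range=mvrange(0, length(keys))
index_range = list(range(len(keys)))

# dimensionsRange: indices whose values do not parse as a double
dimensions_range = [i for i in index_range if not parses_as_double(values[i])]

# for_each + create_map: one single-entry map per dimension index
dimension_maps = [{keys[i]: values[i]} for i in dimensions_range]

# map_merge: combine the single-entry maps into one map
dimensions = {}
for single_map in dimension_maps:
    dimensions.update(single_map)

print(dimensions_range)  # [0, 1, 3, 4, 6]
print(dimensions)
```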
Step 3: Format the records to contain only the relevant fields for a single metric
Now that the dimension data points are extracted from the body field and assigned to a new field, you can transform the body field so that it only contains a list of maps for metric names and values. Transforming the body field this way gets rid of redundant data and lets you easily map the names and values of each metric to the corresponding fields in the Splunk Infrastructure Monitoring metrics schema. To further trim down on redundant data, drop any unnecessary fields from your records.
The resulting body field contains the name and value for more than one metric. You must split the records so that each one contains the key-value pairs for the name and value of a single metric only.
Steps
- Transform the body field into a list of metric name and value maps.
  - On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Eval from the function picker.
  - On the View Configurations tab, click + Add to create another blank field.
  - Enter the following expressions in the function fields in this order:
    Expression: metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL)
    Description: Iterate through the values list and filter for the indices of elements that parse as double. These elements are numeric values, which means they can be used as the metric data points of the records in this case. The extraction of metrics data based on numeric values might not be accurate for all data types. Make sure to filter your metrics based on the format of your records.
    Expression: body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
    Description: Create a list of all metric name and value maps using the iterators in metricsRange, and assign it to the body field.
  - (Optional) Click the Start Preview button to confirm that the body field is properly transformed.
- Keep the fields that you'll map to the metric, value, dimensions, and timestamp fields in the Splunk Infrastructure Monitoring metrics schema, and drop all other fields from the record.
  - On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Fields from the function picker.
  - On the View Configurations tab, enter body in the field_list field.
  - Click + Add twice to add two new fields, then enter dimensions in one of the newly created fields and timestamp in the other.
  - Enter + in the operator field. The plus ( + ) operator configures the function to keep the fields that you specified in the field_list field and drop the rest.
  - In the Output Fields panel, click Update and review the list of fields to confirm that the unwanted fields are dropped from your records.
- Flatten each record that contains multiple nested records into individual records.
  - On the pipeline canvas, click the + icon to the right of the Fields function, then select MV Expand from the function picker.
  - On the View Configurations tab, enter body in the field field, and then enter 0 in the limit field. The MV Expand function flattens all the values within the named field and carries all other fields into each newly created record.
  - (Optional) Click the Start Preview button, then click the MV Expand function to confirm that the nested records are expanded properly. The body field of each record now contains only one map of metric name and value.
The following example shows what one of your records might look like in the List view after the body field is transformed.
{ metricsRange: [ 2, 5 ], body: [ { value: "9997", name: "destPort" }, { value: "9991", name: "sourcePort" } ], range: [ 0, 1, 2, 3, 4, 5, 6 ], dimensionsRange: [ 0, 1, 3, 4, 6 ], dimensions: { destIp: "52.88.24.27", destHost: "forwarders.scs.splunk.com", publisher: "tcpout", eventType: "connect_try", statusee: "TcpOutputProcessor" }, nanos: 0, kind: "event", host: "C02X60K4JGH5", source_type: "splunkd", attributes: { arbitrary_fields: { _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", _channel: "2910" }, spl_forwarder_channel_info: { forwarder_src_ip: "C02X60K4JGH5", forwarder_src_port: 9991, forwarder_channel_id: 3 }, _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb", _splunk_connection_id: "forwarders:all", spl_fwd_type: "uf", spl_flags: 565, index: "_internal", spl_stmid: { offset: 0, suboffset: 0, strmid: 0 } }, _rule: { _rule_id: "GROK_TIMESTAMP", _rule_description: "Grok based timestamp patterns", _metadata: { _pattern: "%{DATESTAMP}" } }, id: "", source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", timestamp: 1597422342046, keys: [ "destHost", "destIp", "destPort", "eventType", "publisher", "sourcePort", "statusee" ], values: [ "forwarders.scs.splunk.com", "52.88.24.27", "9997", "connect_try", "tcpout", "9991", "TcpOutputProcessor" ] }
The following example shows what one of your records might look like in the List view after the record is flattened and unnecessary fields have been dropped.
{ body: { value: "9991", name: "sourcePort" }, dimensions: { destIp: "52.88.24.27", destHost: "forwarders.scs.splunk.com", publisher: "tcpout", eventType: "connect_try", statusee: "TcpOutputProcessor" }, timestamp: 1597422342046 }
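The three transformations in this step (building the list of metric maps, keeping only the needed fields, and expanding the list into one record per metric) can be sketched in Python as follows. The starting record is a trimmed, hand-written version of the sample data, and parses_as_double is a hypothetical stand-in for the parse_double check. The sketch approximates the behavior described above and is not a description of how the Eval, Fields, or MV Expand functions are implemented.

```python
def parses_as_double(text):
    """Approximate the parse_double(...) IS NOT NULL check with Python's float()."""
    try:
        float(text)
        return True
    except ValueError:
        return False


# Trimmed-down sample record as it might look after Step 2.
record = {
    "keys": ["destHost", "destIp", "destPort", "eventType", "publisher",
             "sourcePort", "statusee"],
    "values": ["forwarders.scs.splunk.com", "52.88.24.27", "9997",
               "connect_try", "tcpout", "9991", "TcpOutputProcessor"],
    "dimensions": {"destHost": "forwarders.scs.splunk.com", "publisher": "tcpout"},
    "timestamp": 1597422342046,
    "host": "C02X60K4JGH5",
}

keys, values = record["keys"], record["values"]

# metricsRange: indices whose values parse as a double
metrics_range = [i for i in range(len(keys)) if parses_as_double(values[i])]

# body: one {"name", "value"} map per metric index
body = [{"name": keys[i], "value": values[i]} for i in metrics_range]

# fields + body, dimensions, timestamp: keep three fields and drop the rest
trimmed = {
    "body": body,
    "dimensions": record["dimensions"],
    "timestamp": record["timestamp"],
}

# mvexpand body: one output record per element of the body list
expanded = [{**trimmed, "body": metric} for metric in trimmed["body"]]

for item in expanded:
    print(item)
```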
Step 4: Send your transformed data to Splunk Infrastructure Monitoring
Configure the Send to Splunk Infrastructure Monitoring sink function to map the relevant fields from your DSP records to the metric, value, dimensions, and timestamp fields that are supported in the Splunk Infrastructure Monitoring metrics schema.
Steps
- Add the Send to Splunk Infrastructure Monitoring sink function to the end of your pipeline and configure it.
  - On the pipeline canvas, click the + icon to the right of the MV Expand function, then select Send to Splunk Infrastructure Monitoring from the function picker.
  - On the View Configurations tab, set the connection_id field to the ID of your Splunk Observability connection.
  - Enter the following expressions and input in the corresponding fields:
    Field | Supported data type | Expression or input
    metric_name | String | map_get(body, "name")
    metric_value | Double | cast(map_get(body, "value"), "double")
    metric_type | String | "GAUGE"
    metric_timestamp (optional) | Long | timestamp
    metric_dimensions (optional) | map<string, string> | dimensions
- To save your pipeline, click Save.
- To start sending your data, click Activate Pipeline and then click Activate. If you're activating your pipeline for the first time, don't select Skip Restore State or Allow Non-Restored State. See Using activation checkpoints to activate your pipeline in the Use the Data Stream Processor manual for more details.
  Depending on the size and complexity of the pipeline, it might take some time for the pipeline to finish activating. Before continuing to the next step, wait for the status beside the pipeline name to update to "Activated". Additionally, make sure that all the functions in the pipeline display metrics indicating that data is flowing through the pipeline.
- To see if your data is successfully sent to Splunk Infrastructure Monitoring, see Planning and Creating Charts in the Splunk Infrastructure Monitoring documentation for details on how to view your data.
You've successfully transformed your metrics.log data and sent it to Splunk Infrastructure Monitoring through the Data Stream Processor.
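As a recap of the field mapping configured in the sink function, the following Python sketch derives the five sink arguments from one flattened record. The sink_arguments helper is hypothetical and exists only for this illustration; float() stands in for the cast to double, and nothing here calls DSP or Splunk Infrastructure Monitoring.

```python
def sink_arguments(record):
    """Build the five sink arguments from one flattened record (illustrative only)."""
    body = record["body"]
    return {
        "metric_name": body["name"],            # map_get(body, "name")
        "metric_value": float(body["value"]),   # cast(map_get(body, "value"), "double")
        "metric_type": "GAUGE",                 # constant used in this example
        "metric_timestamp": record["timestamp"],
        "metric_dimensions": record["dimensions"],
    }


flattened = {
    "body": {"value": "9991", "name": "sourcePort"},
    "dimensions": {
        "destIp": "52.88.24.27",
        "destHost": "forwarders.scs.splunk.com",
        "publisher": "tcpout",
        "eventType": "connect_try",
        "statusee": "TcpOutputProcessor",
    },
    "timestamp": 1597422342046,
}

arguments = sink_arguments(flattened)
print(arguments)
```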
As an alternative to building your pipeline using the Canvas View, you can use the SPL View instead. The following is the full SPL2 expression for the pipeline described on this page. Replace the "YOUR_OBSERVABILITY_CONNECTION_ID" placeholder value with the connection ID of your Splunk Observability connection.
| from forwarders("forwarders:all")
| where source_type="splunkd" AND match_wildcard(source, "*metrics.log")
| apply_line_breaking line_breaker="([\\r\\n]+)" truncate=10000 linebreak_type="advanced"
| apply_timestamp_extraction fallback_to_auto=false extraction_type="auto"
| eval body=cast(body, "string")
| rex max_match=20 field=body /(?<keys>\S+)=(?<values>[^,]+),?/
| eval range=mvrange(0, length(keys)),
    dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL),
    dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x))),
    dimensions=map_merge(dimensions),
    dimensions=ucast(dimensions, "map<string, string>", null)
| eval metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL),
    body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
| fields + body, dimensions, timestamp
| mvexpand limit=0 body
| into into_signalfx("YOUR_OBSERVABILITY_CONNECTION_ID", map_get(body, "name"), cast(map_get(body, "value"), "double"), "GAUGE", timestamp, dimensions);
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5