On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach their end of support date. See the Splunk Software Support Policy for details.
Formatting data into the SignalFx metrics schema
When you create a data pipeline in the Splunk Data Stream Processor (DSP) to send data to a SignalFx endpoint using the Send Metrics Data to SignalFx sink function, you must make sure that the data is compatible with the SignalFx metrics schema. Search for "The SignalFx Data Model" in the SignalFx Developers Guide for more information.
The following diagram shows how your DSP metrics are transformed to be compatible with the SignalFx metrics schema.
In this case, the specific dimensions and type values of the record override the defaultDimensions and defaultType values.
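The override rule can be pictured with a short Python sketch. This is only an illustration, not DSP code: the helper name resolve_metric_fields and the sample values are hypothetical, and the sketch assumes that a record-level value replaces the default as a whole. This topic does not say whether DSP merges dimension maps key by key.

```python
def resolve_metric_fields(record, default_dimensions, default_type):
    """Use the record's own dimensions and type when present, else the defaults.

    Assumption: a record-level value replaces the default entirely.
    """
    dimensions = record.get("dimensions")
    metric_type = record.get("type")
    return {
        "dimensions": dimensions if dimensions is not None else default_dimensions,
        "type": metric_type if metric_type is not None else default_type,
    }


# Illustrative sample values only.
defaults = {"env": "prod"}
with_own = resolve_metric_fields(
    {"dimensions": {"host": "uf-01"}, "type": "COUNTER"}, defaults, "GAUGE"
)
without_own = resolve_metric_fields({}, defaults, "GAUGE")
print(with_own)
print(without_own)
```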
Example: Send metrics.log data from the Splunk universal forwarder to SignalFx
The Splunk universal forwarder logs metrics.log data by default. Using DSP, you can process and send metrics.log data to SignalFx to troubleshoot your inputs and review product behavior. See About metrics.log in the Splunk Enterprise Troubleshooting Manual for more details. This example shows you how to do the following tasks:
- Split the incoming stream of metrics.log data into individual records.
- Extract the timestamp from the event body and use the extracted value as the timestamp of the event itself.
- Extract nested maps in your metrics.log data and assign the extracted data to top-level fields.
- Expand multivalue fields and drop fields from your records to make them compatible with the SignalFx metrics schema.
- Send your transformed data to SignalFx.
Prerequisites
- A SignalFx connection. See Create a DSP connection to SignalFx.
- A properly configured Splunk universal forwarder. See Create a DSP connection to a Splunk forwarder.
Best practices
Follow these guidelines to get a better experience building your SignalFx metrics pipeline in DSP:
- Rename your functions to keep track of their roles because you might use multiple instances of the same function in your pipeline.
- When previewing the data, switch to the List view for a more readable layout of the JSON records:
- In the Preview Results tab, on the navigation bar, click Table.
- Select List from the drop-down menu.
Steps
Log data can be converted to metrics in many different ways. The data extraction approaches in steps 7 and 8 are examples particular to metrics.log data from a particular Splunk universal forwarder, not best practices that apply to all scenarios. Adjust your pipeline according to the format of your own records for accurate data transformation.
- In DSP, click Build Pipeline.
- Select Splunk DSP Firehose as your source function.
- Data from multiple log files comes into DSP when you use Splunk DSP Firehose as the data source, so you need to filter the stream to keep only the metrics.log data.
- On the pipeline canvas, click the + icon next to the Splunk DSP Firehose function, then select Where from the function picker.
- On the View Configurations tab, enter the following SPL2 expression in the predicate field:
source_type="splunkd" AND match_wildcard(source, "*metrics.log")
This expression keeps only records with a source_type that matches splunkd and that have metrics.log as the source.
- Click Start Preview, then click the Where function to confirm that your records have been filtered properly.
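To see what the predicate keeps and drops, here is a rough Python equivalent. It is a sketch rather than DSP behavior: it assumes that match_wildcard treats * like a shell-style wildcard, and the helper name keep_record and the sample records are made up for illustration.

```python
from fnmatch import fnmatchcase


def keep_record(record):
    """Mirror: source_type="splunkd" AND match_wildcard(source, "*metrics.log")."""
    return record.get("source_type") == "splunkd" and fnmatchcase(
        record.get("source", ""), "*metrics.log"
    )


# Hypothetical sample records.
records = [
    {"source_type": "splunkd", "source": "/Applications/SplunkForwarder/var/log/splunk/metrics.log"},
    {"source_type": "splunkd", "source": "/Applications/SplunkForwarder/var/log/splunk/splunkd.log"},
    {"source_type": "syslog", "source": "/var/log/metrics.log"},
]
kept = [r for r in records if keep_record(r)]
print(kept)
```

Only the first record passes, because both conditions must hold.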
- The Splunk universal forwarder sends data in 64-kilobyte blocks. You need to split and stitch your events together before you can perform further transformations on them.
- On the pipeline canvas, click the + icon to the right of the Where function, then select Apply line break from the function picker.
- On the View Configurations tab, click the Break type drop-down menu, then select Advanced.
- On the View Configurations tab, enter the following regular expression in the Regex field:
([\r\n]+)
This expression breaks data into an event for each line delimited by return (\r) or newline (\n) characters.
- Click Start Preview, then click the Apply line break function to confirm that your data stream has been split properly.
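The effect of the line-breaking regular expression can be tried outside DSP with a small Python sketch. It only illustrates the splitting part; stitching events that span two 64-kilobyte blocks is not modeled. The helper name split_events and the sample block are hypothetical, and the capturing group is dropped because Python's re.split would otherwise return the delimiters as well.

```python
import re

# Same character class as the DSP regex ([\r\n]+), without the capturing group.
LINE_BREAKER = re.compile(r"[\r\n]+")


def split_events(block):
    """Split a block of text into one event per non-empty line."""
    return [line for line in LINE_BREAKER.split(block) if line]


block = "line one\r\nline two\n\nline three\n"
events = split_events(block)
print(events)
```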
- When DSP receives data from the universal forwarder, the timestamp used is the time when the event is ingested, not the actual time at which the event is recorded. You need to reassign the actual time to the timestamp field.
- On the pipeline canvas, click the + icon to the right of the Apply line break function, then select Apply Timestamp Extraction from the function picker.
- On the View Configurations tab, make sure that Auto is selected.
- Click Start Preview, then click the Apply Timestamp Extraction function to make sure the timestamps are extracted and assigned correctly to the timestamp field in your records.
- Before you can extract key-value pairs from the body field to work with them, you need to cast body to type string.
- On the pipeline canvas, click the + icon to the right of the Apply Timestamp Extraction function, then select Eval from the function picker.
- On the View Configurations tab, enter the following SPL2 expression:
body=cast(body, "string")
- On the Output Fields tab, click Update to confirm that the body field is now of type string.
- Extract key-value pairs from the body field to create maps for the dimensions field.
- On the pipeline canvas, click the + icon to the right of the Eval function, then select Parse with regex from the function picker.
- On the View Configurations tab, enter body in the field field.
- On the View Configurations tab, enter the following SPL2 expression in the pattern field:
/(?<keys>\S+)=(?<values>[^,]+),?/
A list of all the strings appearing before an equal sign ( = ) is extracted and assigned to a newly created keys field. A list of all the strings appearing after an equal sign ( = ), up to the next comma, is extracted and assigned to a newly created values field.
- On the View Configurations tab, enter a max match limit in the max_match field. See Rex in the Functions Reference manual for more details.
- Click Start Preview, then click the Parse with regex function to confirm that the new keys and values fields are created and populated correctly with the keys and values extracted from the body field.
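The following Python sketch applies the same pattern to the sample body used in this example, to show which keys and values it produces. It is an approximation for experimenting outside DSP: Python writes named groups as (?P<name>...), the helper name extract_pairs is hypothetical, and the default of 20 matches simply copies the max_match value from the full SPL2 pipeline at the end of this topic.

```python
import re

# Python spelling of /(?<keys>\S+)=(?<values>[^,]+),?/
PATTERN = re.compile(r"(?P<keys>\S+)=(?P<values>[^,]+),?")


def extract_pairs(body, max_match=20):
    """Collect up to max_match keys and values found in body."""
    keys, values = [], []
    for count, match in enumerate(PATTERN.finditer(body)):
        if count >= max_match:
            break
        keys.append(match.group("keys"))
        values.append(match.group("values"))
    return keys, values


body = (
    "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - "
    "destHost=forwarders.playground.scp.splunk.com, destIp=52.88.24.27, "
    "destPort=9997, eventType=connect_try, publisher=tcpout, "
    "sourcePort=9991, statusee=TcpOutputProcessor"
)
keys, values = extract_pairs(body)
print(keys)
print(values)
```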
- Extract dimensions data points from the body field to create the dimensions map.
- On the pipeline canvas, click the + icon to the right of the Parse with regex function, then select Eval from the function picker.
- On the View Configurations tab, click + Add four times to create four more blank fields.
- On the View Configurations tab, enter each of the following SPL2 expressions in a separate function field in this order. Each expression is followed by its description:
range=mvrange(0, length(keys))
Get the list of indices corresponding to elements in list keys to use as iterators for list values.
dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL)
Iterate through list values and keep only the indices of elements that don't parse as double. These elements are non-numeric values, which means they can be used as dimension data points of the records in this case. Extracting dimensions based on non-numeric values might not be accurate for all data types, so filter your dimensions based on the format of your own records.
dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x)))
Create a list of dimension maps using the iterators in dimensionsRange and assign it to a new dimensions field.
dimensions=map_merge(dimensions)
Merge the list of dimension maps into a single map.
dimensions=ucast(dimensions, "map<string, string>", null)
Cast the dimensions field to type map<string, string>.
- On the Output Fields tab, click Update.
- Click Start Preview to confirm that the dimensions field is properly created. Here's an example of what your record might look like in the List view:
{
  range: [ 0, 1, 2, 3, 4, 5, 6 ],
  dimensionsRange: [ 0, 1, 3, 4, 6 ],
  dimensions: { destIp: "52.88.24.27", destHost: "forwarders.playground.scp.splunk.com", publisher: "tcpout", eventType: "connect_try", statusee: "TcpOutputProcessor" },
  body: "08-14-2020 16:25:42.046 -0400 INFO StatusMgr - destHost=forwarders.playground.scp.splunk.com, destIp=52.88.24.27, destPort=9997, eventType=connect_try, publisher=tcpout, sourcePort=9991, statusee=TcpOutputProcessor",
  nanos: 0,
  kind: "event",
  host: "C02X60K4JGH5",
  source_type: "splunkd",
  attributes: { arbitrary_fields: { _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", _channel: "2910" }, spl_forwarder_channel_info: { forwarder_src_ip: "C02X60K4JGH5", forwarder_src_port: 9991, forwarder_channel_id: 3 }, _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb", _splunk_connection_id: "forwarders:all", spl_fwd_type: "uf", spl_flags: 565, index: "_internal", spl_stmid: { offset: 0, suboffset: 0, strmid: 0 } },
  _rule: { _rule_id: "GROK_TIMESTAMP", _rule_description: "Grok based timestamp patterns", _metadata: { _pattern: "%{DATESTAMP}" } },
  id: "",
  source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
  timestamp: 1597422342046,
  keys: [ "destHost", "destIp", "destPort", "eventType", "publisher", "sourcePort", "statusee" ],
  values: [ "forwarders.playground.scp.splunk.com", "52.88.24.27", "9997", "connect_try", "tcpout", "9991", "TcpOutputProcessor" ]
}
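The index-based logic of this step can be followed in a short Python sketch that uses the keys and values from the sample record. It is a stand-in for the SPL2 expressions, not DSP code. The parse_double helper here is a hypothetical approximation built on Python's float(), which also accepts strings such as "nan" and "inf", so it might not agree with the DSP function on every input.

```python
def parse_double(text):
    """Approximate parse_double: return a float, or None when text is not numeric."""
    try:
        return float(text)
    except ValueError:
        return None


keys = ["destHost", "destIp", "destPort", "eventType", "publisher", "sourcePort", "statusee"]
values = ["forwarders.playground.scp.splunk.com", "52.88.24.27", "9997",
          "connect_try", "tcpout", "9991", "TcpOutputProcessor"]

# range=mvrange(0, length(keys))
index_range = list(range(len(keys)))

# dimensionsRange: indices whose value does not parse as a double
dimensions_range = [i for i in index_range if parse_double(values[i]) is None]

# for_each + create_map: one single-entry map per index
dimension_maps = [{keys[i]: values[i]} for i in dimensions_range]

# map_merge: fold the list of maps into one map
dimensions = {}
for single in dimension_maps:
    dimensions.update(single)

print(dimensions_range)
print(dimensions)
```

Note that "52.88.24.27" is kept as a dimension because it does not parse as a single number.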
- Now that the dimensions data are extracted from the body field and assigned to a new field, you can transform the body field into a list of metrics name and value maps.
- On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Eval from the function picker.
- On the View Configurations tab, click + Add to create another blank field.
- On the View Configurations tab, enter the following SPL2 expressions in the function fields in this order. Each expression is followed by its description:
metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL)
Iterate through list values and keep only the indices of elements that parse as double. These elements are numeric values, which means they can be used as metrics data points of the records in this case. Extracting metrics based on numeric values might not be accurate for all data types, so filter your metrics based on the format of your own records.
body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
Create a list of all metrics name and value maps using the iterators in metricsRange and assign it to the body field.
- On the Output Fields tab, click Update.
- Click Start Preview to confirm that the body field is properly transformed. Here's an example of what your record might look like in the List view:
{
  metricsRange: [ 2, 5 ],
  body: [ { value: "9997", name: "destPort" }, { value: "9991", name: "sourcePort" } ],
  range: [ 0, 1, 2, 3, 4, 5, 6 ],
  dimensionsRange: [ 0, 1, 3, 4, 6 ],
  dimensions: { destIp: "52.88.24.27", destHost: "forwarders.playground.scp.splunk.com", publisher: "tcpout", eventType: "connect_try", statusee: "TcpOutputProcessor" },
  nanos: 0,
  kind: "event",
  host: "C02X60K4JGH5",
  source_type: "splunkd",
  attributes: { arbitrary_fields: { _path: "/Applications/SplunkForwarder/var/log/splunk/metrics.log", _channel: "2910" }, spl_forwarder_channel_info: { forwarder_src_ip: "C02X60K4JGH5", forwarder_src_port: 9991, forwarder_channel_id: 3 }, _partition_key: "2b1266ea-3868-40ca-b01b-d25ce9e0afeb", _splunk_connection_id: "forwarders:all", spl_fwd_type: "uf", spl_flags: 565, index: "_internal", spl_stmid: { offset: 0, suboffset: 0, strmid: 0 } },
  _rule: { _rule_id: "GROK_TIMESTAMP", _rule_description: "Grok based timestamp patterns", _metadata: { _pattern: "%{DATESTAMP}" } },
  id: "",
  source: "/Applications/SplunkForwarder/var/log/splunk/metrics.log",
  timestamp: 1597422342046,
  keys: [ "destHost", "destIp", "destPort", "eventType", "publisher", "sourcePort", "statusee" ],
  values: [ "forwarders.playground.scp.splunk.com", "52.88.24.27", "9997", "connect_try", "tcpout", "9991", "TcpOutputProcessor" ]
}
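As a rough Python counterpart to the two expressions in this step, the sketch below selects the numeric entries and builds the list of name and value maps. The parse_double helper is a hypothetical approximation based on float() and is not the DSP function; the values stay as strings at this stage, as they do in the sample record.

```python
def parse_double(text):
    """Approximate parse_double: return a float, or None when text is not numeric."""
    try:
        return float(text)
    except ValueError:
        return None


keys = ["destHost", "destIp", "destPort", "eventType", "publisher", "sourcePort", "statusee"]
values = ["forwarders.playground.scp.splunk.com", "52.88.24.27", "9997",
          "connect_try", "tcpout", "9991", "TcpOutputProcessor"]

# metricsRange: indices whose value parses as a double
metrics_range = [i for i in range(len(keys)) if parse_double(values[i]) is not None]

# body: one {"name", "value"} map per numeric entry
body = [{"name": keys[i], "value": values[i]} for i in metrics_range]

print(metrics_range)
print(body)
```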
- Keep only fields compatible with the SignalFx metrics schema and drop all other fields from your records.
- On the pipeline canvas, click the + icon to the right of the latest Eval function, then select Fields from the function picker.
- On the View Configurations tab, enter body in the field_list field.
- On the View Configurations tab, click + Add twice to add two new fields, then enter dimensions and timestamp in each of the newly created fields.
- On the View Configurations tab, enter + in the operator (optional) field. Adding the plus ( + ) operator indicates that the listed fields are the only fields to be kept in the records.
- On the Output Fields tab, click Update to confirm that the unwanted fields are dropped from your records.
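In Python terms, the Fields function with the + operator behaves roughly like the projection below. This is a sketch for illustration; the helper name keep_fields and the trimmed sample record are hypothetical.

```python
KEEP = ("body", "dimensions", "timestamp")


def keep_fields(record, names=KEEP):
    """Return a copy of record that contains only the listed fields."""
    return {name: record[name] for name in names if name in record}


# Trimmed-down sample record; the real record carries more fields.
record = {
    "body": [{"name": "destPort", "value": "9997"}],
    "dimensions": {"publisher": "tcpout"},
    "timestamp": 1597422342046,
    "host": "C02X60K4JGH5",
    "source_type": "splunkd",
    "keys": ["destPort", "publisher"],
}
slim = keep_fields(record)
print(slim)
```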
- Flatten each record containing multiple nested records into individual records.
- On the pipeline canvas, click the + icon to the right of the Fields function, then select MV Expand from the function picker.
- On the View Configurations tab, enter body in the field field and 0 in the limit field. The mvexpand function flattens all the values within the named field and carries all other fields into each newly created record.
- Click Start Preview, then click the MV Expand function to confirm that the nested records are expanded properly. The body field of each record now contains only one map of metrics name and value. Here's an example of what your record might look like in the List view:
{
  body: { value: "9991", name: "sourcePort" },
  dimensions: { destIp: "52.88.24.27", destHost: "forwarders.playground.scp.splunk.com", publisher: "tcpout", eventType: "connect_try", statusee: "TcpOutputProcessor" },
  timestamp: 1597422342046
}
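The expansion can be sketched in Python as follows. The function is a hypothetical stand-in for MV Expand, written under the assumption that a limit of 0 means every value is expanded; it is not the DSP implementation.

```python
def mvexpand(record, field):
    """Emit one record per value in record[field], copying all other fields."""
    values = record.get(field)
    if not isinstance(values, list):
        # Nothing to expand: pass the record through unchanged.
        return [record]
    return [{**record, field: value} for value in values]


record = {
    "body": [
        {"name": "destPort", "value": "9997"},
        {"name": "sourcePort", "value": "9991"},
    ],
    "dimensions": {"publisher": "tcpout", "eventType": "connect_try"},
    "timestamp": 1597422342046,
}
expanded = mvexpand(record, "body")
for item in expanded:
    print(item)
```

Each output record carries the same dimensions and timestamp, which is what lets every metric keep its context when it is sent on its own.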
- Send your transformed data to SignalFx.
- On the pipeline canvas, click the + icon to the right of the MV Expand function, then select Send Metrics Data to SignalFx from the function picker.
- On the View Configurations tab, configure the function with your SignalFx connection.
- On the View Configurations tab, enter the following SPL2 expressions and input in the corresponding fields:
Field | SPL2 expression/input
metric_name | map_get(body, "name")
metric_value | cast(map_get(body, "value"), "double")
metric_type | "GAUGE"
metric_timestamp (optional) | timestamp
metric_dimensions (optional) | dimensions
- To see if your data is successfully sent to SignalFx, see Planning and Creating Charts in the SignalFx Infrastructure Monitoring manual for details on how to view your data.
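To make the sink field mapping concrete, the Python sketch below derives the five sink arguments from one expanded record. The dictionary keys reuse the sink function's field names purely for readability. The helper name to_datapoint is hypothetical, and the result is not the payload that DSP actually sends to SignalFx.

```python
def to_datapoint(record, metric_type="GAUGE"):
    """Map one expanded record onto the sink function's fields."""
    body = record["body"]
    return {
        "metric_name": body["name"],              # map_get(body, "name")
        "metric_value": float(body["value"]),     # cast(map_get(body, "value"), "double")
        "metric_type": metric_type,               # "GAUGE"
        "metric_timestamp": record.get("timestamp"),
        "metric_dimensions": record.get("dimensions", {}),
    }


record = {
    "body": {"name": "sourcePort", "value": "9991"},
    "dimensions": {"publisher": "tcpout", "eventType": "connect_try"},
    "timestamp": 1597422342046,
}
point = to_datapoint(record)
print(point)
```

The cast matters because the extracted values are strings up to this point, while the metric value has to be numeric.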
You've successfully transformed your metrics.log data and sent it to SignalFx through DSP using the Canvas Builder. Alternatively, if you want to build your pipeline using the SPL2 Builder, here are the full SPL2 expressions for the pipeline. Replace the placeholder with your own SignalFx connection ID in the sink function:
| from splunk_firehose()
| where source_type="splunkd" AND match_wildcard(source, "*metrics.log")
| apply_line_breaking line_breaker="([\\r\\n]+)" truncate=10000 linebreak_type="advanced"
| apply_timestamp_extraction fallback_to_auto=false extraction_type="auto"
| eval body=cast(body, "string")
| rex max_match=20 field=body /(?<keys>\S+)=(?<values>[^,]+),?/
| eval range=mvrange(0, length(keys)),
    dimensionsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NULL),
    dimensions=for_each(iterator(dimensionsRange, "x"), create_map(mvindex(keys, x), mvindex(values, x))),
    dimensions=map_merge(dimensions),
    dimensions=ucast(dimensions, "map<string, string>", null)
| eval metricsRange=filter(iterator(range, "x"), parse_double(mvindex(values, x)) IS NOT NULL),
    body=for_each(iterator(metricsRange, "x"), {"name": mvindex(keys, x), "value": mvindex(values, x)})
| fields + body, dimensions, timestamp
| mvexpand limit=0 body
| into into_signalfx("YOUR_SIGNALFX_CONNECTION_ID", map_get(body, "name"), cast(map_get(body, "value"), "double"), "GAUGE", timestamp, dimensions);
See also
- Functions
- Evals
- Fields
- Map
- Mvexpand
- List
- Related topics
- Process data from a universal forwarder in DSP
- Working with nested data
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.2.0, 1.2.1-patch02