Splunk® Data Stream Processor

Use the Data Stream Processor

On April 3, 2023, Splunk Data Stream Processor will reach its end of sale, and will reach its end of life on February 28, 2025. If you are an existing DSP customer, please reach out to your account team for more information.
This documentation does not apply to the most recent version of Splunk® Data Stream Processor. For documentation on the most recent version, go to the latest release.

Aggregate records in a DSP pipeline

Use the Aggregate function to perform one or more aggregation calculations on your streaming data. For each aggregation calculation that you want to perform, specify the aggregation functions, the subset of data to perform the calculation on (fields to group by), the timestamp field for windowing, and the output fields for the results.

After the given window time has passed, the Aggregate function outputs records that contain only the user-defined output fields, the fields to group by, and the window length that the aggregations occurred in. All other fields are dropped from the record schema.

The aggregation function has no concept of wall clock time, and the passage of time is based on the timestamps of incoming records. The aggregation function tracks the latest timestamp it received in the stream as the "current" time, and it determines the start and end of windows using this timestamp. Once the difference between the current timestamp and the start timestamp of the current window is greater than the window length, that window is closed and a new window starts. However, since events may arrive out of order, the grace period argument allows the previous window W to remain "open" for a certain period G after its closing timestamp T. Until the function receives a record with a timestamp C where C > T + G, any incoming events with timestamp less than T are counted towards the previous window W.
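The windowing and grace period behavior described above can be sketched in Python. This is a minimal illustration, not DSP's implementation: the class name `EventTimeCounter`, its attributes, and the choice to align windows to multiples of the window length are all assumptions made for the example. It counts records per window and uses only record timestamps to advance time.

```python
from collections import defaultdict


class EventTimeCounter:
    """Hypothetical sketch: counts records per window using event time only."""

    def __init__(self, window_size, grace_period):
        self.window_size = window_size
        self.grace_period = grace_period
        self.current_time = None              # latest timestamp seen so far
        self.open_windows = defaultdict(int)  # window start -> record count
        self.emitted = []                     # (window_start, window_end, count)
        self.dropped = 0                      # records that arrived too late

    def _window_start(self, ts):
        # Assumption: windows are aligned to multiples of window_size.
        return ts - (ts % self.window_size)

    def _is_final(self, start):
        # Window W with closing timestamp T is final once C > T + G,
        # where C is the latest timestamp seen and G is the grace period.
        closing = start + self.window_size
        return self.current_time > closing + self.grace_period

    def process(self, ts):
        # "Current" time only moves forward, and only when records arrive.
        if self.current_time is None or ts > self.current_time:
            self.current_time = ts
        start = self._window_start(ts)
        if self._is_final(start):
            self.dropped += 1                 # the window was already emitted
        else:
            self.open_windows[start] += 1     # on time, or late within grace
        self._emit_final_windows()

    def _emit_final_windows(self):
        for start in sorted(self.open_windows):
            if self._is_final(start):
                count = self.open_windows.pop(start)
                self.emitted.append((start, start + self.window_size, count))
```

With a window of 60 and a grace period of 10, a record with timestamp 50 that arrives after a record with timestamp 65 is still counted towards the window ending at 60. Once a record with a timestamp greater than 70 arrives, that window is emitted and any later record for it is discarded in this sketch.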

List of aggregation functions

You can use the following aggregation functions within the Aggregate streaming function:

  • average: Calculates the average in a time window.
  • count: Counts the number of non-null values in a time window.
  • max: Returns the max value in a time window.
  • min: Returns the min value in a time window.
  • sum: Returns the sum of values in a time window.
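
As a rough illustration of what these five functions compute over the values collected in one window, consider the following Python sketch. The function name `aggregate_window` is hypothetical. The documentation states only that count ignores null values; skipping nulls in the other four calculations, and returning `None` for an empty window, are assumptions made for the example.

```python
def aggregate_window(values):
    """Hypothetical sketch of the five aggregations over one window's values."""
    # count is documented as counting non-null values; ignoring nulls for
    # the other aggregations is an assumption of this sketch.
    present = [v for v in values if v is not None]
    count = len(present)
    if count == 0:
        return {"average": None, "count": 0, "max": None, "min": None, "sum": None}
    total = sum(present)
    return {
        "average": total / count,
        "count": count,
        "max": max(present),
        "min": min(present),
        "sum": total,
    }
```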

Count the number of times a source appeared per host

Imagine you want to count the number of times a source appeared in a given time window per host. This example does the following:

  • Uses the count aggregation function and writes the result to the num_events_with_source_field output field.
  • Groups the output records by host.
  • Executes the aggregation in a 60-second time window based on the timestamp of each record.

  1. From the Data Pipelines Canvas view, click the + icon and add the Aggregate function to your pipeline.
  2. In the Aggregate function, add a new Group By. Type host in Field/Expression, and type host in Output Field. Click OK.
  3. In the Timestamp field, enter timestamp.
  4. From the New Aggregations drop-down list, select Count.
  5. Type source in Field/Expression, and type num_events_with_source_field in Output Field.
  6. Click Validate. After validating, you can see what your record schema will look like after your data passes through the aggregation function.
  7. Click Start Preview, and then click the Aggregate function to verify that your data is being aggregated. This example uses the default time window of 60 seconds, so preview data appears only after 60 seconds have passed between the timestamps of your records.
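
The configuration in the steps above can be approximated outside of DSP with a short Python sketch. It is illustrative only: the function name `count_source_per_host`, the `window_start` and `window_end` output fields, the assumption that timestamps are epoch milliseconds, and the alignment of windows to multiples of 60 seconds are all hypothetical choices, and the grace period is ignored for brevity.

```python
from collections import Counter

WINDOW_MS = 60_000  # assumption: timestamps are in epoch milliseconds


def count_source_per_host(records, window_ms=WINDOW_MS):
    """Hypothetical sketch: count non-null source values per host and window."""
    counts = Counter()
    for record in records:
        ts = record["timestamp"]
        window_start = ts - ts % window_ms
        key = (record["host"], window_start)
        # count only considers non-null values of the source field
        counts[key] += 0 if record.get("source") is None else 1
    # Only the group-by field, the output field, and the window survive;
    # every other input field is dropped.
    return [
        {
            "host": host,
            "num_events_with_source_field": n,
            "window_start": start,
            "window_end": start + window_ms,
        }
        for (host, start), n in sorted(counts.items())
    ]
```

Each returned dictionary corresponds to one output record per host and window, which mirrors how the Aggregate function keeps only the group-by field, the output field, and the window information.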

Example output

Imagine that your data stream contained the following data.

[Image: 6 records in a data stream, three with the host "Buttercup" and three with the host "127.0.0.1".]

The aggregate function outputs records that span the given window size, and writes the results of the aggregate calculations to the num_events_with_source_field field in each record as follows:

[Image: the aggregated result in your data stream, where the value of host is being counted.]

Last modified on 06 December, 2019

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.0.0

