Splunk® Data Stream Processor

DSP Function Reference


Aggregate and Trigger

Applies one or more aggregation functions to a stream of records over a specified time window, and outputs an event when a predefined condition is met. Use this function to set an alert on your data before it is indexed. This function accepts a variable number of aggregation functions.

Function Input
collection<record<R>>
This function takes in collections of records with schema R.
Function Output
collection<record<S>>
This function outputs the same collection of records but with a different schema S.
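
As a purely illustrative aid, the following sketch shows the general shape of an input record (schema R) and an aggregated, triggered output record (schema S), written as Python dicts. The field names are hypothetical and follow the DSL example later in this topic; this is not a DSP API.

# Hypothetical record shapes, written as Python dicts for illustration only.
# Field names follow the DSL example later in this topic.

# Input record (schema R): a raw event with a timestamp, grouping field, and attributes.
input_record = {
    "host": "web-01",
    "timestamp": 1572531300000,               # milliseconds since epoch
    "attributes": {"error": "HTTP 500"},
}

# Output record (schema S): one aggregated, triggered event per group and window.
output_record = {
    "host": "web-01",                          # group-by output field
    "error-count": 6,                          # aggregation output field
    "action": {"email": "alert@splunk.com"},   # custom field added on trigger
}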

Note: The aggregation function has no concept of wall clock time, and the passage of time is based on the timestamps of incoming records. The aggregation function tracks the latest timestamp it received in the stream as the "current" time, and it determines the start and end of windows using this timestamp. Once the difference between the current timestamp and the start timestamp of the current window is greater than the window length, that window is closed and a new window starts. However, since events may arrive out of order, the grace period argument allows the previous window W to remain "open" for a certain period G after its closing timestamp T. Until we receive a record with a timestamp C where C > T + G, any incoming events with timestamp less than T are counted towards the previous window W.
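
The following sketch is a simplified illustration of the windowing behavior just described; it is not the DSP implementation. It assumes tumbling windows (size equal to slide), treats the latest event timestamp seen so far as the current time, and keeps each window open for the grace period after its end.

# Simplified sketch of event-time windowing with a grace period.
# Not the DSP implementation; window size, grace period, and alignment are assumptions.

SIZE = 60_000      # window length in milliseconds
GRACE = 10_000     # grace period in milliseconds

windows = {}       # window start timestamp -> list of event timestamps
latest_ts = None   # "current" time, tracked as the latest event timestamp seen

def window_start(ts):
    """Align an event timestamp to the start of its tumbling window."""
    return ts - (ts % SIZE)

def process(event_ts):
    global latest_ts
    latest_ts = event_ts if latest_ts is None else max(latest_ts, event_ts)

    start = window_start(event_ts)
    end = start + SIZE
    # A late event still counts toward its window as long as the current time
    # has not passed the window's end plus the grace period.
    if latest_ts <= end + GRACE:
        windows.setdefault(start, []).append(event_ts)

    # Close any window whose end plus grace period is now behind the current time.
    for start_ts in [s for s in windows if latest_ts > s + SIZE + GRACE]:
        events = windows.pop(start_ts)
        print(f"window [{start_ts}, {start_ts + SIZE}) closed with {len(events)} events")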

Arguments

Argument: by
Input: collection<expression<any>>
Description: The name of one or more fields to group by.
UI example: field/expression: host, output field: host

Argument: timestamp
Input: expression<long>
Description: The field name in which your record's timestamps are located.
UI example: timestamp

Argument: size (window length)
Input: long
Description: The window length, in milliseconds, used to group events.
UI example: 60 seconds

Argument: slide (window length)
Input: long
Description: The amount of time, in milliseconds, to wait before starting a new window.
UI example: 60 seconds

Argument: grace period
Input: long
Description: The amount of time, in milliseconds, to wait for late-arriving events.
UI example: 10 seconds

Argument: trigger count
Input: long
Description: Trigger an event based on the count of aggregated events. For example, if set to 1, a trigger fires after every event. Set to zero to trigger events only at the close of the window. The trigger count is reset after each window.
UI example: 1

Argument: trigger interval
Input: long
Description: Trigger an event after a certain amount of time has passed since the start of the window. For example, setting this to 3600000 causes a trigger to fire after 3600000 milliseconds have elapsed since the start of the window. The behavior of this interval depends on the trigger time type. Set to zero to trigger events only at the close of the window.
UI example: 0

Argument: trigger time type
Input: EventTime or ProcessingTime
Description: Determines the time reference used when deciding whether to trigger an event. Select EventTime to use the timestamp field as the measurement of time. Select ProcessingTime to use the system clock of the machine running the job. (A sketch contrasting the two follows this table.)

For example, say you have two events: one with a timestamp of 1 PM and another with a timestamp of 2 PM, and your trigger interval is 1 hour. Due to network latency, both events are received in close succession at 2:01 PM. In EventTime, an event is triggered because the difference in the event timestamps is at least the trigger interval of one hour. In ProcessingTime, no event is triggered because the system receives both events within the trigger interval.

Alternatively, say you have two events: one with a timestamp of 1 PM and one with a timestamp of 1:01 PM. Due to network latency, the second event is received at 2 PM. In EventTime, no event is triggered because the difference in the event timestamps is less than the trigger interval. In ProcessingTime, an event is triggered because the system took longer than the trigger interval to receive both events.

Note: EventTime uses the same time unit as the timestamp field. ProcessingTime uses milliseconds.
UI example: EventTime

Argument: aggregations
Input: collection<expression<any>>
Description: One or more aggregation functions to apply.
UI example: New Aggregation: count, field/expression: map_get(get("attributes"), "error"), output field: error-count

Argument: predicate
Input: expression<boolean>
Description: A filter function that evaluates each aggregated event and triggers an alert if the condition is true.
UI example: Trigger Conditions: error-count, <, 5

Argument: trigger max fire count
Input: long
Description: The maximum number of times that the predicate can evaluate to true per aggregated event. Once this count is met, no more events are output for that window.
UI example: 1

Argument: custom fields
Input: collection<expression<any>>
Description: An eval function that runs when a triggered event passes the filter predicate.
UI example: Expression/Original Field: create_map("email", "alert@splunk.com"), output field: action
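
The following sketch contrasts the EventTime and ProcessingTime scenarios described for the trigger time type argument. It is an illustration only, not the DSP implementation; the timestamps come from the examples above, and the at-least comparison is an assumption made so the outcomes match those examples.

# Simplified sketch of the EventTime vs ProcessingTime scenarios above.
# Not the DSP implementation; the >= comparison is an assumption.

HOUR = 3_600_000    # one hour in milliseconds
MINUTE = 60_000     # one minute in milliseconds
INTERVAL = HOUR     # trigger interval used in the examples

def interval_elapsed(timestamps, interval_ms):
    """True when the span covered by the timestamps reaches the trigger interval."""
    return max(timestamps) - min(timestamps) >= interval_ms

# Scenario 1: events stamped 1:00 PM and 2:00 PM, both received around 2:01 PM.
event_times_1 = [13 * HOUR, 14 * HOUR]
arrival_times_1 = [14 * HOUR + MINUTE, 14 * HOUR + MINUTE]
print(interval_elapsed(event_times_1, INTERVAL))    # True:  EventTime triggers
print(interval_elapsed(arrival_times_1, INTERVAL))  # False: ProcessingTime does not

# Scenario 2: events stamped 1:00 PM and 1:01 PM; the second arrives at 2:00 PM.
event_times_2 = [13 * HOUR, 13 * HOUR + MINUTE]
arrival_times_2 = [13 * HOUR, 14 * HOUR]
print(interval_elapsed(event_times_2, INTERVAL))    # False: EventTime does not trigger
print(interval_elapsed(arrival_times_2, INTERVAL))  # True:  ProcessingTime triggers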

If both trigger count and trigger interval are set to positive numbers, whichever condition is met first causes the next triggered event.
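
As a rough sketch of that whichever-comes-first rule, the hypothetical helper below checks both conditions; the function name and parameters are illustrative, not DSP code.

# Hypothetical sketch of the "whichever occurs first" rule for trigger count
# and trigger interval. Names and signature are illustrative, not DSP code.
def should_trigger(events_since_last_trigger, ms_since_window_start,
                   trigger_count, trigger_interval):
    count_met = trigger_count > 0 and events_since_last_trigger >= trigger_count
    interval_met = trigger_interval > 0 and ms_since_window_start >= trigger_interval
    # Either condition alone is enough; a setting of zero disables that condition.
    return count_met or interval_met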

DSL example

This example counts the number of errors per host within a 1-hour time window, and outputs an alert once more than five errors are seen.

aggregated = aggregate_and_trigger(input,
    by: as(get("host"), "host"),
    timestamp: get("timestamp"),
    size: 3600000,
    slide: 3600000,
    grace-period: 0L,
    trigger-count: 10L,
    trigger-interval: 0,
    trigger-time-type: "EventTime",
    aggregations: as(count(map_get(get("attributes"), "error")), "error-count"),
    predicate: gt(get("error-count"), 5),
    trigger-max-fire-count: 1,
    custom-fields: (
        as(create_map("email", "alert@splunk.com"), "action")));