Splunk® Data Stream Processor

Function Reference

On October 30, 2022, all 1.2.x versions of the Splunk Data Stream Processor will reach its end of support date. See the Splunk Software Support Policy for details. For information about upgrading to a supported version, see the Upgrade the Splunk Data Stream Processor topic.

Aggregate with Trigger

This topic describes how to use the function in the .

Description

The Aggregate with Trigger function applies one or more aggregation functions on a stream of records in a specified time window, and outputs a record when a pre-defined condition is met. Use this function to set an alert on your data before your data gets indexed. This function accepts a variable number of aggregation functions.

Function Input/Output Schema

Function Input
collection<record<R>>
This function takes in collections of records with schema R.
Function Output
collection<record<S>>
This function outputs the same collection of records but with a different schema S.

Syntax

The required syntax is in bold.

aggregate_and_trigger
aggregations(field) [AS <newfield>]
keys=<field> ["," <field>] [AS <newfield>]
timestamp=<field>
size=<long>
slide=<long>
grace_period=<long>
trigger_count=<long>
trigger_interval=<long>
trigger_time_type=<trigger_type_options>
predicate=<boolean-expression>
trigger_max_fire_count=<long>
custom_fields=<expression> AS <newfield>

Required arguments

aggregations
Syntax: aggregations=collection<expression<any>>
Description: An aggregation function to apply on your events.
keys
Syntax: <field>
Description: The field values by which to group events.
Example in Canvas View: body
timestamp
Syntax: <field>
Description: The field name where your record's timestamps are located.
Example in Canvas View: timestamp or get("timestamp");
size
Syntax: <long>
Description: The window length in milliseconds to group events.
Example in Canvas View: 10000
slide
Syntax: <long>
Description: The rolling window time offset.
Example in Canvas View: 360000
grace_period
Syntax: <long>
Description: The amount of time, in milliseconds, to wait for late-arriving events. In some cases, you may have some events that arrive after the latest time window boundary. This setting allows you to specify an amount of time to wait for any late-arriving events for the time window. If specified, this argument affects when the windows close. For example, if you have a window of 1 hour (10:00:00AM - 11:00:00AM) with a grace period of 5 minutes, then the window won't close until the pipeline receives an event with timestamp >= 11:05:00AM. When you assign a grace period, the pipeline tolerates out of order events. See the How are time windows calculated? section for a more detailed explanation about how time windows are determined and how the grace period argument is used.
Example in Canvas View: 10000.
trigger_count
Syntax: <long>
Description: Trigger an event based on the count of aggregated events. For example, if set to 1, then a trigger is fired after every one event. Set to zero to only trigger events at the close of the window. The trigger count is reset after each window.
Example in Canvas View: 1
trigger_interval
Syntax: <long>
Description: Trigger an event after a certain amount of time has passed since the start of the window. For example, setting this to 3600000 will cause a trigger to fire after 3600000 milliseconds (1 hour) has elapsed since the start of the window. The behavior of this interval depends on the trigger time type. Set to zero to only trigger events at the close of the window.
Example in Canvas View: 0
trigger_time_type
Syntax: EventTime or ProcessingTime
Description: Determines the time when an event is triggered. Select EventTime to use the timestamp field as the measurement of time. Select ProcessingTime to use the system clock of the machine running the job. See the Trigger Time Type Options section for an example.
predicate
Syntax: <boolean-expression>
Description: A boolean scalar function that evaluates events and triggers an alert if the condition is true per aggregated event.
Example in Canvas View: total_time_taken>1000L
trigger_max_fire_count
Syntax: <long>
Description: The maximum number of times that the predicate evaluates to true per aggregated record. Once met, no more records will be outputted for that window.
Example in Canvas View: 1
custom_fields
Syntax: collection<expression<any>>
Description: A function that runs when a triggered record passes the predicate.
Example in Canvas View: custom_fields=["triggered because status_code = 500 has total_time_taken > 1000" AS message]

If both trigger count and trigger interval are set to positive numbers, whichever occurs first will cause the subsequent triggered event to happen.

Trigger Time Type Options

For example, assume you have two events: one with a timestamp of 1PM and another event with a timestamp of 2PM and your trigger interval is 1 hour. In this example, due to network latency, both events are received in close succession at 2:01PM. In EventTime, an event is triggered because the difference in the event timestamp is greater than the trigger interval of one hour. In ProcessingTime, because the system processes both events within the given trigger interval, no event is triggered.

Alternatively, assume you have two events: one with a timestamp of 1PM and one with a timestamp of 1:01PM. Due to network latency, the second event is received at 2PM. In EventTime, an event is not triggered because the difference in the event timestamp is not greater than the trigger interval. In ProcessingTime, the event is triggered, because the system took longer than the trigger interval to process both events.

EventTime uses the same time unit as the timestamp field. ProcessingTime uses milliseconds.

Usage

The Aggregate with Trigger function has no concept of wall clock time, and the passage of time is based on the timestamps of incoming records. The Aggregate with Trigger function tracks the latest timestamp it received in the stream as the "current" time, and it determines the start and end of windows using this timestamp. Once the difference between the current timestamp and the start timestamp of the current window is greater than the window length, that window is closed and a new window starts.

However, since records may arrive out of order, the grace period argument allows the previous window W to remain "open" for a certain period G after its closing timestamp T. Until the Aggregate with Trigger function receives a record with a timestamp C where C > T + G, any incoming records with timestamp less than T are counted towards the previous window W. Once a record with timestamp > C is received, the window is closed and a new window is opened. If the Aggregate with Trigger function never receives a record with timestamp >= C, then the window will remain open.

To illustrate this, consider the following example where you have a sliding window with a window size of 1 hour (10:00:00AM - 11:00:00AM) and two events that have timestamps 11:01:00AM and 10:59:00AM respectively. The event with timestamp 10:59:00AM arrives after the event with timestamp 11:01:00AM.

  • If the grace-period is set to 0, then the pipeline does not tolerate out of order data. When the event with timestamp 11:01:00AM arrives, the window will close and output results. The event with timestamp 10:59:00AM is dropped.
  • If the grace-period is set to 5 minutes, then the pipeline tolerates out of order data. Events with timestamps up to 5 minutes past the window are permitted. When the event with timestamp 11:01:00AM arrives, the window remains open and the event with timestamp 11:01:00AM is added to the next window: 11:00:00AM - 12:00:00PM. The event with timestamp 10:59:00AM then arrives and is added to the window 10:00:00AM - 11:00:00AM. The 10:00:00AM - 11:00:00AM window closes and outputs results when an event with timestamp >=11:05:00 arrives.

Example

An example of a common use case follows. These examples assume that you have added the function to your pipeline.

SPL2 Example: Count the number of errors within a 1 hour time window and output an alert as soon as five errors are seen.

This example assumes that you are in the SPL View.

...| aggregate_and_trigger keys=[host] timestamp=timestamp size=3600000 slide=3600000 grace_period=0 trigger_count=10 trigger_interval=0 trigger_time_type="EventTime" aggregations=[count(cast(map_get(attributes, "error"), "integer")) AS error_code] predicate=error_code>5 trigger_max_fire_count=1 custom_fields=["triggered because error-count > 5" AS action] |...
Last modified on 26 October, 2021
Adaptive Thresholding (beta)   Apply Line Break

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.1.0, 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6


Was this topic useful?







You must be logged into splunk.com in order to post comments. Log in now.

Please try to keep this discussion focused on the content covered in this documentation topic. If you have a more general question about Splunk functionality or are experiencing a difficulty with Splunk, consider posting a question to Splunkbase Answers.

0 out of 1000 Characters