Aggregate with Trigger
This topic describes how to use the function in the .
Description
The Aggregate with Trigger function applies one or more aggregation functions on a stream of records in a specified time window, and outputs a record when a pre-defined condition is met. Use this function to set an alert on your data before your data gets indexed. This function accepts a variable number of aggregation functions.
Function Input/Output Schema
- Function Input
- collection<record<R>>
- This function takes in collections of records with schema R.
- Function Output
- collection<record<S>>
- This function outputs the same collection of records but with a different schema S.
Syntax
The required syntax is in bold.
- aggregate_and_trigger
- aggregations(field) [AS <newfield>]
- keys=<field> ["," <field>] [AS <newfield>]
- timestamp=<field>
- size=<long>
- slide=<long>
- grace_period=<long>
- trigger_count=<long>
- trigger_interval=<long>
- trigger_time_type=<trigger_type_options>
- predicate=<boolean-expression>
- trigger_max_fire_count=<long>
- custom_fields=<expression> AS <newfield>
Required arguments
- aggregations
- Syntax: aggregations=collection<expression<any>>
- Description: An aggregation function to apply on your events.
- keys
- Syntax: <field>
- Description: The field values by which to group events.
- Example in Canvas View: body
- timestamp
- Syntax: <field>
- Description: The field name where your record's timestamps are located.
- Example in Canvas View: timestamp or get("timestamp");
- size
- Syntax: <long>
- Description: The window length in milliseconds to group events.
- Example in Canvas View: 10000
- slide
- Syntax: <long>
- Description: The rolling window time offset.
- Example in Canvas View: 360000
- grace_period
- Syntax: <long>
- Description: The amount of time, in milliseconds, to wait for late-arriving events. In some cases, you may have some events that arrive after the latest time window boundary. This setting allows you to specify an amount of time to wait for any late-arriving events for the time window. If specified, this argument affects when the windows close. For example, if you have a window of 1 hour (10:00:00AM - 11:00:00AM) with a grace period of 5 minutes, then the window won't close until the pipeline receives an event with timestamp >= 11:05:00AM. When you assign a grace period, the pipeline tolerates out of order events. See the How are time windows calculated? section for a more detailed explanation about how time windows are determined and how the grace period argument is used.
- Example in Canvas View: 10000.
- trigger_count
- Syntax: <long>
- Description: Trigger an event based on the count of aggregated events. For example, if set to 1, then a trigger is fired after every one event. Set to zero to only trigger events at the close of the window. The trigger count is reset after each window.
- Example in Canvas View: 1
- trigger_interval
- Syntax: <long>
- Description: Trigger an event after a certain amount of time has passed since the start of the window. For example, setting this to 3600000 will cause a trigger to fire after 3600000 milliseconds (1 hour) has elapsed since the start of the window. The behavior of this interval depends on the trigger time type. Set to zero to only trigger events at the close of the window.
- Example in Canvas View: 0
- trigger_time_type
- Syntax: EventTime or ProcessingTime
- Description: Determines the time when an event is triggered. Select EventTime to use the timestamp field as the measurement of time. Select ProcessingTime to use the system clock of the machine running the job. See the Trigger Time Type Options section for an example.
- predicate
- Syntax: <boolean-expression>
- Description: A boolean scalar function that evaluates events and triggers an alert if the condition is true per aggregated event.
- Example in Canvas View: total_time_taken>1000L
- trigger_max_fire_count
- Syntax: <long>
- Description: The maximum number of times that the predicate evaluates to true per aggregated record. Once met, no more records will be outputted for that window.
- Example in Canvas View: 1
- custom_fields
- Syntax: collection<expression<any>>
- Description: A function that runs when a triggered record passes the predicate.
- Example in Canvas View: custom_fields=["triggered because status_code = 500 has total_time_taken > 1000" AS message]
If both trigger count and trigger interval are set to positive numbers, whichever occurs first will cause the subsequent triggered event to happen.
Trigger Time Type Options
For example, assume you have two events: one with a timestamp of 1PM and another event with a timestamp of 2PM and your trigger interval is 1 hour. In this example, due to network latency, both events are received in close succession at 2:01PM. In EventTime, an event is triggered because the difference in the event timestamp is greater than the trigger interval of one hour. In ProcessingTime, because the system processes both events within the given trigger interval, no event is triggered.
Alternatively, assume you have two events: one with a timestamp of 1PM and one with a timestamp of 1:01PM. Due to network latency, the second event is received at 2PM. In EventTime, an event is not triggered because the difference in the event timestamp is not greater than the trigger interval. In ProcessingTime, the event is triggered, because the system took longer than the trigger interval to process both events.
EventTime uses the same time unit as the timestamp field. ProcessingTime uses milliseconds.
Usage
The Aggregate with Trigger function has no concept of wall clock time, and the passage of time is based on the timestamps of incoming records. The Aggregate with Trigger function tracks the latest timestamp it received in the stream as the "current" time, and it determines the start and end of windows using this timestamp. Once the difference between the current timestamp and the start timestamp of the current window is greater than the window length, that window is closed and a new window starts.
However, since records may arrive out of order, the grace period argument allows the previous window W to remain "open" for a certain period G after its closing timestamp T. Until the Aggregate with Trigger function receives a record with a timestamp C where C > T + G, any incoming records with timestamp less than T are counted towards the previous window W. Once a record with timestamp > C is received, the window is closed and a new window is opened. If the Aggregate with Trigger function never receives a record with timestamp >= C, then the window will remain open.
To illustrate this, consider the following example where you have a sliding window with a window size of 1 hour (10:00:00AM - 11:00:00AM) and two events that have timestamps 11:01:00AM and 10:59:00AM respectively. The event with timestamp 10:59:00AM arrives after the event with timestamp 11:01:00AM.
- If the grace-period is set to 0, then the pipeline does not tolerate out of order data. When the event with timestamp 11:01:00AM arrives, the window will close and output results. The event with timestamp 10:59:00AM is dropped.
- If the grace-period is set to 5 minutes, then the pipeline tolerates out of order data. Events with timestamps up to 5 minutes past the window are permitted. When the event with timestamp 11:01:00AM arrives, the window remains open and the event with timestamp 11:01:00AM is added to the next window: 11:00:00AM - 12:00:00PM. The event with timestamp 10:59:00AM then arrives and is added to the window 10:00:00AM - 11:00:00AM. The 10:00:00AM - 11:00:00AM window closes and outputs results when an event with timestamp >=11:05:00 arrives.
Example
An example of a common use case follows. These examples assume that you have added the function to your pipeline.
SPL2 Example: Count the number of errors within a 1 hour time window and output an alert as soon as five errors are seen.
This example assumes that you are in the SPL View.
...| aggregate_and_trigger keys=[host] timestamp=timestamp size=3600000 slide=3600000 grace_period=0 trigger_count=10 trigger_interval=0 trigger_time_type="EventTime" aggregations=[count(cast(map_get(attributes, "error"), "integer")) AS error_code] predicate=error_code>5 trigger_max_fire_count=1 custom_fields=["triggered because error-count > 5" AS action] |...
Structure of DSP function descriptions | Batch Bytes |
This documentation applies to the following versions of Splunk® Data Stream Processor: 1.1.0, 1.2.0, 1.2.1-patch02, 1.2.1, 1.2.2-patch02, 1.2.4, 1.2.5, 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6
Feedback submitted, thanks!