Create Named Aggregations

A named aggregation allows you to obtain aggregates in an incremental manner for a specified set of time periods.

This not only lets you calculate aggregations at varied time granularities, but also lets you access them interactively for reports, dashboards, and further processing. Its schema is defined by the aggregation definition.

You can also create Distributed Aggregations or Joins.

Purpose

A named aggregation allows you to retrieve the aggregate values for different time durations. That is, it allows you to obtain aggregates such as sum, count, avg, min, max, and distinctCount of stream attributes for durations such as sec, min, hour, and so on.

This is of considerable importance in many analytics scenarios, because aggregate values are often needed for several time periods. This also ensures that the aggregations are not lost due to unexpected system failures, because aggregates can be stored in different persistence stores.
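
For example (the figures here are purely illustrative), suppose an aggregation computes avg(price) and sum(volume) per second, minute, and hour. If events with prices 10 and 20 (volumes 100 and 300) arrive during the minute starting at 10:00, and an event with price 30 (volume 600) arrives during the minute starting at 10:05, the minute-level results are avg 15 / sum 400 for 10:00 and avg 30 / sum 600 for 10:05, while the hour-level result for the 10:00 hour is avg 20 / sum 1000. Each granularity can be retrieved independently for reporting.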

Syntax

CREATE AGGREGATION <aggregator name> WITH (store.type='<store type>', store.replication.type='<global or local>', purge.enable='<true or false>', purge.interval='<purging interval>', purge.retention.period='<retention period>')
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
from <input stream>
group by <attribute name>
aggregate by <timestamp attribute> every <time periods> ;
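
For instance, here is a minimal sketch that uses only the core clauses. It assumes the optional WITH annotations are omitted so the defaults described under Parameters apply (in-memory store, default purging); the stream and attribute names are illustrative.

CREATE STREAM OrderStream (symbol string, quantity long);

CREATE AGGREGATION OrderAggregation
select symbol, sum(quantity) as totalQuantity
from OrderStream
group by symbol
aggregate every min ... day;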

Parameters

| Item | Description |
|---|---|
| store | Used to refer to the data store where the calculated aggregate results are stored. This annotation is optional. When no annotation is provided, the data is stored in the in-memory store. |
| purge | Used to configure purging in aggregation granularities. If this annotation is not provided, then the default purging is applied. To disable automatic data purging, use this annotation as follows: purge.enable='false'. You should disable data purging if the aggregation query is included in the stream worker for read-only purposes. |
| purge.retention.period | Used to specify the length of time the data needs to be retained when carrying out data purging. If this annotation is not provided, then the default retention period is applied. |
| aggregator name | Specifies a unique name for the aggregation so that it can be referred to when accessing aggregate results. |
| input stream | The stream that feeds the aggregation. |
| group by attribute name | The group by clause is optional. If it is included in a stream worker, then aggregate values are calculated for each group by attribute. If it is not used, then all the events are aggregated together. |
| by timestamp attribute | This clause is optional. It defines the attribute that should be used as the timestamp. If this clause is not used, then the event time is used by default. The timestamp can be given as either a string or a long value. If it is a long value, a Unix timestamp in milliseconds is expected (e.g., 1496289950000). If it is a string value, the supported formats are <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> (if the time is in GMT) and <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> (if the time is not in GMT), where <Z> is the ISO 8601 UTC offset (e.g., +05:30, -11:00). See the sketch after this table. |
| time periods | Time periods can be specified as a range, where the minimum and maximum values are separated by three dots, or as comma-separated values. For example, the range sec ... year aggregates per second, minute, hour, day, month, and year, while the comma-separated form min, hour aggregates per minute and hour only. |
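
As referenced in the table above, the sketch below combines a group by clause, an explicit timestamp attribute, and comma-separated time periods. The names are illustrative, and viewTime is assumed to hold either a Unix timestamp in milliseconds or a string in one of the supported formats.

CREATE STREAM PageViewStream (pageId string, loadTime double, viewTime long);

CREATE AGGREGATION PageViewAggregation
select pageId, avg(loadTime) as avgLoadTime
from PageViewStream
group by pageId
aggregate by viewTime every min, hour;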

Granularity and Retention

Aggregation granularity data holders are automatically purged every 15 minutes. When carrying out data purging, the retention period you have specified for each granularity in the named aggregation query is taken into account.

The retention period defined for a granularity needs to be greater than or equal to its minimum retention period as specified in the table below. If no valid retention period is defined for a granularity, then the default retention period is applied.

| Granularity | Default retention | Minimum retention |
|---|---|---|
| second | 120 seconds | 120 seconds |
| minute | 24 hours | 120 minutes |
| hour | 30 days | 25 hours |
| day | 1 year | 32 days |
| month | All | 13 months |
| year | All | None |
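
For instance, here is a sketch of a purge configuration whose retention periods satisfy the minimums above for a min ... day range. It reuses the per-granularity purge.retentionPeriod properties from the full example further below; the stream, attribute names, and retention values are illustrative.

CREATE STREAM SensorStream (deviceId string, temperature double);

CREATE AGGREGATION SensorAggregation WITH (store.type='database', store.replication.type='global', purge.enable='true', purge.interval='10 sec', purge.retentionPeriod.min='3 hours', purge.retentionPeriod.hours='26 hours', purge.retentionPeriod.days='60 days')
select deviceId, max(temperature) as maxTemp
from SensorStream
group by deviceId
aggregate every min ... day;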
Note: Aggregation is carried out at calendar start times for each granularity, in the GMT time zone.

Note: The aggregation input stream should only feed events to one aggregation definition.

Example

CREATE STREAM TradeStream (symbol string, price double, volume long, timestamp long);

CREATE AGGREGATION TradeAggregation WITH (store.type='database', store.replication.type='global', purge.enable='true', purge.interval='10 sec', purge.retentionPeriod.sec='120 sec', purge.retentionPeriod.min='24 hours', purge.retentionPeriod.hours='30 days', purge.retentionPeriod.days='1 year', purge.retentionPeriod.months='all', purge.retentionPeriod.years='all')
select symbol, avg(price) as avgPrice, sum(volume) as total
from TradeStream
group by symbol
aggregate by timestamp every sec ... year;

This stream worker defines an aggregation named TradeAggregation that calculates the average of the price attribute and the sum of the volume attribute for events arriving on the TradeStream stream. These aggregates are calculated for every time granularity in the range from second to year.
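
To read the computed aggregates back, for example to drive a report or dashboard, the aggregation is typically queried through a join that specifies the time interval and the granularity of interest (see Joins). The sketch below is illustrative only: it assumes a Siddhi-style join with within and per clauses, and TradeSummaryRetrievalStream and OutputStream are hypothetical names.

CREATE STREAM TradeSummaryRetrievalStream (symbol string);

insert into OutputStream
select a.symbol, a.avgPrice, a.total
from TradeSummaryRetrievalStream as b join TradeAggregation as a
  on a.symbol == b.symbol
  within "2024-01-01 00:00:00 +00:00", "2024-02-01 00:00:00 +00:00"
  per "days";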