Create Named Aggregations
A named aggregation allows you to obtain aggregates in an incremental manner for a specified set of time periods.
This not only allows you to calculate aggregations with varied time granularity, but also allows you to access them in an interactive manner for reports, dashboards, and for further processing. Its schema is defined via the aggregation definition.
You can also create Distributed Aggregations or Joins.
A named aggregation allows you to retrieve aggregate values for different time durations. That is, it allows you to obtain aggregates such as avg and sum of stream attributes for durations such as sec, min, hour, and so on.
This is of considerable importance in many analytics scenarios, because aggregate values are often needed for several time periods. This also ensures that the aggregations are not lost due to unexpected system failures, because aggregates can be stored in different persistence stores.
CREATE AGGREGATION <aggregator name> WITH (store.type='<store type>', store.replication.type='<global or local>', purge.enable='<true or false>', purge.interval='<purging interval>', purge.retention.period='<retention period>')
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
from <input stream>
group by <attribute name>
aggregate by <timestamp attribute> every <time periods> ;
|store||Used to refer to the data store where the calculated aggregate results are stored. This annotation is optional. When no annotation is provided, the data is stored in the |
|purge||Used to configure purging in aggregation granularities. If this annotation is not provided, then the default purging is applied. To disable automatic data purging, you can use this annotation as follows:|
|purge.retention.period||Used to specify the length of time the data needs to be retained when carrying out data purging. If this annotation is not provided, then the default retention period is applied.|
|aggregator name||Specifies a unique name for the aggregation so that it can be referred to when accessing aggregate results.|
|input stream||The stream that feeds the aggregation.|
|group by attribute name||The |
|by timestamp attribute||This clause is optional. This defines the attribute that should be used as the timestamp. If this clause is not used, then the event time is used by default. The timestamp could be given as either a |
|time periods||Time periods can be specified as a range where the minimum and the maximum value are separated by three dots, or as comma-separated values. For example, a range can be specified as sec...year where aggregation is done per second, minute, hour, day, month, and year. Comma-separated values can be specified as min, hour.|
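The two ways of writing time periods can be illustrated with a small Python sketch. This is not part of the stream worker runtime. The function name `expand_time_periods` is hypothetical, and the short names `day` and `month` are assumed by analogy with `sec`, `min`, `hour`, and `year`.

```python
from typing import List

# Granularities in ascending order. "day" and "month" are assumed short names.
GRANULARITIES = ["sec", "min", "hour", "day", "month", "year"]


def expand_time_periods(spec: str) -> List[str]:
    """Expand 'sec...year' or 'min, hour' into an explicit list of granularities."""
    spec = spec.strip()
    if "..." in spec:
        # Range form: minimum and maximum separated by three dots.
        low, high = (part.strip() for part in spec.split("..."))
        start, end = GRANULARITIES.index(low), GRANULARITIES.index(high)
        if start > end:
            raise ValueError(f"range minimum {low!r} is coarser than maximum {high!r}")
        return GRANULARITIES[start:end + 1]
    # Comma-separated form: only the listed granularities are used.
    periods = [part.strip() for part in spec.split(",")]
    for period in periods:
        if period not in GRANULARITIES:
            raise ValueError(f"unknown granularity {period!r}")
    return periods
```

Under these assumptions, `expand_time_periods("sec...year")` yields all six granularities, while `expand_time_periods("min, hour")` yields only the two that are listed.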
Granularity and Retention
Aggregation granularity data holders are automatically purged every 15 minutes. When carrying out data purging, the retention period you have specified for each granularity in the named aggregation query is taken into account.
The retention period defined for a granularity needs to be greater than or equal to its minimum retention period as specified in the table below. If no valid retention period is defined for a granularity, then the default retention period is applied.
|Granularity||Default retention||Minimum retention|
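The retention rule described above can be sketched in Python as follows. This is an interpretation, not the engine's implementation: it assumes that a retention period below the minimum counts as "not valid" and therefore falls back to the default. The function name and the numbers in the usage note are hypothetical, and the special value 'all' is not handled.

```python
from typing import Optional


def effective_retention(requested_sec: Optional[int],
                        default_sec: int,
                        minimum_sec: int) -> int:
    """Return the retention period (in seconds) that would be applied.

    requested_sec is None when no retention period was configured.
    """
    # No value, or a value below the minimum, is treated as "no valid
    # retention period", so the default is applied (assumed behavior).
    if requested_sec is None or requested_sec < minimum_sec:
        return default_sec
    return requested_sec
```

For example, with a hypothetical default of 120 seconds and a hypothetical minimum of 60 seconds, a requested value of 30 seconds resolves to 120, while a requested value of 300 seconds is kept as is.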
Aggregation is carried out at calendar start times for each granularity, in the GMT time zone.
The aggregation input stream should only feed events to one aggregation definition.
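To illustrate what "calendar start times in GMT" means, the following Python sketch maps an event timestamp in epoch milliseconds to the start of its bucket for a given granularity. It is only an illustration. The function name `bucket_start` and the granularity names `day` and `month` are assumptions, and UTC is used as the equivalent of GMT.

```python
from datetime import datetime, timezone


def bucket_start(epoch_ms: int, granularity: str) -> datetime:
    """Return the calendar start (in UTC) of the bucket containing epoch_ms."""
    ts = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    if granularity == "sec":
        return ts.replace(microsecond=0)
    if granularity == "min":
        return ts.replace(second=0, microsecond=0)
    if granularity == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if granularity == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if granularity == "month":
        return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if granularity == "year":
        return ts.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unknown granularity {granularity!r}")
```

An event stamped 2021-03-15 13:45:30 GMT therefore falls into the hour bucket starting at 13:00:00 and the month bucket starting at 2021-03-01 00:00:00, regardless of the local time zone of the producer.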
CREATE STREAM TradeStream (symbol string, price double, volume long, timestamp long);
CREATE AGGREGATION TradeAggregation WITH (store.type='database', store.replication.type='global', purge.enable='true', purge.interval='10 sec', purge.retentionPeriod.sec='120 sec', purge.retentionPeriod.min='24 hours', purge.retentionPeriod.hours='30 days', purge.retentionPeriod.days='1 year', purge.retentionPeriod.months='all', purge.retentionPeriod.years='all')
select symbol, avg(price) as avgPrice, sum(volume) as total
from TradeStream
group by symbol
aggregate by timestamp every sec ... year;
This stream worker defines an aggregation named TradeAggregation to calculate the average of the price attribute and the sum of the volume attribute for events arriving at the TradeStream stream. These aggregates are calculated for every time granularity in the range from second to year.
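The arithmetic behind this example can be approximated outside the platform with a short Python sketch. It is not how the stream worker engine is implemented. The function name `aggregate_trades` and the tuple layout are hypothetical, and only the fixed-width granularities (sec, min, hour) are covered. Running sums and counts are kept per symbol and bucket so that the average can be derived incrementally.

```python
from collections import defaultdict

# Bucket widths in milliseconds for fixed-width granularities only.
BUCKET_MS = {"sec": 1_000, "min": 60_000, "hour": 3_600_000}


def aggregate_trades(events, granularity):
    """Aggregate (symbol, price, volume, timestamp_ms) tuples.

    Returns {(symbol, bucket_start_ms): (avgPrice, total)}.
    """
    width = BUCKET_MS[granularity]
    # Per group: [sum of prices, number of events, sum of volumes].
    sums = defaultdict(lambda: [0.0, 0, 0])
    for symbol, price, volume, ts in events:
        acc = sums[(symbol, ts - ts % width)]  # align to the bucket start
        acc[0] += price
        acc[1] += 1
        acc[2] += volume
    return {key: (p / n, v) for key, (p, n, v) in sums.items()}
```

Calling the function once per granularity over the same events mirrors the idea of `every sec ... year`: each granularity gets its own set of buckets, grouped by symbol.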