Skip to main content

Distributed Aggregations

A distributed aggregation allows you to partially process aggregations in different shards. This allows a stream worker in one shard to be responsible only for processing a part of the aggregation.


CREATE AGGREGATION <aggregator name> WITH (store.type='database', store.replication.type='global', PartitionById.enable='false')
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
from <input stream>
group by <attribute name>
aggregate by <timestamp attribute> every <time periods> ;


@partitionByIdIf the property is given, then the distributed aggregation is enabled. This can be disabled by using enable element, PartitionById.enable='false'.

System Properties

System PropertyDescriptionPossible ValuesOptionalDefault Value
shardIdThe ID of the shard one of the distributed aggregation is running in. This should be unique to a single shard.Any stringNo<Empty_String>
partitionByIdThis allows user to enable/disable distributed aggregation for all aggregations running in one stream processing manager.true/falseYesfalse

ShardIds should not be changed after the first configuration in order to keep data consistency.