
SESSION()

Holds events that belong to a session. Events that belong to a specific session are identified by a session key, and a session gap determines the time period after which the session is considered expired. For meaningful aggregation over session windows, aggregate the events by the session key using a GROUP BY clause.
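As a rough illustration of how the session key and session gap partition a stream, the following Python sketch groups in-order events per key and starts a new session for a key once the gap since that key's previous event reaches the session gap. The function name `split_sessions`, the `(key, timestamp, value)` event shape, and the exact boundary rule (an event joins only if its timestamp is strictly before the previous timestamp plus the gap) are assumptions for illustration, not the engine's implementation.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical event shape for this sketch: (session_key, timestamp_seconds, value).
Event = Tuple[str, int, float]


def split_sessions(events: List[Event], gap: int) -> Dict[str, List[List[Event]]]:
    """Group in-order events into sessions, separately for each session key.

    Assumption: an event joins its key's current session if its timestamp is
    strictly less than (that key's previous event timestamp + gap); otherwise
    the session has expired and a new session starts for that key.
    """
    sessions: Dict[str, List[List[Event]]] = defaultdict(list)
    last_ts: Dict[str, int] = {}  # latest timestamp seen per key
    for key, ts, value in sorted(events, key=lambda e: e[1]):
        if key in last_ts and ts < last_ts[key] + gap:
            sessions[key][-1].append((key, ts, value))  # extend the open session
        else:
            sessions[key].append([(key, ts, value)])  # start a new session
        last_ts[key] = ts
    return dict(sessions)
```

For example, with `gap=5`, events for key `u1` at timestamps 0, 3 and 10 fall into two sessions, because 10 is not before 3 + 5; a per-session aggregate such as a sum is then computed separately for each session, which is what grouping by the session key achieves in a query.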

Syntax

WINDOW SESSION(<INT|LONG|TIME> session.gap)
WINDOW SESSION(<INT|LONG|TIME> session.gap, <STRING> session.key)
WINDOW SESSION(<INT|LONG|TIME> session.gap, <STRING> session.key, <INT|LONG|TIME> allowed.latency)

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|---|---|---|---|---|---|
| session.gap | The time period after which the session is considered expired. | - | INT, LONG, TIME | No | No |
| session.key | The session identification attribute. Used to group events that belong to a specific session. | default-key | STRING | Yes | Yes |
| allowed.latency | The time period for which the session window remains valid after the session expires, so that late-arriving events can still be accepted. This period should be less than session.gap. | 0 | INT, LONG, TIME | Yes | No |

Example 1

CREATE STREAM PurchaseEventStream (user string, item_number int, price float, quantity int);
CREATE SINK STREAM OutputStream (user string, totalQuantity long, totalPrice double);

@info(name='query1')
INSERT INTO OutputStream
SELECT user, sum(quantity) AS totalQuantity, sum(price) AS totalPrice
FROM PurchaseEventStream WINDOW SESSION(5 sec, user)
GROUP BY user;

For events arriving at PurchaseEventStream, the query maintains a session window with a five-second session gap, using the user attribute as the session key. The events in each session are aggregated per user, and the results are sent to OutputStream.

Example 2

CREATE STREAM PurchaseEventStream (user string, item_number int, price float, quantity int);
CREATE SINK STREAM OutputStream (user string, totalQuantity long, totalPrice double);

@info(name='query2')
INSERT INTO OutputStream
SELECT user, sum(quantity) AS totalQuantity, sum(price) AS totalPrice
FROM PurchaseEventStream WINDOW SESSION(5 sec, user, 2 sec)
GROUP BY user;

For events arriving at PurchaseEventStream, the query maintains a session window with a five-second session gap, using the user attribute as the session key. The session window is kept open for two seconds after the session expires to capture late (out-of-order) event arrivals. If a late event's timestamp falls into the most recent session, that session is reactivated and the event is included in it. The events in each session are then aggregated per user, and the results are sent to OutputStream.
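The decision the window makes for a single event with respect to a key's most recent session can be modeled as below. This is a hypothetical sketch: the function `handle_event`, integer-second timestamps, and the rule that an expired session still accepts an event while the arrival time is before `end + allowed_latency` (where `end` is the last event timestamp plus the session gap) are assumptions consistent with the description above, not the engine's code.

```python
def handle_event(start: int, end: int, event_ts: int,
                 arrival_time: int, allowed_latency: int) -> str:
    """Classify one event against a key's most recent session (sketch).

    start: timestamp of the session's first event.
    end: timestamp of the session's last event plus session.gap.
    Assumption: an expired session stays open for late arrivals while
    arrival_time < end + allowed_latency.
    """
    if event_ts >= end:
        return "new"         # event lies after the session: start a new session
    if event_ts < start:
        return "drop"        # older than the tracked session; not modeled here
    if arrival_time < end:
        return "join"        # session is still active
    if arrival_time < end + allowed_latency:
        return "reactivate"  # late event accepted within allowed.latency
    return "drop"            # arrived too late: the session is gone
```

With settings like Example 2 (5-second gap, 2-second allowed latency), a session that starts at timestamp 0 and whose last event is at timestamp 3 ends at 8. An event stamped 6 that arrives at time 9 gives `handle_event(0, 8, 6, 9, 2) == "reactivate"`, while the same event with no allowed latency gives `"drop"`.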

Example 3

This example shows how to aggregate events over continuous activity sessions in a sliding manner, emitting an updated aggregate each time an event is added to a session.

Stream Worker Code

CREATE STREAM PurchaseStream(userId string, item string, price double);

CREATE SINK STREAM OutOfOrderUserIdPurchaseStream(userId string, totalItems long, totalPrice double);
CREATE SINK STREAM UserIdPurchaseStream(userId string, totalItems long, totalPrice double);

@info(name = 'Session-analysis')
-- Calculate count and sum of `price` per `userId` during the session.
-- Output when events are added to the session.
INSERT INTO UserIdPurchaseStream
SELECT userId,
count() AS totalItems,
sum(price) AS totalPrice
-- Aggregate events over a `userId` based session window with `1 minute` session gap.
FROM PurchaseStream WINDOW SESSION(1 min, userId)
GROUP BY userId;

@info(name = 'Session-analysis-with-late-event-arrivals')
-- Calculate count and sum of `price` per `userId` during the session.
-- Output when events are added to the session.
INSERT INTO OutOfOrderUserIdPurchaseStream
SELECT userId,
count() AS totalItems,
sum(price) AS totalPrice
-- Aggregate events over a `userId` based session window with `1 minute` session gap,
-- and `20 seconds` of allowed latency to capture late event arrivals.
FROM PurchaseStream WINDOW SESSION(1 min, userId, 20 sec)
GROUP BY userId;

Session Aggregation Behavior

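When the events below are sent to PurchaseStream, the Session-analysis query emits the results shown at UserIdPurchaseStream, and the Session-analysis-with-late-event-arrivals query emits the results shown at OutOfOrderUserIdPurchaseStream. Arrival time is when an event reaches PurchaseStream; event timestamp is the time the event carries. The event that arrives at 9:01:05 with timestamp 9:00:50 is the only out-of-order event.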

| Arrival Time | Event Timestamp | Input to PurchaseStream | Output at UserIdPurchaseStream | Output at OutOfOrderUserIdPurchaseStream |
|---|---|---|---|---|
| 9:00:00 | 9:00:00 | ['1001', 'cake', 18.0] | ['1001', 1, 18.0] | ['1001', 1, 18.0] |
| 9:00:20 | 9:00:20 | ['1002', 'croissant', 23.0] | ['1002', 1, 23.0] | ['1002', 1, 23.0] |
| 9:00:40 | 9:00:40 | ['1002', 'cake', 22.0] | ['1002', 2, 45.0] | ['1002', 2, 45.0] |
| 9:01:05 | 9:00:50 | ['1001', 'pie', 22.0] | No events, as the event arrived late and did not fall into a session. | ['1001', 2, 40.0] |
| 9:01:10 | 9:01:10 | ['1001', 'cake', 10.0] | ['1001', 1, 10.0] | ['1001', 3, 50.0] |
| 9:01:50 | 9:01:50 | ['1002', 'cake', 20.0] | ['1002', 1, 20.0] | ['1002', 1, 20.0] |
| 9:02:40 | 9:02:40 | ['1001', 'croissant', 23.0] | ['1001', 1, 23.0] | ['1001', 1, 23.0] |
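To make the table easier to check, the following Python sketch replays its input rows in arrival order and computes both output columns. It is a simplified model, not the stream worker's implementation: the names `simulate`, `Session`, and `ROWS` are hypothetical, times are expressed as seconds after 9:00:00, only the latest session per key is tracked, a session's end is taken to be its latest event timestamp plus session.gap, and a result is emitted for every accepted event.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Input rows from the table: (arrival_s, event_ts_s, userId, item, price),
# with times given as seconds after 9:00:00.
ROWS = [
    (0, 0, "1001", "cake", 18.0),
    (20, 20, "1002", "croissant", 23.0),
    (40, 40, "1002", "cake", 22.0),
    (65, 50, "1001", "pie", 22.0),
    (70, 70, "1001", "cake", 10.0),
    (110, 110, "1002", "cake", 20.0),
    (160, 160, "1001", "croissant", 23.0),
]


@dataclass
class Session:
    start: int           # timestamp of the first event in the session
    last: int            # timestamp of the latest event in the session
    count: int = 0       # running count() of events
    total: float = 0.0   # running sum(price)


def simulate(rows, gap: int, allowed_latency: int) -> List[Optional[Tuple[str, int, float]]]:
    """Model a per-userId session window that emits on every accepted event.

    Returns one entry per input row: (userId, totalItems, totalPrice), or None
    when the event is rejected and nothing is emitted.
    """
    sessions: Dict[str, Session] = {}
    out: List[Optional[Tuple[str, int, float]]] = []
    for arrival, ts, user, _item, price in rows:
        s = sessions.get(user)
        if s is None or ts >= s.last + gap:
            # No session yet, or the event lies past the session end: start a new one.
            s = sessions[user] = Session(start=ts, last=ts)
        elif ts < s.start or arrival >= s.last + gap + allowed_latency:
            # Late event that no open (or reactivatable) session can accept.
            out.append(None)
            continue
        s.count += 1
        s.total += price
        s.last = max(s.last, ts)  # an accepted late event extends the session end
        out.append((user, s.count, s.total))
    return out
```

Calling `simulate(ROWS, gap=60, allowed_latency=0)` models the Session-analysis query, and `simulate(ROWS, gap=60, allowed_latency=20)` models the Session-analysis-with-late-event-arrivals query. Moving the session end forward when the late 9:00:50 event is accepted is what lets the 9:01:10 event for user 1001 join the reactivated session instead of starting a new one.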