
Data Pipeline Examples

This page provides examples of common data pipeline patterns: stream joins, partitioning by value, and scatter-gather over string and JSON values.

Stream Joins

This example shows how to join two streams based on a condition.

For more information on other join operations, refer to the Stream Worker Query Guide.

Stream Joins Example

CREATE STREAM TemperatureStream (roomNo string, temperature double);

CREATE STREAM HumidityStream (roomNo string, humidity double);

@info(name = 'Equi-join')
-- Join latest `temperature` and `humidity` events arriving within 1 minute for each `roomNo`.
INSERT INTO TemperatureHumidityStream
SELECT t.roomNo, t.temperature, h.humidity
FROM TemperatureStream window unique:time(roomNo, 1 min) AS t
JOIN HumidityStream window unique:time(roomNo, 1 min) AS h
ON t.roomNo == h.roomNo;


@info(name = 'Join-on-temperature')
INSERT INTO EnrichedTemperatureStream
SELECT t.roomNo, t.temperature, h.humidity
-- Join when events arrive in `TemperatureStream`.
FROM TemperatureStream AS t
-- When events match within the `sliding_time()` window, all matched events are emitted; otherwise `null` is emitted for the humidity attribute.
LEFT OUTER JOIN HumidityStream window sliding_time(1 min) AS h
ON t.roomNo == h.roomNo;

Join Behavior

When events are sent to TemperatureStream and HumidityStream, the following events are emitted at TemperatureHumidityStream via the Equi-join query, and at EnrichedTemperatureStream via the Join-on-temperature query:

| Time | Input to TemperatureStream | Input to HumidityStream | Output at TemperatureHumidityStream | Output at EnrichedTemperatureStream |
|---|---|---|---|---|
| 9:00:00 | ['1001', 18.0] | - | - | ['1001', 18.0, null] |
| 9:00:10 | - | ['1002', 72.0] | - | - |
| 9:00:15 | - | ['1002', 73.0] | - | - |
| 9:00:30 | ['1002', 22.0] | - | ['1002', 22.0, 73.0] | ['1002', 22.0, 72.0], ['1002', 22.0, 73.0] |
| 9:00:50 | - | ['1001', 60.0] | ['1001', 18.0, 60.0] | - |
| 9:01:10 | - | ['1001', 62.0] | - | - |
| 9:01:20 | ['1001', 17.0] | - | ['1001', 17.0, 62.0] | ['1001', 17.0, 60.0], ['1001', 17.0, 62.0] |
| 9:02:10 | ['1002', 23.5] | - | - | ['1002', 23.5, null] |
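
As the table shows, the Join-on-temperature query emits `null` for humidity when no matching humidity event exists within the window. A downstream query can flag those rooms. The following is a minimal sketch, assuming the stream filter syntax (`StreamName[condition]`) used later in this page and support for `IS NULL` checks; MissingHumidityStream is a hypothetical output stream:

@info(name = 'Missing-humidity-alert')
-- Hypothetical follow-up query: flag rooms whose enriched temperature reading has no humidity value.
INSERT INTO MissingHumidityStream
SELECT roomNo, temperature
FROM EnrichedTemperatureStream[humidity IS NULL];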

Partition Events by Value

This example shows how to partition events by attribute value. For more information on partitions, refer to the Stream Worker Query Guide.

Partition Events by Value Example

CREATE STREAM LoginStream ( userID string, loginSuccessful bool);

-- Optional purging configuration to remove partition instances that haven't received events for `1 hour` by checking every `10 sec`.
@purge(enable='true', interval='10 sec', idle.period='1 hour')
-- Partitions the events based on `userID`.
partition with ( userID of LoginStream )

begin
@info(name='Aggregation-query')
-- Calculates success and failure login attempts from the last 3 events of each `userID`.
INSERT INTO #LoginAttempts
SELECT userID, loginSuccessful, count() AS attempts
FROM LoginStream WINDOW SLIDING_LENGTH(3)
GROUP BY loginSuccessful;
-- Inserts results into the `#LoginAttempts` inner stream, which is only accessible within the partition instance.

@info(name='Alert-query')
-- Consumes events from the inner stream, and suspends `userID`s that have 3 consecutive login failures.
INSERT INTO UserSuspensionStream
SELECT userID, "Three consecutive login failures!" AS message
FROM #LoginAttempts[loginSuccessful==false and attempts==3];
end;

Partition Behavior

When events are sent to LoginStream, the following events are generated at the #LoginAttempts inner stream via the Aggregation-query query, and at UserSuspensionStream via the Alert-query query:

| Input to LoginStream | At #LoginAttempts | Output at UserSuspensionStream |
|---|---|---|
| ['1001', false] | ['1001', false, 1] | - |
| ['1002', true] | ['1002', true, 1] | - |
| ['1002', false] | ['1002', false, 1] | - |
| ['1002', false] | ['1002', false, 2] | - |
| ['1001', false] | ['1001', false, 2] | - |
| ['1001', true] | ['1001', true, 1] | - |
| ['1001', false] | ['1001', false, 2] | - |
| ['1002', false] | ['1002', false, 3] | ['1002', '3 consecutive login failures!'] |
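
The same pattern applies to any keyed stream. Below is a minimal sketch, assuming the syntax shown above and reusing TemperatureStream from the join example, that gives each `roomNo` its own window and counter; RoomReadingCountStream is a hypothetical output stream:

@purge(enable='true', interval='10 sec', idle.period='1 hour')
partition with ( roomNo of TemperatureStream )

begin
@info(name='Room-reading-count')
-- Hypothetical query: counts the readings per room over its last 3 events.
INSERT INTO RoomReadingCountStream
SELECT roomNo, temperature, count() AS readings
FROM TemperatureStream WINDOW SLIDING_LENGTH(3);
end;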

Scatter and Gather (String)

This example shows how to perform scatter and gather on string values.

Scatter and Gather (String) Example

CREATE STREAM PurchaseStream (userId string, items string, store string);

@info(name = 'Scatter-query')
-- Scatter the value of `items` into separate events by splitting on `,`; the `true` flag emits only distinct tokens.
INSERT INTO TokenizedItemStream
SELECT userId, token AS item, store
FROM PurchaseStream#str:tokenize(items, ',', true);

@info(name = 'Transform-query')
-- Concatenate the tokenized `item` with `store`.
INSERT INTO TransformedItemStream
SELECT userId, str:concat(store, "-", item) AS itemKey
FROM TokenizedItemStream;

@info(name = 'Gather-query')
INSERT INTO GroupedPurchaseItemStream
-- Concatenate the `itemKey` values of all events in the batch, separated by `,`.
SELECT userId, str:groupConcat(itemKey, ",") AS itemKeys
-- Collect events traveling as a batch via the `batch()` window.
FROM TransformedItemStream window batch();

Scatter and Gather (String) Input

The following event containing a comma-separated item string is sent to PurchaseStream:

['501', 'cake,cookie,bun,cookie', 'CA']

Scatter and Gather (String) Output

After processing, the events arrive at TokenizedItemStream:

['501', 'cake', 'CA'], ['501', 'cookie', 'CA'], ['501', 'bun', 'CA']

The events arrive at TransformedItemStream:

['501', 'CA-cake'], ['501', 'CA-cookie'], ['501', 'CA-bun']

The event arriving at GroupedPurchaseItemStream is:

['501', 'CA-cake,CA-cookie,CA-bun']
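
Tokenized events can also be filtered before they are gathered. The following is a minimal sketch, assuming the stream filter syntax (`StreamName[condition]`) used in the Alert-query above; CookiePurchaseStream is a hypothetical output stream:

@info(name = 'Cookie-filter-query')
-- Hypothetical query: keep only `cookie` purchases from the tokenized events.
INSERT INTO CookiePurchaseStream
SELECT userId, item, store
FROM TokenizedItemStream[item == 'cookie'];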

Scatter and Gather (JSON)

This example shows how to perform scatter and gather on JSON values.

Scatter and Gather (JSON) Example

CREATE STREAM PurchaseStream (order string, store string);

@info(name = 'Scatter-query')
-- Scatter elements under `$.order.items` into separate events.
INSERT INTO TokenizedItemStream
SELECT json:getString(order, '$.order.id') AS orderId,
jsonElement AS item,
store
FROM PurchaseStream#json:tokenize(order, '$.order.items');


@info(name = 'Transform-query')
-- Apply a `$5` discount to cakes.
INSERT INTO DiscountedItemStream
SELECT orderId,
ifThenElse(json:getString(item, 'name') == "cake",
json:toString(
json:setElement(item, 'price',
json:getDouble(item, 'price') - 5
)
),
item) AS item,
store
FROM TokenizedItemStream;


@info(name = 'Gather-query')
INSERT INTO GroupedItemStream
-- Combine `item` from all events in the batch as a single JSON array.
SELECT orderId, json:group(item) AS items, store
-- Collect events traveling as a batch via the `batch()` window.
FROM DiscountedItemStream window batch();


@info(name = 'Format-query')
INSERT INTO DiscountedOrderStream
-- Format the final JSON by combining `orderId`, `items`, and `store`.
SELECT str:fillTemplate("""
{"discountedOrder":
{"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
}""", orderId, items, store) AS discountedOrder
FROM GroupedItemStream;

Scatter and Gather (JSON) Input

The following event is sent to PurchaseStream:

[{
"order":{
"id":"501",
"items":[{"name":"cake", "price":25.0},
{"name":"cookie", "price":15.0},
{"name":"bun", "price":20.0}
]
}
}, 'CA']

Scatter and Gather (JSON) Output

After processing, the following events arrive at TokenizedItemStream:

['501', '{"name":"cake","price":25.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']

The events arrive at DiscountedItemStream:

['501', '{"name":"cake","price":20.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']

The event arriving at GroupedItemStream is:

['501', '[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"},{"price":20.0,"name":"bun"}]', 'CA']

The event arriving at DiscountedOrderStream is:

[
{"discountedOrder":
{
"id":"501",
"store":"CA",
"items":[{"price":20.0,"name":"cake"},
{"price":15.0,"name":"cookie"},
{"price":20.0,"name":"bun"}]
}
}
]
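
The JSON functions used above can also drive filtering on the intermediate streams. The following is a minimal sketch, assuming `json:getDouble()` as used in the Transform-query and the stream filter syntax shown in the partition example; CheapItemStream is a hypothetical output stream:

@info(name = 'Cheap-item-query')
-- Hypothetical query: pick items priced under 20 after the discount is applied.
INSERT INTO CheapItemStream
SELECT orderId, item, store
FROM DiscountedItemStream[json:getDouble(item, 'price') < 20.0];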