Skip to main content


A stream worker query defines the processing logic in stream workers. A query consumes events from one or more:

The query then processes the events in a streaming manner and generates output events into one or more:


A query provides a way to process the events in the order they arrive and produce output using both stateful and stateless complex event processing and stream processing operations.


The high-level query syntax for defining processing logics is as follows:

@name('<query name>')
FROM <input>
<grouping clauses>

Query Parameters

The following parameters are used to configure a stream definition.

query nameThe name of the query. Since naming the query (i.e the @name('<query name>') annotation) is optional, when the name is not provided, Macrometa assigns a system-generated name for the query.
OUTPUT ACTIONDefines output action (such as INSERT INTO, UPDATE, DELETE, and so on) that needs to be performed by the generated events on a stream, named window, or table.
projectionGenerates output event attributes using SELECT, functions, and aggregation functions, and filters the generated the output operations before sending them out. The projection is optional, and when it is left out, all the input events are sent to the output as-is. For more information, refer to Query Projections.
FROM <input>Defines the means of event consumption via streams, named windows, tables, and/or named-aggregations, and defines the processing logic.
grouping clausesGROUP BY functions to group and organize output. For more information, refer to GROUP BY.

Query Output

Order of queries based on their output can drastically affect how your stream worker behaves.

In general, have queries that insert data into windows or aggregations before queries that insert data into tables.

Stream workers support the following output actions:


Query Projections

Query projection allow you to filter and transform streaming data by choosing which fields or attributes of a data stream you want to include or exclude in query results. By selecting only the necessary fields and performing any necessary calculations or aggregations, query projection can help to reduce the amount of data that needs to be processed and improve the overall performance of streaming applications.

In Macrometa stream worker queries, projection is specified using the SELECT clause. This clause is used to select a subset of fields from the incoming data stream and return them in the query results. The SELECT clause can also be used to perform transformations on the selected fields, such as aggregations or calculations.

Here are some examples things you can do with query projections in Macrometa stream workers.

Select All Fields or Specific Fields

Select some or all fields from the input stream to be inserted into an output stream.

For example, to select only the name and age fields from a stream of customer data, you could use the following query:

INSERT INTO OutputStream
SELECT name, age
FROM customers;

You can select all attributes in an input stream by using an asterisk (*) wildcard or by omitting the SELECT statement.

INSERT INTO OutputStream
FROM InputStream;

Or use:

INSERT INTO OutputStream
FROM InputStream;

Rename Attributes

Select attributes from an input stream and insert them into the output stream with different names.

For example, this query renames roomNo to roomNumber and temp to temperature.

INSERT INTO OutputStream
SELECT roomNo AS roomNumber, temp AS temperature
FROM InputStream;

Introduce Constant Values

You can add constant values by assigning them to an attribute using as.

For example, this query specifies C to be used as the constant value for scale attribute.

INSERT INTO OutputStream
SELECT roomNo, temp, 'C' AS scale
FROM InputStream;

Filter by Field Value

You can use the WHERE or HAVING clauses to filter the incoming data stream based on a specific field value.

For example, to select only customer data where the age field is greater than 30, you could use the following query:

INSERT INTO OutputStream
FROM InputStream
WHERE age > 30;

For more information, refer to HAVING | WHERE.

Use Expressions

You can use attributes with mathematical and logical expressions in the precedence order given below, and assigns them to the output attribute using AS.

Expression Example

Convert Celsius to Fahrenheit and identify rooms with room numbers between 10 and 15 as server rooms.

INSERT INTO OutputStream
SELECT roomNo, temp * 9/5 + 32 AS temp, 'F' AS scale, roomNo > 10 AND roomNo < 15 AS isServerRoom
FROM InputStream;

Operator Precedence Order

()Scope(cost + tax) * 0.05
IS NULLNull checkdeviceID is null
NOTLogical NOTnot (price > 10)
* / %Multiplication, division, modulotemp * 9/5 + 32
+ -Addition, subtractiontemp * 9/5 - 32
< <= > >=Comparators: less-than, greater-than-equal, greater-than, less-than-equaltotalCost >= price * quantity
\== !=Comparisons: equal, not equaltotalCost != price * quantity
INContains in tableroomNo in ServerRoomsTable
ANDLogical ANDtemp < 40 and (humidity < 40 or humidity >= 60)
ORLogical ORtemp < 40 or (humidity < 40 and humidity >= 60)

Aggregate Data

You can use aggregation functions like sum, avg, min, and max to perform calculations on the incoming data stream.

For example, to calculate the average age of customers in the stream, you could use the following query:

INSERT INTO OutputStream
FROM customers;

For more information about aggregating data, refer to Named Aggregations and Windows.

Perform Joins

Joining multiple streams: You can use the "JOIN" clause to combine data from multiple streams into a single query result. For example, to join a stream of customer data with a stream of sales data, you could use the following query:

SELECT * FROM customers JOIN sales ON = sales.customer_id;

Example 1

A query consumes events from the TempStream stream and output only the roomNo and temp attributes to the RoomTempStream stream, from which another query consumes the events and sends all its attributes to AnotherRoomTempStream stream.

CREATE STREAM TempStream (deviceID long, roomNo int, temp double);

INSERT INTO RoomTempStream
SELECT roomNo, temp
FROM TempStream;

INSERT INTO AnotherRoomTempStream
FROM RoomTempStream;

In this example, the RoomTempStream and AnotherRoomTempStream streams are an inferred streams, which means their stream definitions are inferred from the queries and they can be used same as any other defined stream without any restrictions.

Example 2

CREATE STREAM TempStream (deviceID long, roomNo int, temp double);

CREATE SINK OutputStream WITH (type='stream', stream='OutputStream', map.type='json') (roomNo int, avgTemp double);

INSERT INTO OutputStream
SELECT roomNo, avg(temp) AS avgTemp
FROM TempStream
GROUP BY roomNo;

This query takes the roomNo and temp values from TempStream, averages the temperatures, groups them by room number, outputs them into OutputStream.

Example 3

This example provides an example of streams and queries, and how multiple queries can be chained to one another.

Stream Worker Code

-- Defines `InputTemperatureStream` stream to pass events having `sensorId` and `temperature` attributes of types `string` and `double`.
CREATE STREAM InputTemperatureStream (sensorId string, temperature double);

-- Optional `@info` annotation to name the query.
@info(name = 'Pass-through')

-- Query to consume events from `InputTemperatureStream`, produce new events by selecting all the attributes from the incoming events, and outputs them to `TemperatureStream`.
INSERT INTO TemperatureAndSensorStream
FROM InputTemperatureStream;

@info(name = 'Simple-selection')

-- Selects only the `temperature` attribute from events, and outputs to `TemperatureOnlyStream`.
-- Consumes events from `TemperatureAndSensorStream`. The schema of the stream is inferred from the previous query, hence no need to be defined.
INSERT INTO TemperatureOnlyStream
SELECT temperature
FROM TemperatureAndSensorStream;

Events at Each Stream

When an event with values ['aq-14', 35.4] is sent to InputTemperatureStream stream, it is converted and travels through the streams as below.

  • InputTemperatureStream : ['aq-14', 35.4]
  • TemperatureAndSensorStream : ['aq-14', 35.4]
  • TemperatureOnlyStream : [35.4]