Query Worker Sink

You can use a new or existing query worker as a stream worker sink.

For more information about writing queries and creating query workers, refer to Query Workers.

Syntax

CREATE SINK <NAME> WITH (type="query-worker", map.type="<STRING>", query.worker.name="<STRING>", sink.id="<STRING>")

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
| --- | --- | --- | --- | --- | --- |
| query.worker.name | The name of an existing query worker. | | string | No | Yes |
| sink.id | Identifier to correlate the query-worker source with its corresponding query-worker sink that published the messages. | | string | No | No |
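
As a sketch of how the parameters above fit together, the following definition sets all of them on a single sink. The names OrderSink, processOrder, and order-flow and the attributes orderId and amount are hypothetical placeholders. The value map.type="passthrough" is an assumption based on the note in Example 2 that the sink always uses passthrough mapping.

```sql
-- Hypothetical sink: every name below is a placeholder chosen for illustration.
-- map.type="passthrough" is assumed from the sink comment in Example 2.
CREATE SINK OrderSink WITH (type='query-worker', map.type="passthrough", query.worker.name="processOrder", `sink.id`="order-flow") (orderId string, amount double);
```

A query-worker source that should receive the responses for this sink would need to declare the same sink.id value.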

Example 1

CREATE SINK queryWorkerStream WITH (type='query-worker', query.worker.name="queryWorkerSample") (startTime string);

CREATE TRIGGER InitTrigger WITH (interval=1 minute);

INSERT INTO queryWorkerStream
SELECT time:dateFormat(eventTimestamp(), 'yyyy/MM/dd HH:mm:ss') as startTime
FROM InitTrigger;

Every minute, InitTrigger fires and the query-worker sink calls the query worker named queryWorkerSample with the formatted startTime value. The query worker then inserts data into the numbers collection, as defined in its query.

Example 2

CREATE TRIGGER Trigger1 WITH (interval = 10 seconds);

-- The sink's map.type is always passthrough.
CREATE SINK queryWorkerStream WITH (type='query-worker', query.worker.name="testql", `sink.id`="test") (value long);

-- The source's map.type can be json or passthrough.
CREATE SOURCE queryWorkerStreamResponse WITH (type='query-worker', `sink.id`="test", map.type="json") (_id string, value long);

CREATE SINK STREAM TestStream(id string, message long);

INSERT INTO queryWorkerStream
SELECT eventTimestamp() as value
FROM Trigger1;

INSERT INTO TestStream
SELECT _id as id, value as message
FROM queryWorkerStreamResponse;

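Every 10 seconds, Trigger1 fires and the current event timestamp is inserted into queryWorkerStream as value, which calls the query worker named testql. Because the sink and the source share the sink.id "test", the source queryWorkerStreamResponse receives the responses to those calls and maps them from JSON to the attributes _id and value. The last query forwards each response to TestStream as id and message.

This example is identical to the Sample-Query-Worker-Response sample in the Stream Workers Samples tab.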