Stream Sink

Stream sinks consume events from streams and publish them using multiple transports to external endpoints in various data formats.

Purpose

Stream sinks provide a way to publish the events of a stream to external systems by converting the events to a supported format.

Syntax

The stream sink syntax is as follows:

CREATE SINK SinkName WITH (
    type="stream",
    stream="STRING",
    replication.type="STRING",
    num.io.threads="INT",
    key.shared.attribute="STRING, STRING, ...",
    stream.url="STRING",
    stream.admin.url="STRING",
    auth.plugin="STRING",
    auth.params="STRING",
    map.type='type')
(<attribute_name> <attribute_type>,
 <attribute_name> <attribute_type>, ... );

Alternatively, use the CREATE SINK STREAM shortcut and specify GLOBAL or LOCAL replication:

CREATE SINK STREAM <GLOBAL | LOCAL> SampleStreamSink (data string);

For a local stream, the shortcut can omit the replication type:

CREATE SINK STREAM SampleStreamSink (data string);
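
To see how the shortcut relates to the full syntax, the two forms can be put side by side. This is only a sketch: it assumes that the shortcut publishes to a stream with the same name as the sink and that JSON mapping is used, neither of which is stated on this page. The two statements are alternatives, so use one or the other.

```sql
-- Shortcut form: a sink on a stream with GLOBAL replication.
CREATE SINK STREAM GLOBAL SampleStreamSink (data string);

-- Long form built from the parameters in the Syntax section.
-- Assumptions: the stream name equals the sink name, and map.type is 'json'.
CREATE SINK SampleStreamSink WITH (
    type='stream',
    stream='SampleStreamSink',
    replication.type='GLOBAL',
    map.type='json')
(data string);
```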

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional |
| --- | --- | --- | --- | --- |
| stream | The streams to which the sink needs to publish events. | | STRING | No |
| replication.type | Specifies the replication type of the streams. Possible values are LOCAL and GLOBAL. | LOCAL | STRING | Yes |
| num.io.threads | The number of I/O threads. | 1 | INT | Yes |
| key.shared.attributes | The attributes to be included in the message key. | - | STRING | Yes |
| stream.url | The URL of the Pulsar broker, e.g. pulsar_ssl://my-broker:6651. | NULL | STRING | Yes |
| stream.admin.url | The admin URL of the Pulsar broker, e.g. https://my-broker:443. | NULL | STRING | Yes |
| auth.plugin | The required authentication plugin, e.g. org.apache.pulsar.client.impl.auth.AuthenticationToken. | NULL | STRING | Yes |
| auth.params | The required authentication parameters, e.g. a JWT when auth.plugin="org.apache.pulsar.client.impl.auth.AuthenticationToken". | NULL | STRING | Yes |
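
The connection parameters above can be combined to publish to an external Pulsar broker. The following is a minimal sketch rather than a tested configuration: the sink name, stream name, and attribute list are hypothetical, the URLs are the placeholder values from the parameter descriptions, and `<your-JWT>` stands in for a real token.

```sql
-- Hypothetical sink that publishes to an external Pulsar broker.
-- ExternalAlertSink and ExternalAlertStream are illustrative names.
CREATE SINK ExternalAlertSink WITH (
    type='stream',
    stream='ExternalAlertStream',
    stream.url='pulsar_ssl://my-broker:6651',
    stream.admin.url='https://my-broker:443',
    auth.plugin='org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- Placeholder: replace with the JWT issued for your broker.
    auth.params='<your-JWT>',
    map.type='json')
(name string, amount double);
```

Because stream.url, stream.admin.url, auth.plugin, and auth.params all default to NULL, they can be left out when no external broker is involved.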

Example 1

CREATE SINK ProductionAlertStream WITH (type='stream', stream='ProductionAlertStream', map.type='json') (name string, amount double);

Example 2

CREATE SINK STREAM UserIdPurchaseStream(userId string, totalItems long, totalPrice double);