Sink Mapping

Each sink.type configuration can have a mapping, denoted by the map.type annotation, that defines how stream events are converted into outgoing messages of the defined format.

The type parameter of map.type defines the map type used to convert the outgoing events. The other parameters of the map.type annotation depend on the mapper selected, and some of its parameters can be optional and/or dynamic.

For detailed information about the parameters see the documentation of the relevant mapper.

Map Payload

map.payload is an optional annotation used with map.type to define a custom mapping. When the map.payload annotation is not provided, each mapper maps the outgoing events to its own default event format. The map.payload annotation allows users to configure mappers to produce the output payload of their choice, and by using dynamic properties within the payload they can selectively extract and add data from the published stream events.

There are two ways to configure the map.payload annotation.

  1. Some mappers, such as JSON and Text, accept only one output payload:

    map.payload='This is a test message from {{user}}.'
  2. Some mappers, such as key-value, accept a series of mapping values:

    map.payload=key1='mapping_1', 'key2'='user : {{user}}'

    Here, the keys of the payload mapping can be defined using dot notation, such as a.b.c, or using any constant string value, such as '$abc'.
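As a sketch of the single-payload style, the configuration below publishes a custom message, assuming the text mapper is selected with map.type='text'; the UserStream stream and endpoint URL are illustrative assumptions:

```sql
-- Illustrative sink: {{user}} is a dynamic property resolved from each published event.
CREATE SINK UserStream WITH (sink.type='http', publisher.url='http://localhost:8005/users', map.type='text', map.payload='This is a test message from {{user}}.') (user string);
```

Here, publishing an event with user='Paul' would produce the message "This is a test message from Paul.".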

Supported Sink Mapping Types

The following is a list of sink mapping types supported by stream workers:

| Sink Mapping Type | Description |
|---|---|
| CSV | Converts stream events to CSV-like delimiter-separated messages. |
| JSON | Converts stream events to JSON messages. |
| Key-Value | Converts stream events to key-value hash maps. |
| PassThrough | Omits data conversion on stream events. |
| Text | Converts stream events to plain text messages. |
Tip: When the map.type annotation is not provided, map.type='passThrough' is used by default, which passes the outgoing events directly to the sinks without any data conversion.
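Because passThrough is the default, a sink that forwards events without conversion can omit the annotation entirely. The RawStream stream and endpoint below are illustrative assumptions:

```sql
-- map.type omitted: defaults to map.type='passThrough', so events are
-- forwarded to the endpoint without any data conversion.
CREATE SINK RawStream WITH (sink.type='http', publisher.url='http://localhost:8005/raw', method='POST') (name string, age int, country string);
```

This is equivalent to specifying map.type='passThrough' explicitly in the WITH clause.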

Example 1

Publishes OutputStream events by converting them to JSON messages in the default format and sending them to the HTTP endpoint http://localhost:8005/endpoint1 using the POST method, an Accept-Date header, and basic authentication with admin as both the username and the password.

The configuration of the HTTP sink and JSON sink mapper to achieve the above is as follows.

CREATE SINK OutputStream WITH (sink.type='http', publisher.url='http://localhost:8005/endpoint1', method='POST', headers='Accept-Date:20/02/2017', basic.auth.enabled='true', basic.auth.username='admin', basic.auth.password='admin', map.type='json') (name string, age int, country string);

This publishes a JSON message in the following format:

{
  "event": {
    "name": "Paul",
    "age": 20,
    "country": "UK"
  }
}

Example 2

Publishes StockStream events by converting them to user-defined JSON messages and sending them to the HTTP endpoint http://localhost:8005/stocks.

The configuration of the HTTP sink and custom JSON sink mapping to achieve the above is as follows.

CREATE SINK StockStream WITH (sink.type='http', publisher.url='http://localhost:8005/stocks', map.type='json', validate.json='true', enclosing.element='$.Portfolio', map.payload="""{"StockData":{ "Symbol":"{{symbol}}", "Price":{{price}} }}""") (symbol string, price float, volume long);

This publishes a single event as a JSON message in the following format:

{
  "Portfolio": {
    "StockData": {
      "Symbol": "GOOG",
      "Price": 55.6
    }
  }
}

It can also publish multiple events together as a JSON message in the following format:

{
  "Portfolio": [
    {
      "StockData": {
        "Symbol": "GOOG",
        "Price": 55.6
      }
    },
    {
      "StockData": {
        "Symbol": "FB",
        "Price": 57.0
      }
    }
  ]
}

Example 3

Publishes events from the OutputStream stream to multiple HTTP endpoints using a partitioning strategy. Here, the events are sent to either http://localhost:8005/endpoint1 or http://localhost:8006/endpoint2 based on the partitioning key country. It uses the default JSON mapping, the POST method, and admin as both the username and the password when publishing to both endpoints.

The configuration of the distributed HTTP sink and JSON sink mapper to achieve the above is:

CREATE SINK OutputStream WITH (sink.type='http', method='POST', basic.auth.enabled='true', basic.auth.username='admin', basic.auth.password='admin', map.type='json', distribution.strategy='partitioned', partitionKey='country', destination.publisher.url='http://localhost:8005/endpoint1', destination.publisher.url='http://localhost:8006/endpoint2') (name string, age int, country string);

This partitions the outgoing events and publishes all events with the same country attribute value to the same endpoint. The published JSON message is in the following format:

{
  "event": {
    "name": "Paul",
    "age": 20,
    "country": "UK"
  }
}