
Source Mapping

Each source.type configuration can have a mapping, denoted by the map.type annotation, that defines how to convert the incoming event format to stream events.

The value of map.type selects the mapper used to convert the incoming events. Any additional mapping parameters depend on the selected mapper, and some of them are optional.

For detailed information about these parameters, see the documentation of the relevant mapper.

Map Attributes

attributes is an optional annotation used with map.type to define a custom mapping. When attributes is not provided, each mapper assumes that the incoming events adhere to its own default message format and attempts to convert the events from that format. By adding the attributes annotation, you can selectively extract data from the incoming message and assign it to the stream attributes.

There are two ways to configure attributes. In both cases, list the attribute mappings in parentheses after the WITH (...) clause of the CREATE SOURCE statement:

  • Define attribute names as keys, and mapping configurations as values:

    ... ( <attribute1>='<mapping>', <attributeN>='<mapping>')
  • Define the mapping configurations in the same order as the attributes are defined in the stream definition:

    ... ( '<mapping for attribute1>', '<mapping for attributeN>')
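The following Python sketch models how the two forms could resolve to the same attribute-to-mapping table. It is a conceptual model only, not the stream worker's implementation. The function names `resolve_mapping`, `extract` and `map_event`, and the use of simple dotted paths as mapping expressions, are assumptions for illustration.

```python
from typing import Any, Dict, List, Optional, Sequence


def resolve_mapping(
    stream_attributes: Sequence[str],
    named: Optional[Dict[str, str]] = None,
    positional: Optional[Sequence[str]] = None,
) -> Dict[str, str]:
    """Build an attribute -> mapping table from either configuration style (hypothetical)."""
    if named is not None:
        # Named form: keys must be attributes of the stream definition.
        unknown = set(named) - set(stream_attributes)
        if unknown:
            raise ValueError(f"unknown attributes: {sorted(unknown)}")
        return dict(named)
    if positional is not None:
        # Positional form: mappings are paired with attributes in definition order.
        if len(positional) != len(stream_attributes):
            raise ValueError("one mapping is required per stream attribute")
        return dict(zip(stream_attributes, positional))
    raise ValueError("either named or positional mappings must be given")


def extract(message: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'company.symbol' through nested dictionaries."""
    value: Any = message
    for key in path.split("."):
        value = value[key]
    return value


def map_event(
    message: Dict[str, Any], stream_attributes: Sequence[str], mapping: Dict[str, str]
) -> List[Any]:
    # Values are emitted in stream-definition order, however the mapping was declared.
    # A stream attribute missing from the mapping raises KeyError in this sketch.
    return [extract(message, mapping[attr]) for attr in stream_attributes]
```

For a stream with attributes `symbol` and `price`, `resolve_mapping(attrs, named={"price": "price", "symbol": "company.symbol"})` and `resolve_mapping(attrs, positional=["company.symbol", "price"])` produce equal tables. `map_event` therefore extracts the same event with either form.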

Supported Source Mapping Types

The following table lists some of the supported source mapping types.

Source Mapping Type    Description
CSV                    Converts CSV-like, delimiter-separated events to stream events.
JSON                   Converts JSON messages to stream events.
Key-Value              Converts key-value hash maps to stream events.
PassThrough            Passes stream events through without any data conversion.
Text                   Converts plain-text messages to stream events.
Tip: When the map.type annotation is not provided, map.type='passThrough' is used by default. It passes the consumed stream events directly to the stream worker without any data conversion.
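As a rough mental model of the table and the default above, the Python sketch below selects a converter by map type and falls back to passThrough when none is given. It is hypothetical throughout. Only the 'passThrough' key comes from this page; the `csv` and `keyvalue` keys, the function names, and the choice to keep CSV fields as strings instead of converting them to the declared attribute types are illustrative assumptions.

```python
import csv
import io
from typing import Any, Callable, Dict, List, Optional, Sequence

Event = List[Any]


def csv_mapper(payload: str, attributes: Sequence[str]) -> Event:
    # One comma-separated line -> one value per attribute (kept as strings here).
    values = next(csv.reader(io.StringIO(payload)))
    if len(values) != len(attributes):
        raise ValueError("field count does not match the stream definition")
    return values


def key_value_mapper(payload: Dict[str, Any], attributes: Sequence[str]) -> Event:
    # Pick values out of the hash map in stream-definition order.
    return [payload[attr] for attr in attributes]


def pass_through_mapper(payload: Event, attributes: Sequence[str]) -> Event:
    # No conversion: the payload is assumed to already be a stream event.
    return payload


# Hypothetical registry; only "passThrough" is a map.type value taken from this page.
MAPPERS: Dict[str, Callable[[Any, Sequence[str]], Event]] = {
    "csv": csv_mapper,
    "keyvalue": key_value_mapper,
    "passThrough": pass_through_mapper,
}


def convert(payload: Any, attributes: Sequence[str], map_type: Optional[str] = None) -> Event:
    """Convert one incoming message, defaulting to passThrough when no map type is set."""
    mapper = MAPPERS[map_type or "passThrough"]
    return mapper(payload, attributes)
```

Calling `convert(["FB", 55.6], ["symbol", "price"])` without a map type returns the payload unchanged, which mirrors the default behavior described in the tip.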

Example 1

Receive JSON messages via a stream source and direct them to the InputStream stream for processing. The source expects the JSON messages to be in the default data format supported by the JSON mapper, as follows:

{
  "name": "Paul",
  "age": 20,
  "country": "UK"
}

The following configuration of the stream source and the JSON source mapper achieves this:

CREATE SOURCE InputStream WITH (source.type='stream', streams.list='foo', map.type='json') (name string, age int, country string);
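To illustrate what the default JSON mapping does in this example, the Python sketch below reads the top-level keys named after the stream attributes and checks them against the declared types. It is a conceptual model, not the JSON mapper's implementation. The function name `map_default_json` and the strict raise-on-mismatch type check are assumptions; the real mapper may handle bad input differently.

```python
import json
from typing import Any, Dict, List, Tuple

# Stream definition from the example: (name string, age int, country string).
SCHEMA: List[Tuple[str, type]] = [("name", str), ("age", int), ("country", str)]


def map_default_json(message: str, schema: List[Tuple[str, type]]) -> List[Any]:
    """Read top-level JSON keys named after the stream attributes, in schema order."""
    data: Dict[str, Any] = json.loads(message)
    event: List[Any] = []
    for attr, attr_type in schema:
        value = data[attr]  # raises KeyError if the message lacks an attribute
        # Hypothetical strict check: reject values whose JSON type does not match.
        if not isinstance(value, attr_type):
            raise TypeError(
                f"{attr}: expected {attr_type.__name__}, got {type(value).__name__}"
            )
        event.append(value)
    return event
```

Because the lookup is by key, the order of fields in the incoming JSON does not matter. The event is always produced in the order of the stream definition.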

Example 2

Receive JSON messages from a database source and direct them to the StockStream stream for processing. Here, the incoming JSON shown below does not adhere to the default data format supported by the JSON mapper, so a custom mapping is required.

{
  "portfolio": {
    "stock": {
      "volume": 100,
      "company": {
        "symbol": "FB"
      },
      "price": 55.6
    }
  }
}

The configuration of the database source and the custom JSON source mapping to achieve the above is:

CREATE SOURCE StockStream WITH (source.type='database', collection='foo', map.type='json', enclosing.element="$.portfolio") (symbol = "stock.company.symbol", price = "stock.price", volume = "stock.volume");
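One way to picture this mapping is in two steps. The message is first narrowed to the enclosing element ($.portfolio), and each attribute path is then resolved relative to it. The Python sketch below models only that idea. It supports plain dotted paths rather than full JSONPath. The names `resolve` and `map_custom_json` are hypothetical.

```python
import json
from typing import Any, Dict, List


def resolve(node: Any, path: str) -> Any:
    """Follow a simple dotted path; '$' is the root and a leading '$.' is ignored."""
    if path == "$":
        return node
    if path.startswith("$."):
        path = path[2:]
    for key in path.split("."):
        node = node[key]
    return node


def map_custom_json(message: str, enclosing_element: str, mapping: Dict[str, str]) -> List[Any]:
    # Narrow the document to the enclosing element first, then resolve each
    # attribute's path relative to it, in the order the mapping lists them.
    root = resolve(json.loads(message), enclosing_element)
    return [resolve(root, path) for path in mapping.values()]
```

With the message above, the enclosing element "$.portfolio" and the mapping {"symbol": "stock.company.symbol", "price": "stock.price", "volume": "stock.volume"}, the sketch yields ["FB", 55.6, 100].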

The same mapping can also be configured by omitting the attribute names. The mappings are then applied to the attributes in the order in which they are defined in the stream definition:

CREATE SOURCE StockStream WITH (source.type='database', collection='foo', map.type='json', enclosing.element="$.portfolio") ("stock.company.symbol", "stock.price", "stock.volume");