
kafka

A Kafka source receives events from a topic partition in a Kafka cluster so that GDP stream workers can process them. Events can be received in TEXT, JSON, or binary format. If the topic does not already exist in the Kafka cluster, the default partition is created for the given topic.

Syntax

CREATE SOURCE <NAME> WITH (type="kafka", map.type="<STRING>", bootstrap.servers="<STRING>", topic.list="<STRING>", group.id="<STRING>", threading.option="<STRING>", partition.no.list="<STRING>", seq.enabled="<BOOL>", is.binary.message="<BOOL>", topic.offsets.map="<STRING>", enable.offsets.commit="<BOOL>", optional.configuration="<STRING>")

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
| --- | --- | --- | --- | --- | --- |
| bootstrap.servers | The list of Kafka servers to which the Kafka source must listen, provided as comma-separated values. e.g., localhost:9092,localhost:9093 | | STRING | No | No |
| topic.list | The list of topics to which the source must listen, provided as comma-separated values. e.g., topic_one,topic_two | | STRING | No | No |
| group.id | An ID that identifies the Kafka source group. The group ID ensures that sources in the same group that listen to the same topic and partition do not receive the same event. | | STRING | No | No |
| threading.option | Specifies whether the Kafka source runs on a single thread or on multiple threads. Possible values: single.thread runs the Kafka source on a single thread; topic.wise uses a separate thread per topic; partition.wise uses a separate thread per partition. | | STRING | No | No |
| partition.no.list | The partition number list for the given topic, provided as comma-separated values. e.g., 0,1,2 | 0 | STRING | Yes | No |
| seq.enabled | If set to true, the sequence of the events received via the source is taken into account. Each event must therefore contain a sequence number as an attribute value to indicate the sequence. | false | BOOL | Yes | No |
| is.binary.message | Set this parameter to true to receive binary events via the Kafka source. | false | BOOL | Yes | No |
| topic.offsets.map | The reading offsets for each topic and partition, specified in the format <topic>=<offset>,<topic>=<offset>. When an offset is defined for a topic, the Kafka source skips the message with that number and all messages sent before it. If no offset is defined for a topic, messages are read from the beginning. e.g., stocks=100,trades=50 reads from the 101st message of the stocks topic and from the 51st message of the trades topic. | null | STRING | Yes | No |
| enable.offsets.commit | Specifies whether to commit offsets. For manual asynchronous offset committing, set enable.offsets.commit to true and enable.auto.commit to false. For periodic committing, set both enable.offsets.commit and enable.auto.commit to true. If committing is not needed, set enable.offsets.commit to false. Note: enable.auto.commit is an optional.configuration property. If it is set to true, the source periodically commits its current offset (defined as the offset of the next message to be read) back to Kafka for the partitions it is reading from. The default interval is 1000 ms and can be changed with the auto.commit.interval.ms property in optional.configuration. To guarantee at-least-once processing, we recommend enabling Stream App Periodic State Persistence when enable.auto.commit is set to true. Manual committing might introduce latency during consumption. | true | BOOL | Yes | No |
| optional.configuration | All other configurations with which the consumer is created. e.g., ssl.keystore.type:JKS,batch.size:200 | null | STRING | Yes | No |
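The topic.offsets.map and enable.offsets.commit rules above can be easier to follow as a small sketch. The Python below is only an illustration of the documented behavior, not the stream worker implementation; the function names parse_offsets_map, first_message_read, and commit_mode are hypothetical, as are the mode labels returned by commit_mode.

```python
from typing import Dict


def parse_offsets_map(value: str) -> Dict[str, int]:
    """Parse a topic.offsets.map string such as 'stocks=100,trades=50'.

    Hypothetical helper: it only mirrors the <topic>=<offset> format
    described in the parameter table.
    """
    offsets: Dict[str, int] = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:  # tolerate a trailing comma
            continue
        topic, _, offset = entry.partition("=")
        offsets[topic.strip()] = int(offset)
    return offsets


def first_message_read(offsets: Dict[str, int], topic: str) -> int:
    """Return the 1-based ordinal of the first message read from a topic.

    An offset N skips message N and everything before it, so reading
    starts at message N + 1. A topic without an entry is read from the
    beginning (message 1).
    """
    return offsets.get(topic, 0) + 1


def commit_mode(enable_offsets_commit: bool, enable_auto_commit: bool) -> str:
    """Summarize the commit behavior for a pair of settings.

    The returned labels are illustrative names, not product terms.
    """
    if not enable_offsets_commit:
        return "none"
    return "periodic" if enable_auto_commit else "manual-async"


offsets = parse_offsets_map("stocks=100,trades=50")
print(first_message_read(offsets, "stocks"))   # 101
print(first_message_read(offsets, "trades"))   # 51
print(commit_mode(True, False))                # manual-async
```

Under these assumptions, a topic that is absent from topic.offsets.map is read starting at message 1, and enable.auto.commit only matters when enable.offsets.commit is true.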

Example 1

@App:name('TestExecutionPlan')
CREATE STREAM BarStream (symbol string, price float, volume long);

CREATE SOURCE FooStream WITH (type='kafka', topic.list='kafka_topic,kafka_topic2', group.id='test', threading.option='partition.wise', bootstrap.servers='localhost:9092', partition.no.list='0,1', map.type='json') (symbol string, price float, volume long);

@info(name = 'query1')
insert into BarStream
from FooStream select symbol, price, volume;

This Kafka source configuration listens to partitions 0 and 1 of the kafka_topic and kafka_topic2 topics. A thread is created for each topic and partition combination. The events are received in JSON format, mapped to a stream worker event, and sent to a stream named FooStream.
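To make the effect of threading.option concrete, the following Python sketch groups topic and partition pairs into threads. It is an illustration only: plan_threads is a hypothetical function, and the partition.wise branch assumes one thread per topic and partition combination, as described for this example.

```python
from typing import List, Tuple

# One inner list per thread; each item is a (topic, partition) pair.
Assignment = List[Tuple[str, int]]


def plan_threads(
    threading_option: str, topics: List[str], partitions: List[int]
) -> List[Assignment]:
    """Group (topic, partition) pairs by thread for a threading.option value."""
    if threading_option == "single.thread":
        # Everything is consumed on one thread.
        return [[(t, p) for t in topics for p in partitions]]
    if threading_option == "topic.wise":
        # One thread per topic, covering all listed partitions of that topic.
        return [[(t, p) for p in partitions] for t in topics]
    if threading_option == "partition.wise":
        # Assumption: one thread per topic and partition combination.
        return [[(t, p)] for t in topics for p in partitions]
    raise ValueError(f"unknown threading.option: {threading_option}")


example_1 = plan_threads("partition.wise", ["kafka_topic", "kafka_topic2"], [0, 1])
print(len(example_1))  # 4
```

With the same topics and partitions, topic.wise would yield two threads and single.thread would yield one.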

Example 2

@App:name('TestExecutionPlan')
CREATE STREAM BarStream (symbol string, price float, volume long);

CREATE SOURCE FooStream WITH (type='kafka', topic.list='kafka_topic', group.id='test', threading.option='single.thread', bootstrap.servers='localhost:9092', map.type='json') (symbol string, price float, volume long);

@info(name = 'query1')
insert into BarStream
from FooStream select symbol, price, volume;

This Kafka source configuration listens to the default partition of the kafka_topic topic because partition.no.list is not defined. Only one thread is created for the topic. The events are received in JSON format, mapped to a stream worker event, and sent to a stream named FooStream.