Skip to main content

Store RDBMS Plugin

You can use plugins in Macrometa to extend the functionality of your streams.

The Store plugin provides persistence and retrieval of events to and from RDBMS databases like MySQL, MS SQL, PostgreSQL, H2, and Oracle. If you are using CEP as the Java/Python library, then you must also set the data source in the CEP Manager.

This plugin provides the following functionality:

Create, Update, Delete (CUD)

You can use SQL CUD to perform INSERT, UPDATE, and DELETE queries on data sources.

Parameters

Insert the following parameters into the provided template to create a CUD function.

NameData TypeDefault ValueOptional?Dynamic?Description
data-source.nameSTRINGN/ANoNoName of the data source on which to run the query.
querySTRINGN/ANoYesThe INSERT, UPDATE, or DELETE query to be performed. Format according to the relevant database type.
parameterAnyN/AYesYesIf query is a parameter used as an SQL query, then you can use this field to pass CEP attributes to set parameter values.

Additionally, the numRecords attribute (INT) indicates the number of records manipulated by the query.

CUD Template

Use the following template to create a CUD function:

rdbms:cud(STRING jdbc.url, STRING username, STRING password, STRING jdbc.driver.name, STRING query)
rdbms:cud(STRING jdbc.url, STRING username, STRING password, STRING jdbc.driver.name, STRING query, STRING|BOOL|INT|DOUBLE|FLOAT|LONG parameter, STRING|BOOL|INT|DOUBLE|FLOAT|LONG ...)

CUD Examples

The following examples assume you have an input stream called TriggerStream and an output stream called RecordStream.

This example query updates events from the input stream by adding a numRecords attribute, then inserts the updated events into an output stream:

INSERT INTO RecordStream
SELECT numRecords
FROM TriggerStream#rdbms:cud("jdbc:mysql://hostname:3306/MySQLDB?useSSL=false", "username", "password", "com.mysql.jdbc.Driver", "UPDATE Customers_Table SET customerName='abc' where customerName='xyz'");

Make sure the stream is defined with:

CREATE STREAM TriggerStream (customerName string);

This example query does the same thing with the addition of previousName and changedName attributes to indicate the names of the input and output streams:

INSERT INTO RecordStream
SELECT numRecords
FROM TriggerStream#rdbms:cud("jdbc:mysql://hostname:3306/MySQLDB?useSSL=false", "username", "password", "com.mysql.jdbc.Driver", "UPDATE Customers_Table SET customerName=? where customerName=?", changedName, previousName);

Make sure the stream is defined with:

CREATE STREAM TriggerStream (customerName string, changedName string, previousName string);

Query

You can use the Query function to perform SQL retrieval queries on a data source.

Query Parameters

Insert the following parameters into the provided template to create a Query function.

NameData TypeDefault ValueOptional?Dynamic?Description
data-source.nameSTRINGN/ANoNoName of the data source on which to run the query.
querySTRINGN/ANoYesThe INSERT, UPDATE, or DELETE query to be performed. Format according to the relevant database type.
parameterAnyN/AYesYesIf query is a parameter used as a SQL query, you can use this field to pass CEP attributes to set parameter values.
attribute.definition.listSTRINGN/ANoYesA comma-separated list of attributes to return with the SQL query. Each item is processed in order. Supported data types are STRING, INT, LONG, DOUBLE, FLOAT, and BOOL.
ack.empty.result.setBOOLfalseYesNoWhen set to true, null values are returned if the result set is empty. When set to false, nothing is returned if the result set is empty.

Additionally, attributeName (any type) returns the attributes listed in the parameter attribute.definition.list.

Query Template

Use the following template to create a query function:

rdbms:query(STRING jdbc.url, STRING username, STRING password, STRING jdbc.driver.name, STRING attribute.definition.list, STRING query)
rdbms:query(STRING jdbc.url, STRING username, STRING password, STRING jdbc.driver.name, STRING attribute.definition.list, STRING query, STRING|BOOL|INT|DOUBLE|FLOAT|LONG parameter, STRING|BOOL|INT|DOUBLE|FLOAT|LONG ...)
rdbms:query(STRING data-source.name, STRING attribute.definition.list, STRING query, BOOL ack.empty.result.set)
rdbms:query(STRING jdbc.url, STRING username, STRING password, STRING jdbc.driver.name, STRING attribute.definition.list, STRING query, STRING|BOOL|INT|DOUBLE|FLOAT|LONG parameter, STRING|BOOL|INT|DOUBLE|FLOAT|LONG ..., BOOL ack.empty.result.set)

Query Examples

The following examples query creditcardno, country, transaction, and amount from a database called SAMPLE_DB, then generate an event for each record retrieval insert the events into the recordStream output stream:

INSERT INTO recordStream
SELECT creditcardno, country, transaction, amount
FROM TriggerStream#rdbms:query("jdbc:mysql://hostname:3306/MySQLDB?useSSL=false", "username", "password", "com.mysql.jdbc.Driver", 'creditcardno string, country string,transaction string, amount int', 'select * from Transactions_Table');

The event includes the attributes defined in the attribute.definition.list as additional attributes (e.g. creditcardno string, country string, transaction string, amount int).

Additionally, this example uses the countrySearchWord parameter as a filter.

INSERT INTO recordStream
SELECT creditcardno, country, transaction, amount
FROM TriggerStream#rdbms:query("jdbc:mysql://hostname:3306/MySQLDB?useSSL=false", "username", "password", "com.mysql.jdbc.Driver", 'creditcardno string, country string,transaction string, amount int', 'select * from where country=?', countrySearchWord);

Make sure the stream is defined with:

CREATE STREAM TriggerStream (countrySearchWord string);

This example returns null values if there are no events that satisfy the query:

INSERT INTO recordStream 
SELECT creditcardno, country, transaction, amount
FROM TriggerStream#rdbms:query("jdbc:mysql://hostname:3306/MySQLDB?useSSL=false", "username", "password", "com.mysql.jdbc.Driver",
'creditcardno string, country string,transaction string, amount int', 'select * from where country=?', countrySearchWord, true);

Make sure the stream is defined with:

CREATE STREAM TriggerStream (countrySearchWord string);

Store

The Store function can create and edit event tables, configure the table's data sources and connections, and insert, update, or delete data from the tables. It requires read-write permissions on connected data sources.

Store Parameters

Insert the following parameters into the provided template to create a Store function.

NameData TypeDefault ValueOptional?Dynamic?Description
jdbc.urlSTRINGN/ANoNoThe JDBC URL used to access the RDBMS data store.
usernameSTRINGN/ANoNoThe username used to access the RDBMS data store.
passwordSTRINGN/ANoNoThe password used to access the RDBMS data store.
jdbc.driver.nameSTRINGN/ANoNoThe name of the JDBC SDK used to access the RDBMS data store.
datasourceSTRINGN/ANoNoIf you want to use a Carbon data source, specify the name of the data source on which to run the query.
jndi.resourceSTRINGnullYesNoIf you want to use JNDI look-up, specify the name of the JNDI resource through which you want to attempt connection. If you use a data source, this is not used.
pool.propertiesSTRINGnullYesNoUse key-value pairs to specify any pool parameters for the database connection. If you use a datasource or jndi.resource, this is not used.
table.nameSTRINGTable name defined in CEP query.YesNoSpecify the name to be used for the event table.
field.lengthSTRINGDependent on database typeYesNoUse a comma-separated list of key-value pairs (field.name:) to specify the number of characters that fields in the table must contain.
table.check.querySTRINGtableCheckQuery value in RDBMS configuration.STRINGYesNo
use.collationBOOLfalseYesNoSet to true to enable collation for string attributes. We use latin1_bin for MySQL and SQL_Latin1_General_CP1_CS_AS for Microsoft SQL.
allow.null.valuesBOOLfalseYesNoSet to true to allow users to insert null values into numeric columns.

Store Template

Use the following template to create a Store function:

@Store(type="rdbms", jdbc.url="STRING", username="STRING", password="STRING", jdbc.SDK.name="STRING", pool.properties="STRING", jndi.resource="STRING", datasource="STRING", table.name="STRING", field.length="STRING", table.check.query="STRING", use.collation="BOOL", allow.null.values="BOOL")
@PrimaryKey("PRIMARY_KEY")
@Index("INDEX")

Store Examples

The following example creates an event table named StockTable if one does not already exist in the database. The connection details are specified by the attributes under the @Store annotation.

CREATE STORE StockTable WITH (type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/stocks", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver", field.length="symbol:100", PrimaryKey="id", PrimaryKey="symbol", Index="volume") (id string, symbol string, price float, volume long);

The @PrimaryKey() and @Index() annotations follow CEP query syntax to define the primary key and index for a table. Use commas to separate multiple attributes.

This example creates an event table named StockTable, then adds a stream called InputStream:

CREATE STORE StockTable WITH (type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/das", username="root", password="root" , jdbc.driver.name="org.h2.Driver", field.length="symbol:100", PrimaryKey="symbol", Index="symbol") (symbol string, price float, volume long);
CREATE STREAM InputStream (symbol string, volume long);

INSERT INTO FooStream
SELECT a.symbol as symbol, b.volume as volume
FROM InputStream as a join StockTable as b on str:contains(b.symbol, a.symbol);

This example checks to see if a table named StockTable exists in the database, and if not, creates an event table named StockTable.

CREATE STORE StockTable WITH (type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/das", table.name="StockTable", username="root", password="root" , jdbc.driver.name=" com.mysql.jdbc.Driver ", field.length="symbol:100", table.check.query="SELECT 1 FROM StockTable LIMIT 1", PrimaryKey="symbol", Index="symbol") (symbol string, price float, volume long);
CREATE STREAM InputStream (symbol string, volume long);

INSERT INTO FooStream
SELECT a.symbol as symbol, b.volume as volume
FROM InputStream as a join StockTable as b on str:contains(b.symbol, a.symbol);