Stream Workers SDK Example

This article is an introduction to using stream workers (sometimes called stream apps) with Macrometa SDKs.

Prerequisites

  1. A Macrometa account with access to the Global Data Network (GDN) and an API key.
  2. The pyC8 SDK installed: `pip install pyC8`.

Get Started with Stream Workers

This page guides you through creating a stream worker, updating it, and running an ad hoc query on it using the pyC8 and jsC8 SDKs. The code examples on this page use Python (pyC8).

  1. Create a new JavaScript (.js) or Python (.py) file in your favorite IDE.
  2. Copy the code block below and paste it into your JavaScript or Python file.
  3. With each subsequent step, append the code block to the existing file and then run it.

If you want to skip the explanation and just run the code, then go directly to the Full Demo File.

Step 1. Connect to GDN

To use stream workers with Macrometa Global Data Network (GDN), you must first establish a connection to a local region.

When this code runs, it initializes the server connection to the specified region URL. For more information about connecting to GDN, refer to Authentication.

# Import libraries
from c8 import C8Client

# Define constants
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "my API key" # Change this to your API key

print("--- Connecting to GDN")

# Choose one of the following methods to access the GDN. API key is recommended.

# Authenticate with API key
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)

# Authenticate with JWT
# client = C8Client(protocol='https', host=URL, port=443, token=<your token>, geofabric=GEO_FABRIC)

# Authenticate with email and password
# client = C8Client(protocol='https', host=URL, port=443, email=<your email id>, password=<your password>, geofabric=GEO_FABRIC)
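
To confirm that the connection works before you continue, you can fetch details about the current fabric. A quick sanity check, assuming the get_fabric_details call available in recent pyC8 releases:

# Optional: verify connectivity by fetching details of the current fabric.
# print(client.get_fabric_details())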

Step 2. Validate Stream Worker

Validate the stream worker definition for syntax errors before saving it. If the definition is valid, the call returns True.

The stream worker shown below reads data from a collection and publishes it to a stream.

# Import libraries you'll need later
import time

# Define the stream app to validate.
stream_app_definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream worker that reads data from a collection and publishes it to a stream. The stream and collection are created automatically if they do not already exist.')
/**
Test the stream worker:
1. Open Stream SampleCargoAppDestStream in the console. The output can be monitored here.
2. Upload the following data into the SampleCargoAppInputTable collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. The following messages are shown on the SampleCargoAppDestStream Stream Console:
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
*/

-- Create the source SampleCargoAppInputTable, backed by a collection, to process events.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection ="SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream ="SampleCargoAppDestStream", replication.type="local") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
"""

# Validate the stream worker code.
print("--- Validating stream worker definition")
print(client.validate_stream_app(data=stream_app_definition))
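
Because the call returns True for a valid definition, you can guard the rest of the script on the result. A minimal sketch, assuming a boolean return value:

# Stop early if the definition is invalid.
if not client.validate_stream_app(data=stream_app_definition):
    raise SystemExit("Stream worker definition failed validation")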

Step 3. Create Stream Worker

By default, the stream worker is created in the local region. You can use dclist (data center list, a list of regions) to deploy the stream worker to specific regions or to all regions.

# Create the stream worker.
dclist = []
print("--- Creating stream worker")
print(client.create_stream_app(data=stream_app_definition, dclist=dclist))
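
To deploy to specific regions instead, pass their names in dclist. A brief sketch; the region names below are placeholders, so substitute regions available to your account:

# Hypothetical region names -- replace with regions from your GDN account.
# dclist = ["gdn-us-west", "gdn-eu-central"]
# print(client.create_stream_app(data=stream_app_definition, dclist=dclist))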

Step 4. Activate and Deactivate Stream Worker

Sometimes you need to turn a stream worker on or off. The commands below demonstrate how to do that programmatically. Make sure that the stream worker is activated (published) before continuing to the next step!

# Activate the stream worker.
# Check if already active
result = client.get_stream_app('Sample-Cargo-App')
if result[0]['isActive'] is False:
    print("Activate", client.activate_stream_app('Sample-Cargo-App', True))
else:
    print("Stream worker already active")

# You can also deactivate the stream worker.
# print("Deactivate", client.activate_stream_app('Sample-Cargo-App', False))

Step 5. Update Stream Worker

The code below adds a second data processing step: it updates the stream worker so that, in addition to publishing the input data to the stream, it also stores that data in another collection called SampleCargoAppDestTable.

After you run this command, you can view the changes in the Macrometa console Stream Workers page.

# Updated stream worker definition.
data = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream worker that reads data from a collection, publishes it to a stream, and stores it in a second collection. The stream and collections are created automatically if they do not already exist.')
/**
Test the stream worker:
1. Open Stream SampleCargoAppDestStream in the console. The output can be monitored here.
2. Upload the following data into the SampleCargoAppInputTable collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. The following messages are shown on the `SampleCargoAppDestStream` Stream Console:
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
4. The following messages are stored in SampleCargoAppDestTable:
{"weight":1}
{"weight":2}
{"weight":3}
{"weight":4}
{"weight":5}
*/

-- Define the source SampleCargoAppInputTable, backed by a collection.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Define the sink SampleCargoAppDestStream, backed by a stream.
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);

-- Define a destination table to store the data from the stream.
CREATE TABLE SampleCargoAppDestTable (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;

-- Data Processing
@info(name='Dump')
INSERT INTO SampleCargoAppDestTable
SELECT weight
FROM SampleCargoAppInputTable;
"""

# Deactivate the stream worker, then get an instance of it to update.
client.activate_stream_app('Sample-Cargo-App', False)
app = client._fabric.stream_app("Sample-Cargo-App")

# Update the stream worker.
print("--- Updating stream worker `Sample-Cargo-App`")
app.update(data)
# Wait after updating a stream worker so that it can initialize its resources.
time.sleep(10)
app.change_state(True)

Step 6. Insert Data and Run an Ad Hoc Query

In this example, you create a query worker named insertWeight to insert data into SampleCargoAppInputTable, and then run an ad hoc query against the SampleCargoAppDestTable store used in the stream worker. The query returns the records that the stream worker copied over from SampleCargoAppInputTable.

# Insert data into SampleCargoAppInputTable using a query worker.
insert_data_value = 'INSERT { "weight": @weight } IN SampleCargoAppInputTable'
insert_data_query = {
    "query": {
        "name": "insertWeight",
        "value": insert_data_value,
    }
}

client.create_restql(insert_data_query)
time.sleep(2)
for i in range(5):
    client.execute_restql("insertWeight", {"bindVars": {"weight": i}})
# Delete the query worker now that the data is inserted.
client.delete_restql("insertWeight")

# Run an ad hoc query against the store.
print("--- Running an ad hoc query against the store `SampleCargoAppDestTable`")
q = "SELECT * FROM SampleCargoAppDestTable LIMIT 3"
result = app.query(q)
print(result)
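
Because the loop above inserted weights 0 through 4, the query should return three of those documents. A representative result, assuming the SDK returns the matching records as a list (the exact response shape can vary by SDK version):

# Example output (shape may vary):
# [{'weight': 0}, {'weight': 1}, {'weight': 2}]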

Step 7. Delete Stream Worker

You're done with this stream worker, so it's time to delete it.

# Delete the stream worker.
print("--- Deleting stream worker `Sample-Cargo-App`")
result = client.delete_stream_app('Sample-Cargo-App')
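
If you want to confirm the deletion, ask for the stream worker again. A sketch, assuming get_stream_app fails once the worker is gone:

# Optional sanity check.
try:
    client.get_stream_app('Sample-Cargo-App')
    print("Stream worker still exists")
except Exception:
    print("Stream worker deleted")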

Full Demo File

The following script combines all of the code snippets from this tutorial.

# Import libraries
from c8 import C8Client
import time

# Define constants
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "my API key" # Change this to your API key

print("--- Connecting to GDN")
# Choose one of the following methods to access the GDN. API key is recommended.

# Authenticate with API key
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)

# Authenticate with JWT
# client = C8Client(protocol='https', host=URL, port=443, token=<your token>, geofabric=GEO_FABRIC)

# Authenticate with email and password
# client = C8Client(protocol='https', host=URL, port=443, email=<your email id>, password=<your password>, geofabric=GEO_FABRIC)

# Define the stream app to validate.
stream_app_definition = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream worker that reads data from a collection and publishes it to a stream. The stream and collection are created automatically if they do not already exist.')
/**
Test the stream worker:
1. Open Stream SampleCargoAppDestStream in the console. The output can be monitored here.
2. Upload the following data into the SampleCargoAppInputTable collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. The following messages are shown on the SampleCargoAppDestStream Stream Console:
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
*/

-- Create the source SampleCargoAppInputTable, backed by a collection, to process events.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection ="SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Create Stream SampleCargoAppDestStream
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream ="SampleCargoAppDestStream", replication.type="local") (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;
"""

# Validate the stream worker code.
print("--- Validating stream worker definition")
print(client.validate_stream_app(data=stream_app_definition))

# Create the stream worker. An empty dclist deploys to the local region only.
dclist = []
print("--- Creating stream worker")
print(client.create_stream_app(data=stream_app_definition, dclist=dclist))

# Activate the stream worker.
# Check if already active
result = client.get_stream_app('Sample-Cargo-App')
if result[0]['isActive'] is False:
    print("Activate", client.activate_stream_app('Sample-Cargo-App', True))
else:
    print("Stream worker already active")

# You can also deactivate the stream worker.
# print("Deactivate", client.activate_stream_app('Sample-Cargo-App', False))

# Updated stream worker definition.
data = """
@App:name('Sample-Cargo-App')
@App:qlVersion("2")
@App:description('Basic stream worker that reads data from a collection, publishes it to a stream, and stores it in a second collection. The stream and collections are created automatically if they do not already exist.')
/**
Test the stream worker:
1. Open Stream SampleCargoAppDestStream in the console. The output can be monitored here.
2. Upload the following data into the SampleCargoAppInputTable collection:
{"weight": 1}
{"weight": 2}
{"weight": 3}
{"weight": 4}
{"weight": 5}
3. The following messages are shown on the `SampleCargoAppDestStream` Stream Console:
[2021-08-27T14:12:15.795Z] {"weight":1}
[2021-08-27T14:12:15.799Z] {"weight":2}
[2021-08-27T14:12:15.805Z] {"weight":3}
[2021-08-27T14:12:15.809Z] {"weight":4}
[2021-08-27T14:12:15.814Z] {"weight":5}
4. The following messages are stored in SampleCargoAppDestTable:
{"weight":1}
{"weight":2}
{"weight":3}
{"weight":4}
{"weight":5}
*/

-- Define the source SampleCargoAppInputTable, backed by a collection.
CREATE SOURCE SampleCargoAppInputTable WITH (type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc", replication.type="global", map.type='json') (weight int);

-- Define the sink SampleCargoAppDestStream, backed by a stream.
CREATE SINK SampleCargoAppDestStream WITH (type = 'stream', stream = "SampleCargoAppDestStream", replication.type="local") (weight int);

-- Define a destination table to store the data from the stream.
CREATE TABLE SampleCargoAppDestTable (weight int);

-- Data Processing
@info(name='Query')
INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;

-- Data Processing
@info(name='Dump')
INSERT INTO SampleCargoAppDestTable
SELECT weight
FROM SampleCargoAppInputTable;
"""

# Deactivate the stream worker, then get an instance of it to update.
client.activate_stream_app('Sample-Cargo-App', False)
app = client._fabric.stream_app("Sample-Cargo-App")

# Update the stream worker.
print("--- Updating stream worker `Sample-Cargo-App`")
app.update(data)
# Wait after updating a stream worker so that it can initialize its resources.
time.sleep(10)
app.change_state(True)

# Insert data into SampleCargoAppInputTable using a query worker.
insert_data_value = 'INSERT { "weight": @weight } IN SampleCargoAppInputTable'
insert_data_query = {
    "query": {
        "name": "insertWeight",
        "value": insert_data_value,
    }
}

client.create_restql(insert_data_query)
time.sleep(2)
for i in range(5):
    client.execute_restql("insertWeight", {"bindVars": {"weight": i}})
# Delete the query worker now that the data is inserted.
client.delete_restql("insertWeight")

# Run an ad hoc query against the store.
print("--- Running an ad hoc query against the store `SampleCargoAppDestTable`")
q = "SELECT * FROM SampleCargoAppDestTable LIMIT 3"
result = app.query(q)
print(result)

# Delete the stream worker.
print("--- Deleting stream worker `Sample-Cargo-App`")
result = client.delete_stream_app('Sample-Cargo-App')