Skip to main content

Shared Subscription Example

This page describes how to configure a shared subscription for one or more streams. Refer to the shared subscription section for details and limitations.

To test the example code, open three terminals. Run node producer.js in the first terminal, node consumer-1.js in the second terminal, and node consumer-2.js in the third terminal. If successful, you will see messages in both consumer terminals.

Prerequisites

Create Producer

This code creates a stream if one doesn't already exist, then creates a producer.

const jsc8 = require("jsc8");

// Base URL of the deployment the client connects to.
const BASE_URL = "https://play.paas.macrometa.io/";

// Declared with const so the client is not an implicit global
// (a bare assignment throws a ReferenceError in strict mode).
const client = new jsc8({
  url: BASE_URL,
  // Placeholder value; replace it with a real API key.
  apiKey: "xxxxxx",
  fabricName: "_system",
});

// Name of the stream the producer publishes to.
const streamName = "streamQuickstart";

/**
 * Makes sure the stream named by `streamName` exists.
 *
 * Asks the client whether the stream is already present; if it is, only logs
 * that fact. Otherwise the stream is created and its "stream-id" is logged.
 *
 * @returns {Promise<void>} Settles after the check (and optional creation) is done.
 */
async function createStream() {
  const alreadyExists = await client.hasStream(streamName, false);

  if (alreadyExists) {
    console.log("This stream already exists!");
    console.log(`Existing Producer = c8globals.${streamName}`);
    return;
  }

  console.log("\nCreating global stream...");
  // Passing false as the second argument requests a global stream.
  // A local stream, accessible only within the region, is the alternative.
  const created = await client.createStream(streamName, false);
  const streamId = created.result["stream-id"];
  console.log(`New Producer = ${streamId}`);
}

/**
 * Connects a producer to the stream and publishes one message per second.
 *
 * Steps: make sure the stream exists, request a one-time password, open the
 * producer connection, then send a base64-encoded text message every 1000 ms.
 * The publish timer is stopped when the connection closes. Errors raised
 * during setup are logged rather than rethrown.
 *
 * @returns {Promise<void>} Settles once setup has finished, or has failed and been logged.
 */
async function producer() {
  try {
    // Create stream only if stream does not exist. The call is awaited so the
    // stream exists before connecting, and so a failure lands in the catch
    // block below instead of becoming an unhandled rejection.
    await createStream();
    console.log("\nConnecting producer to global stream...");

    // Request stream object
    const stream = client.stream(streamName, false);
    // Request One Time Password
    const producerOTP = await stream.getOtp();
    // Create producer
    const connection = await stream.producer(BASE_URL.replace("https://", ""), {
      otp: producerOTP,
    });

    // Run producer - Open connection to server
    connection.on("open", () => {});

    // Send a message every 1000 ms
    const publishTimer = setInterval(() => {
      // If your message is an object, convert the object to a string.
      // e.g. const message = JSON.stringify({message:'Hello World'});
      const randomNumber = Math.floor(Math.random() * 101);
      const message = `Hello Macrometa Stream! Here is your random message number ${randomNumber}`;
      const payloadObj = { payload: Buffer.from(message).toString("base64") };
      connection.send(JSON.stringify(payloadObj));
    }, 1000);

    connection.onclose = function () {
      // Stop publishing: nothing can be sent once the connection is closed.
      clearInterval(publishTimer);
      console.log("Closed WebSocket:Producer connection for " + streamName);
    };
  } catch (e) {
    console.log(`Error while creating stream publisher${e}`);
  }
}

// Script entry point: start the producer when this file is run.
producer();

Create Consumer 1

This code creates a stream if one doesn't already exist, then creates the first consumer.

const jsc8 = require("jsc8");

// Base URL of the deployment the client connects to.
const BASE_URL = "https://play.paas.macrometa.io/";

// Declared with const so the client is not an implicit global
// (a bare assignment throws a ReferenceError in strict mode).
const client = new jsc8({
  url: BASE_URL,
  // Placeholder value; replace it with a real API key.
  apiKey: "xxxxxx",
  fabricName: "_system",
});

// Name of the stream the consumer reads from.
const streamName = "streamQuickstart";
// Subscription name passed to stream.consumer() when the consumer is created.
const subscriptionName = "consumer-subscription";

/**
 * Makes sure the stream named by `streamName` exists.
 *
 * Asks the client whether the stream is already present; if it is, only logs
 * that fact. Otherwise the stream is created and its "stream-id" is logged.
 *
 * @returns {Promise<void>} Settles after the check (and optional creation) is done.
 */
async function createStream() {
  const alreadyExists = await client.hasStream(streamName, false);

  if (alreadyExists) {
    console.log("This stream already exists!");
    console.log(`Existing Consumer = c8globals.${streamName}`);
    return;
  }

  console.log("\nCreating global stream...");
  // Passing false as the second argument requests a global stream.
  // A local stream, accessible only within the region, is the alternative.
  const created = await client.createStream(streamName, false);
  const streamId = created.result["stream-id"];
  console.log(`New Consumer = ${streamId}`);
}

/**
 * Connects a consumer with a "Shared" subscription to the stream and prints
 * every message it receives.
 *
 * Steps: make sure the stream exists, request a one-time password, open the
 * consumer connection under `subscriptionName`, then for each message decode
 * the base64 payload, log it, and acknowledge it by sending back its
 * messageId. Errors raised during setup are logged rather than rethrown.
 *
 * @returns {Promise<void>} Settles once setup has finished, or has failed and been logged.
 */
async function consumer() {
  try {
    console.log("\nConnecting consumer to global stream...");
    // Create stream only if stream does not exist. The call is awaited so the
    // stream exists before connecting, and so a failure lands in the catch
    // block below instead of becoming an unhandled rejection.
    await createStream();
    // Request stream object
    const stream = client.stream(streamName, false);
    // Request One Time Password
    const consumerOTP = await stream.getOtp();
    // Create consumer
    const connection = await stream.consumer(subscriptionName, BASE_URL.replace("https://", ""), {
      otp: consumerOTP,
      subscriptionType: "Shared",
    });
    // Run consumer - open connection to server
    connection.on("message", (msg) => {
      const { payload, messageId } = JSON.parse(msg);
      // Received message payload. Decode as UTF-8 (the encoding Buffer.from
      // uses by default for strings) so non-ASCII text is not corrupted.
      console.log(Buffer.from(payload, "base64").toString("utf8"));
      // Send message acknowledgement
      connection.send(JSON.stringify({ messageId }));
    });
  } catch (e) {
    console.log(`Could not receive messages ${e}`);
  }
}

// Script entry point: start the first consumer when this file is run.
consumer();

Create Consumer 2

This code creates a stream if one doesn't already exist, then creates the second consumer.

The subscription name used by consumer 2 must match the subscription name used by consumer 1, so that both consumers attach to the same shared subscription.

const jsc8 = require("jsc8");

// Base URL of the deployment the client connects to.
const BASE_URL = "https://play.paas.macrometa.io/";

// Declared with const so the client is not an implicit global
// (a bare assignment throws a ReferenceError in strict mode).
const client = new jsc8({
  url: BASE_URL,
  // Placeholder value; replace it with a real API key.
  apiKey: "xxxxxx",
  fabricName: "_system",
});

// Name of the stream the consumer reads from.
const streamName = "streamQuickstart";
// Subscription name passed to stream.consumer() when the consumer is created.
const subscriptionName = "consumer-subscription";

/**
 * Makes sure the stream named by `streamName` exists.
 *
 * Asks the client whether the stream is already present; if it is, only logs
 * that fact. Otherwise the stream is created and its "stream-id" is logged.
 *
 * @returns {Promise<void>} Settles after the check (and optional creation) is done.
 */
async function createStream() {
  const alreadyExists = await client.hasStream(streamName, false);

  if (alreadyExists) {
    console.log("This stream already exists!");
    console.log(`Existing Consumer = c8globals.${streamName}`);
    return;
  }

  console.log("\nCreating global stream...");
  // Passing false as the second argument requests a global stream.
  // A local stream, accessible only within the region, is the alternative.
  const created = await client.createStream(streamName, false);
  const streamId = created.result["stream-id"];
  console.log(`New Consumer = ${streamId}`);
}

/**
 * Connects a consumer with a "Shared" subscription to the stream and prints
 * every message it receives.
 *
 * Steps: make sure the stream exists, request a one-time password, open the
 * consumer connection under `subscriptionName`, then for each message decode
 * the base64 payload, log it, and acknowledge it by sending back its
 * messageId. Errors raised during setup are logged rather than rethrown.
 *
 * @returns {Promise<void>} Settles once setup has finished, or has failed and been logged.
 */
async function consumer() {
  try {
    console.log("\nConnecting consumer to global stream...");
    // Create stream only if stream does not exist. The call is awaited so the
    // stream exists before connecting, and so a failure lands in the catch
    // block below instead of becoming an unhandled rejection.
    await createStream();
    // Request stream object
    const stream = client.stream(streamName, false);
    // Request One Time Password
    const consumerOTP = await stream.getOtp();
    // Create consumer
    const connection = await stream.consumer(subscriptionName, BASE_URL.replace("https://", ""), {
      otp: consumerOTP,
      subscriptionType: "Shared",
    });
    // Run consumer - open connection to server
    connection.on("message", (msg) => {
      const { payload, messageId } = JSON.parse(msg);
      // Received message payload. Decode as UTF-8 (the encoding Buffer.from
      // uses by default for strings) so non-ASCII text is not corrupted.
      console.log(Buffer.from(payload, "base64").toString("utf8"));
      // Send message acknowledgement
      connection.send(JSON.stringify({ messageId }));
    });
  } catch (e) {
    console.log(`Could not receive messages ${e}`);
  }
}

// Script entry point: start the second consumer when this file is run.
consumer();