Skip to main content

Pub-Sub with Streams in Browser Example

This example shows how to publish messages to a GDN stream and subscribe to receive them from a web page running in the browser.

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Pub-Sub demo</title>
  <!-- Monospace web font applied to the #console output pane by the style rule below. -->
  <link
    href="https://fonts.googleapis.com/css2?family=Overpass+Mono&amp;display=swap"
    rel="stylesheet"
  />
  <link
    rel="stylesheet"
    href="https://cdnjs.cloudflare.com/ajax/libs/normalize/5.0.0/normalize.css"
  />
  <link
    rel="stylesheet"
    href="https://cdnjs.cloudflare.com/ajax/libs/milligram/1.3.0/milligram.css"
  />
  <style>
    /* The family name must match the font requested from Google Fonts above
       ("Overpass Mono"); otherwise the browser silently falls back to the
       generic monospace font. */
    #console {
      font-family: "Overpass Mono", monospace !important;
    }
  </style>
</head>

<body class="container" style="max-width: none;">
  <div class="row">
    <!-- Left column: message editor and connection controls. -->
    <div class="column column-40" style="padding: 20px; height: 100vh;">
      <h1>Pub Sub Demo</h1>

      <!-- Visible label: a placeholder alone is not a reliable accessible name. -->
      <label for="messageBox">Message</label>
      <textarea
        rows="10"
        style="resize: vertical; height: 200px;"
        placeholder="Enter your message here..."
        id="messageBox"
        disabled
      ></textarea>
      <div style="display: flex;">
        <div style="flex-grow: 1;">
          <!-- Explicit type="button" so these never act as submit buttons. -->
          <button
            type="button"
            onclick="init()"
            class="button-outline"
            id="startButton"
          >
            Start
          </button>

          <button
            type="button"
            onclick="closeConnection()"
            class="button-clear"
            disabled
            id="closeButton"
          >
            Close
          </button>
        </div>

        <button type="button" onclick="publish()" id="publishButton" disabled>
          Publish
        </button>
      </div>
    </div>
    <!-- Right column: log pane filled by print(); exposed as a polite live
         region so appended lines are announced by assistive technology. -->
    <div
      id="console"
      class="column column-60"
      role="log"
      aria-live="polite"
      aria-label="Console output"
      style="padding: 20px; height: 100vh; background: black; overflow: auto;"
    ></div>
  </div>
</body>
<script type="text/javascript">
/* ------------------- API MIDDLEWARE TO MANAGE API CALLS ------------------- */

/**
 * Minimal JSON client built on fetch().
 *
 * Every request is sent to `url + endpoint` with the shared `_headers`;
 * after a successful login() those headers also carry the bearer token.
 */
class APIRequest {
  // Headers attached to every request. login() adds `authorization`.
  _headers = {
    Accept: "application/json",
    "Content-Type": "application/json",
  };

  /**
   * @param {string} url Base URL that is prepended to every endpoint.
   */
  constructor(url) {
    this._url = url;
  }

  /**
   * Authenticates against `/_open/auth` and stores the returned JWT so that
   * subsequent req() calls are sent with an `authorization` header.
   *
   * @param {string} email
   * @param {string} password
   * @returns {Promise<object>} The login response without its `jwt` field.
   *   Rejects with whatever req() rejects with.
   */
  login(email, password) {
    return this.req("/_open/auth", {
      body: { email, password },
      method: "POST",
    }).then(({ jwt, ...data }) => {
      this._headers.authorization = `bearer ${jwt}`;
      return data;
    });
  }

  /**
   * Settles a pending request from a fetch Response: resolves with the parsed
   * JSON body when `response.ok`, otherwise rejects with the Response itself
   * (callers inspect `status` on it).
   */
  _handleResponse(response, resolve, reject) {
    if (response.ok) {
      resolve(response.json());
    } else {
      reject(response);
    }
  }

  /**
   * Sends a request to `endpoint`.
   *
   * @param {string} endpoint Path appended to the base URL.
   * @param {object} [config] fetch() options; `body`, when truthy, is
   *   JSON-serialized before sending.
   * @returns {Promise<any>} Resolves with the parsed JSON body. Rejects with
   *   the Response for non-OK statuses, or with the underlying error when
   *   fetch() itself fails (for example on a network error).
   */
  req(endpoint, { body, ...options } = {}) {
    return new Promise((resolve, reject) => {
      fetch(this._url + endpoint, {
        headers: this._headers,
        body: body ? JSON.stringify(body) : undefined,
        ...options,
      })
        .then((response) => this._handleResponse(response, resolve, reject))
        // Without this, a failed fetch() would leave the promise pending forever.
        .catch(reject);
    });
  }
}

/* ---------------------------- PUB-SUB TUTORIAL ---------------------------- */

// Account credentials passed to the login call; replace with your own.
const EMAIL = "your@email.com";
const PASSWORD = "password";

// API host name, and the HTTPS base URL derived from it.
const URL_NAME = "api-play.paas.macrometa.io";
const HTTP_URL = "https://" + URL_NAME;

// Stream and consumer (subscription) names used throughout the tutorial.
const STREAM_NAME = "api_tutorial_streams";
const CONSUMER_NAME = "api_tutorial_streams_consumer";
// true selects the "c8global" region prefix, false selects "c8local".
const IS_GLOBAL = true;

/* ------------------------------ UI References ----------------------------- */

// Looked up once, in the same order as the names on the left.
const [consoleElement, input, startButton, closeButton, publishButton] = [
  "console",
  "messageBox",
  "startButton",
  "closeButton",
  "publishButton",
].map((id) => document.getElementById(id));

/* ---------------------------- Global Variables ---------------------------- */

// WebSocket handles, assigned when the Start button runs init().
var consumer, producer;

/* ---------------------------- Helpers Functions --------------------------- */

/**
 * Prefixes values below 10 with "0" (returning a string); any other value is
 * returned unchanged.
 */
function checkTime(i) {
  return i < 10 ? "0" + i : i;
}

/**
 * Returns the current local time as "h:mm:ss".
 * Only minutes and seconds are zero-padded; the hour is left as-is.
 */
function getTime() {
  const now = new Date();
  return [
    now.getHours(),
    checkTime(now.getMinutes()),
    checkTime(now.getSeconds()),
  ].join(":");
}

/**
 * Appends one timestamped line to the on-page console and scrolls the
 * console to the bottom.
 *
 * @param {*} msg Value to display; it is converted to a string.
 */
function print(msg) {
  // Timestamp prefix, pinned to the left edge of the line.
  const stamp = document.createElement("span");
  stamp.style = "position:absolute;left:0";
  stamp.appendChild(document.createTextNode(`> ${getTime()} : `));

  // The line itself; its left padding leaves room for the timestamp.
  const line = document.createElement("small");
  line.style =
    "display:block; font-weight:400;color:white;word-break:break-all;position:relative;padding-left:100px";
  line.appendChild(stamp);
  line.appendChild(document.createTextNode(`${msg}`));

  consoleElement.appendChild(line);
  consoleElement.scrollTop = consoleElement.scrollHeight;
}

/**
 * Flips the `disabled` state of the UI controls.
 *
 * @param {object} [skip] Groups to leave untouched. A truthy `start`,
 *   `publish` or `close` skips that group; the message box is toggled
 *   together with the Publish button.
 */
function toggleUIButtons(
  skip = { start: false, publish: false, close: false }
) {
  const groups = [
    [skip.start, [startButton]],
    [skip.publish, [publishButton, input]],
    [skip.close, [closeButton]],
  ];

  for (const [skipped, controls] of groups) {
    if (skipped) continue;
    for (const control of controls) {
      control.disabled = !control.disabled;
    }
  }
}

/* -------------------------------------------------------------------------- */

// Shared API client for the tutorial. login() stores the JWT in its headers,
// so later req() calls on this same instance are sent authenticated.
const connection = new APIRequest(HTTP_URL);

/**
 * Start-button handler.
 *
 * Logs in, creates the stream (an existing stream is fine), requests one OTP
 * per socket, then opens the consumer and producer WebSockets and enables the
 * Publish/Close controls.
 *
 * Any failure is logged, shown in the on-page console, and the Start button
 * is re-enabled so the user can retry.
 */
const init = async function () {
  // Disable Start while connecting so a second click cannot open duplicate sockets.
  toggleUIButtons({ publish: true, close: true });

  try {
    /* -------------------- Login (your@email.com/password) -------------------- */

    const { tenant } = await connection.login(EMAIL, PASSWORD);

    print("Login Successfully using " + tenant);

    /* ------------------------------ Create Stream ----------------------------- */

    try {
      await connection.req(
        `/_fabric/_system/streams/${STREAM_NAME}?global=${IS_GLOBAL}`,
        {
          body: { name: STREAM_NAME },
          method: "POST",
        }
      );
      print("STREAM CREATED SUCCESSFULLY");
    } catch (e) {
      // req() rejects with the Response on non-OK statuses; 409 means the
      // stream is already there, which is not an error for this tutorial.
      if (e.status === 409) {
        print("Stream already exists, skipping creation of stream");
      } else {
        print("Error while creating stream");
        throw e;
      }
    }

    /* ----------------- Publish and Subscribe message to stream ---------------- */

    const region = IS_GLOBAL ? "c8global" : "c8local";
    const streamName = `${region}s.${STREAM_NAME}`;

    // The local datacenter URL is only needed for local streams, so the
    // lookup is skipped (and cannot fail) when the stream is global.
    let url = URL_NAME;
    if (!IS_GLOBAL) {
      const localDcDetails = await connection.req(`/datacenter/local`, {
        method: "GET",
      });
      url = `api-${localDcDetails.tags.url}`;
    }

    // One OTP per socket; each is passed as the `otp` query parameter below.
    const otpConsumer = await connection.req(`/apid/otp`, {
      method: "POST",
    });
    const otpProducer = await connection.req(`/apid/otp`, {
      method: "POST",
    });

    const consumerUrl = `wss://${url}/_ws/ws/v2/consumer/persistent/${tenant}/${region}._system/${streamName}/${CONSUMER_NAME}?otp=${otpConsumer.otp}`;

    const producerUrl = `wss://${url}/_ws/ws/v2/producer/persistent/${tenant}/${region}._system/${streamName}?otp=${otpProducer.otp}`;

    /* -------------------------- Initializing Consumer ------------------------- */

    // Resolves once the consumer socket is open; rejects if it errors first.
    // (A reject after the promise has resolved is a no-op.)
    const initConsumer = () => {
      return new Promise((resolve, reject) => {
        consumer = new WebSocket(consumerUrl);

        consumer.onopen = function () {
          print("Consumer is open now for " + streamName);
          resolve();
        };

        consumer.onerror = function () {
          print("Failed to establish Consumer connection for " + streamName);
          reject(new Error("Consumer connection failed for " + streamName));
        };

        consumer.onclose = function () {
          print("Closed Consumer connection for " + streamName);
        };

        consumer.onmessage = function (message) {
          // Ignore empty frames instead of dereferencing a missing payload.
          if (!message.data) return;

          const receivedMsg = JSON.parse(message.data);
          print(
            "------------------ Consumer Message Received -----------------"
          );
          // Payloads are base64-encoded on the wire (see publish()).
          print(atob(receivedMsg.payload));
          print(
            "--------------------------------------------------------------"
          );

          // Acknowledge the message by echoing its id back to the server.
          const ackMsg = { messageId: receivedMsg.messageId };
          consumer.send(JSON.stringify(ackMsg));
        };
      });
    };

    /* -------------------------- Initializing Producer ------------------------- */

    const initProducer = () => {
      producer = new WebSocket(producerUrl);

      producer.onopen = function () {
        print("Producer is open now for " + streamName);
      };

      producer.onclose = function () {
        print("Closed Producer connection for " + streamName);
      };

      producer.onerror = function () {
        print("Failed to establish Producer connection for " + streamName);
      };
    };

    // Awaited so that a consumer failure lands in the catch block below.
    await initConsumer();
    initProducer();
    toggleUIButtons({ start: true });
    print("--------------------------------------------------------------");
    print("----------YOU CAN NOW START PUBLISHING YOUR MESSAGES----------");
    print("--------------------------------------------------------------");
  } catch (e) {
    console.error(e);

    // `e` is a Response for HTTP failures and an Error otherwise.
    const reason = e && e.status ? `HTTP ${e.status}` : (e && e.message) || e;
    print("Failed to start: " + reason);

    // Do not leave a half-initialized session behind.
    if (consumer && consumer.readyState <= WebSocket.OPEN) {
      consumer.close();
    }

    // Let the user retry.
    startButton.disabled = false;
  }
};

/**
 * Publish-button handler: sends the message box content to the producer
 * socket as `{ payload: <base64> }`.
 *
 * Line breaks are stripped from the text. Input that looks like a JSON object
 * (starts with "{" and ends with "}") is parsed and re-serialized so it is
 * sent in compact form. Empty input is not sent, and nothing is reported as
 * sent unless the producer socket is open. Errors (for example invalid JSON)
 * are printed to the on-page console.
 */
function publish() {
  try {
    const value = input.value.trim().replace(/(\r\n|\n|\r)/gm, "");

    if (value === "") {
      print("Nothing to publish: the message is empty");
      return;
    }

    // send() on a CLOSING/CLOSED socket silently discards the data, so check
    // first rather than claiming the message was sent.
    if (!producer || producer.readyState !== WebSocket.OPEN) {
      print("Producer connection is not open; message not sent");
      return;
    }

    let msgToSend = value;

    if (value[0] === "{" && value.slice(-1) === "}") {
      msgToSend = JSON.stringify(JSON.parse(input.value));
    }

    // Build the frame before logging so an encoding error is reported alone.
    const frame = JSON.stringify({ payload: btoa(msgToSend) });

    print(`Sending message.... : ${msgToSend}`);
    producer.send(frame);
    print(`Producer message sent`);
  } catch (e) {
    print(e);
  }
}

/**
 * Close-button handler: closes both sockets, waits five seconds, then
 * unsubscribes the tutorial consumer from the stream.
 *
 * Start stays disabled until the unsubscribe call has finished, so a new
 * session cannot be started while the old subscription is still being
 * removed. Failures are logged and shown in the on-page console.
 */
async function closeConnection() {
  // Disable Publish, Close and the message box; leave Start disabled for now.
  toggleUIButtons({ start: true });

  try {
    if (consumer) consumer.close();
    print("CONSUMER CLOSING...");
    if (producer) producer.close();
    print("PRODUCER CLOSING...");

    // Presumably gives the sockets time to finish closing before the
    // subscription is removed.
    await new Promise((resolve) => setTimeout(resolve, 5000));

    /* ------------------------ Unsubscribe from stream. ------------------------ */

    await connection.req(
      `/_fabric/_system/_api/streams/unsubscribe/${CONSUMER_NAME}`,
      {
        method: "POST",
      }
    );

    print(`${CONSUMER_NAME} UNSUBSCRIBED SUCCESSFULLY`);
  } catch (e) {
    console.error(e);
    print(`Failed to unsubscribe ${CONSUMER_NAME}`);
  } finally {
    // Cleanup is over either way: re-enable Start only.
    toggleUIButtons({ publish: true, close: true });
  }
}
</script>
</html>