Manage Readers

Global Data Network (GDN) stream readers are similar to stream consumers, but there are three crucial differences between them:

  • Readers allow you to specify the starting point for processing messages within a stream. In contrast, consumers always begin with the earliest or latest available unacknowledged message.
  • Unlike consumers, readers do not retain data or acknowledge messages. This means that readers can access messages without affecting other consumers or the message state within the stream.
  • Readers read both acknowledged and unacknowledged messages.

When to Use Readers

Readers are useful in scenarios where you want to process the stream data without affecting the message acknowledgment state or when you need to start processing messages from a specific point in the stream. For example, you might use a reader for analytics, monitoring, or auditing purposes.

Create a Reader

To create a GDN stream reader, use the createStreamReader method provided by the SDK. Here's an example:

const streamName = "my-stream";
const subscriptionName = "my-sub";
const local = false; // Change this to true for local streams
const dcName = "your-datacenter-name"; // Optional

const reader = await client.createStreamReader(streamName, subscriptionName, local, dcName);

Read Messages with a Reader

Once you have created a reader, you can start listening for messages by registering event listeners. The reader emits the following events:

  • open: Triggered when the connection to the stream is established.
  • message: Triggered when a new message is received from the stream.
  • close: Triggered when the connection to the stream is closed.
  • error: Triggered when an error occurs during the connection or message processing.

Here's an example of how to use event listeners with a reader:

reader.on("open", () => {
  console.log("Connection to the stream is open.");
});

reader.on("message", (msg) => {
  console.log("Received message:", msg);
});

reader.on("close", () => {
  console.log("Connection to the stream is closed.");
});

reader.on("error", (err) => {
  console.error("Error:", err);
});
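
If you only need a fixed number of messages, for example a quick sample for auditing, the listeners above can be wrapped in a Promise. The following is a minimal sketch, not part of the SDK: `collectMessages` and `FakeReader` are hypothetical names, and the helper assumes only that the reader exposes `on(event, handler)` and `close()` as shown in this guide. `FakeReader` is a stand-in built on Node's `EventEmitter` so the helper can be tried without a GDN connection.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper (not an SDK method): resolves with the first `count`
// messages, then closes the reader. Rejects if the reader emits an error first.
function collectMessages(reader, count) {
  return new Promise((resolve, reject) => {
    const messages = [];
    let settled = false;

    reader.on("message", (msg) => {
      if (settled) return; // Ignore anything that arrives after we are done
      messages.push(msg);
      if (messages.length >= count) {
        settled = true;
        reader.close();
        resolve(messages);
      }
    });

    reader.on("error", (err) => {
      if (settled) return;
      settled = true;
      reject(err);
    });

    // If the connection closes early, return whatever was collected so far
    reader.on("close", () => {
      if (settled) return;
      settled = true;
      resolve(messages);
    });
  });
}

// Stand-in reader for local experiments; a real reader comes from createStreamReader.
class FakeReader extends EventEmitter {
  close() {
    this.emit("close");
  }
}
```

With a real reader, the call would look like `const firstTen = await collectMessages(reader, 10);`. Because readers do not acknowledge messages, sampling a stream this way leaves its message state unchanged.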

Close a Reader

To close a reader and release resources, call the close method:

reader.close();

This will also trigger the close event listener.
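
In a long-running script, you may also want to close the reader when the process is interrupted. The sketch below is one possible approach, assuming a Node.js environment; `closeOnSignal` is a hypothetical helper name, not an SDK function, and it relies only on the reader's `close` method.

```javascript
// Hypothetical helper (not an SDK method): closes the reader the first time
// the process receives the given signal (Ctrl+C sends SIGINT by default).
// Returns a function that removes the handler if it is no longer needed.
function closeOnSignal(reader, signal = "SIGINT") {
  const handler = () => reader.close();
  process.once(signal, handler);
  return () => process.removeListener(signal, handler);
}
```

After creating a reader, call `closeOnSignal(reader);` once. Using `process.once` means the reader is closed a single time, even if the signal is sent repeatedly.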

Example

Suppose you want to analyze the log messages generated by a stream worker, for example to detect abnormal price fluctuations or identify trends. You want to read the log messages from the stream without affecting the state of the messages or other consumers.

In this example, you'll learn how to create and use a GDN stream reader to process messages from the c8locals.streamworkerslog stream. This stream is produced by a stream worker that analyzes Bitcoin prices in real time and logs the price changes.

Prerequisites

Create the Stream

To create the stream with messages for this example, you can load and publish the Sample-LogProducer-App stream worker in the Macrometa console.

  1. Follow the instructions in Get Sample Stream Workers to load Sample-LogProducer-App.
  2. Publish the stream worker.

Reader Example Code

  1. Copy and paste the code block below into your favorite IDE.
  2. Update constants with your values, such as the API key.
  3. Run the code.

Here's an example of how to create a reader, read messages from the c8locals.streamworkerslog stream, and display them in the terminal:

// Import required modules
const jsc8 = require("jsc8");
const readline = require("readline");

// Configure the client
const globalUrl = "https://play.paas.macrometa.io";
const apiKey = "xxxx"; // Change this to your API Key
const client = new jsc8({
  url: globalUrl,
  apiKey: apiKey,
  fabricName: "_system"
});

// Variables
const streamName = "c8locals.streamworkerslog";
const subscriptionName = "btc-price-analysis";
const isLocal = true; // Set to true for local streams, false for global streams

// Create a stream reader
async function createStreamReader() {
  console.log("Creating stream reader...");
  const reader = await client.createStreamReader(streamName, subscriptionName, isLocal);

  // Configure event listeners
  reader.on("open", () => {
    console.log("Connection to the stream is open.");
  });

  reader.on("message", (msg) => {
    // Each message is a JSON envelope whose payload is base64-encoded JSON
    const { payload } = JSON.parse(msg);
    const data = JSON.parse(Buffer.from(payload, "base64").toString("utf8"));
    const { priority, event: { prev_price, curr_price } } = data;

    console.log(`Priority: ${priority}, Previous price: ${prev_price}, Current price: ${curr_price}`);
  });

  reader.on("close", () => {
    console.log("Connection to the stream is closed.");
  });

  reader.on("error", (err) => {
    console.error("Error:", err);
  });

  // Close the reader when the user types '0'
  const input = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });

  console.log("Type '0' to exit anytime:");

  // Check every line typed, not just the first one
  input.on("line", (userInput) => {
    if (userInput.trim() === "0") {
      reader.close();
      input.close();
    }
  });
}

// Execute the function and report any failure to create the reader
createStreamReader().catch((err) => {
  console.error("Failed to create stream reader:", err);
});

This code initializes a GDN client, creates a stream reader, and listens for messages. When messages are received, they are decoded from base64 and displayed in the terminal. The reader can be closed by typing '0' in the terminal.
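
The decoding step can also be pulled out into a small function so that a malformed message does not throw inside the message listener. The following is a minimal sketch: `decodeMessage` is a hypothetical helper name, and the envelope shape (a JSON string with a base64-encoded `payload` field) is assumed from the reader example code. The sample message is built locally for illustration and does not come from a real stream.

```javascript
// Hypothetical helper: decodes one raw message into an object.
// Returns null instead of throwing when the message is not in the expected shape.
function decodeMessage(rawMessage) {
  try {
    const { payload } = JSON.parse(rawMessage);
    if (typeof payload !== "string") return null;
    const text = Buffer.from(payload, "base64").toString("utf8");
    return JSON.parse(text);
  } catch (err) {
    return null;
  }
}

// Illustrative event, wrapped in the same envelope shape the example expects
const sampleEvent = {
  priority: "INFO",
  event: { prev_price: 27987.69, curr_price: 27994.78 }
};
const sampleMessage = JSON.stringify({
  payload: Buffer.from(JSON.stringify(sampleEvent), "utf8").toString("base64")
});

console.log(decodeMessage(sampleMessage)); // The decoded event object
console.log(decodeMessage("not json"));    // null
```

Inside a message listener, you could then skip bad input with `const data = decodeMessage(msg); if (!data) return;`.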

When you run the reader example code, you'll see output like the following:

Creating stream reader...
Connection to the stream is open.
Priority: WARN, Previous price: 27994.78, Current price: 27985.99
Priority: INFO, Previous price: 27987.69, Current price: 27994.78
Priority: WARN, Previous price: 27987.83, Current price: 27987.69
...

The output shows the decoded log messages from the c8locals.streamworkerslog stream, which can be used for further analysis and monitoring of Bitcoin prices.
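
As one possible next step, the decoded events can be screened for unusually large price moves. The sketch below is illustrative only: `percentChange` and `findLargeMoves` are hypothetical helper names, the threshold is arbitrary, and the sample events use made-up round numbers in the same shape as the decoded log messages (`priority`, `event.prev_price`, `event.curr_price`).

```javascript
// Hypothetical helper: relative change from prevPrice to currPrice, in percent
function percentChange(prevPrice, currPrice) {
  return ((currPrice - prevPrice) * 100) / prevPrice;
}

// Hypothetical helper: keeps the events whose absolute price change
// is at least thresholdPercent
function findLargeMoves(events, thresholdPercent) {
  return events.filter(({ event }) =>
    Math.abs(percentChange(event.prev_price, event.curr_price)) >= thresholdPercent
  );
}

// Made-up events for illustration, not real stream output
const decodedEvents = [
  { priority: "WARN", event: { prev_price: 100, curr_price: 90 } },
  { priority: "INFO", event: { prev_price: 100, curr_price: 101 } },
  { priority: "INFO", event: { prev_price: 200, curr_price: 230 } }
];

const largeMoves = findLargeMoves(decodedEvents, 5);
console.log(`${largeMoves.length} of ${decodedEvents.length} events moved 5% or more`);
```

In a live setup, you would collect decoded events in the message listener and run a check like this on each one or on a batch.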