Create Producers

This page describes how to create a producer.

Prerequisites

Create a Producer with Specific Options

This example shows how to create a producer with specific options using the JavaScript SDK and Python SDK. When you create a producer, you can specify the following options:

Option                  | Description                                                 | Default
sendTimeoutMillis       | Send timeout                                                | 30 secs
batchingEnabled         | Enable batching of messages                                 | false
batchingMaxMessages     | Maximum number of messages permitted in a batch             | 1000
maxPendingMessages      | Set the max size of the internal-queue holding the messages | 1000
batchingMaxPublishDelay | Time period within which the messages will be batched       | 10ms
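
As a rough illustration of how the defaults in the table combine with your own settings, the sketch below keeps the defaults in one object and merges overrides on top of them. The helper name buildProducerOptions is hypothetical and not part of the SDK, and the key names follow the table; check them against the SDK version you use before passing the result to a producer.

```javascript
// Defaults copied from the options table on this page.
// Assumption: key names follow the table, not a verified SDK signature.
const PRODUCER_DEFAULTS = Object.freeze({
  sendTimeoutMillis: 30000, // 30 secs
  batchingEnabled: false,
  batchingMaxMessages: 1000,
  maxPendingMessages: 1000,
  batchingMaxPublishDelay: 10, // 10 ms
});

// Hypothetical helper (not part of jsc8): overrides win over defaults,
// and any extra keys (for example an OTP) are passed through unchanged.
function buildProducerOptions(overrides = {}) {
  return { ...PRODUCER_DEFAULTS, ...overrides };
}

const options = buildProducerOptions({
  batchingEnabled: true,
  batchingMaxMessages: 100,
});
console.log(options);
```

Only the two overridden values change; every other option keeps the default from the table.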

sendTimeoutMillis: Specifies the time in milliseconds the producer waits for acknowledgement from the broker after sending a message. If the acknowledgement isn't received within this time, the send operation is considered a failure. Longer timeouts can be more tolerant of network issues but may lead to slower throughput.
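
To make the timeout rule concrete, here is a small model of it, not the SDK's implementation. The SendTracker class and its method names are invented for this sketch, and timestamps are passed in explicitly so the behavior is easy to follow.

```javascript
// Illustrative model only: remembers when each message was sent and
// decides whether an acknowledgement arrived within sendTimeoutMillis.
class SendTracker {
  constructor(sendTimeoutMillis = 30000) {
    this.sendTimeoutMillis = sendTimeoutMillis;
    this.pending = new Map(); // messageId -> send time in ms
  }

  // Record that a message was sent at time `now` (ms).
  send(messageId, now) {
    this.pending.set(messageId, now);
  }

  // Handle an acknowledgement arriving at time `now` (ms).
  ack(messageId, now) {
    const sentAt = this.pending.get(messageId);
    if (sentAt === undefined) return "unknown";
    this.pending.delete(messageId);
    return now - sentAt <= this.sendTimeoutMillis ? "acked" : "timed-out";
  }

  // Drop and return the IDs of messages whose timeout has passed.
  expire(now) {
    const expired = [];
    for (const [id, sentAt] of this.pending) {
      if (now - sentAt > this.sendTimeoutMillis) {
        expired.push(id);
        this.pending.delete(id);
      }
    }
    return expired;
  }
}

const tracker = new SendTracker(30000);
tracker.send("m1", 0);
tracker.send("m2", 0);
console.log(tracker.ack("m1", 1200)); // acked
console.log(tracker.expire(30001)); // [ 'm2' ]
```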

batchingEnabled: Determines whether messages are batched before being sent to the broker. Batching can help increase throughput and reduce overhead but may introduce a slight delay in message delivery.

batchingMaxMessages: Sets the maximum number of messages allowed in a batch. Larger batches can increase throughput but may consume more memory and introduce latency.

maxPendingMessages: Controls the maximum size of the internal queue holding the messages before they are sent to the broker. This can be helpful for controlling memory usage and preventing excessive message backlog. If the queue reaches its maximum size, new messages may be blocked or fail immediately, depending on the producer configuration.
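
The effect of a bounded internal queue can be sketched as follows. This is a simplified model that fails fast when the queue is full; the PendingQueue class is hypothetical, and the real producer may block instead, depending on its configuration.

```javascript
// Illustrative bounded queue: rejects new messages once it holds
// maxPendingMessages entries (the fail-fast variant).
class PendingQueue {
  constructor(maxPendingMessages = 1000) {
    this.max = maxPendingMessages;
    this.items = [];
  }

  // Try to enqueue a message; returns false when the queue is full.
  offer(message) {
    if (this.items.length >= this.max) return false;
    this.items.push(message);
    return true;
  }

  // Remove and return the oldest message, as if it were sent to the broker.
  poll() {
    return this.items.shift();
  }

  get size() {
    return this.items.length;
  }
}

const queue = new PendingQueue(2);
console.log(queue.offer("a")); // true
console.log(queue.offer("b")); // true
console.log(queue.offer("c")); // false, queue is full
console.log(queue.poll()); // a
console.log(queue.offer("c")); // true, space was freed
```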

batchingMaxPublishDelay: Defines the maximum time period in milliseconds within which messages are batched before being sent to the broker. Lower values can help ensure faster delivery at the cost of potentially smaller batches and reduced throughput.
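
The interaction between batchingMaxMessages and batchingMaxPublishDelay can be sketched with a small batcher that flushes when either limit is reached. This is an assumption about the general technique, not the broker's or SDK's actual code; the Batcher class is hypothetical and takes timestamps as arguments to keep the example deterministic.

```javascript
// Illustrative batcher: a batch is published when it is full
// or when its oldest message has waited for the maximum delay.
class Batcher {
  constructor({ batchingMaxMessages = 1000, batchingMaxPublishDelay = 10 } = {}) {
    this.maxMessages = batchingMaxMessages;
    this.maxDelay = batchingMaxPublishDelay; // ms
    this.batch = [];
    this.firstAddedAt = null;
  }

  // Add a message at time `now` (ms). Returns a full batch, or null.
  add(message, now) {
    if (this.batch.length === 0) this.firstAddedAt = now;
    this.batch.push(message);
    return this.batch.length >= this.maxMessages ? this.flush() : null;
  }

  // Periodic check: returns the batch if the delay has elapsed, else null.
  tick(now) {
    if (this.batch.length > 0 && now - this.firstAddedAt >= this.maxDelay) {
      return this.flush();
    }
    return null;
  }

  // Hand over the current batch and start a new one.
  flush() {
    const out = this.batch;
    this.batch = [];
    this.firstAddedAt = null;
    return out;
  }
}

const batcher = new Batcher({ batchingMaxMessages: 3, batchingMaxPublishDelay: 10 });
console.log(batcher.add("a", 0)); // null
console.log(batcher.add("b", 2)); // null
console.log(batcher.add("c", 4)); // [ 'a', 'b', 'c' ] (size limit reached)
console.log(batcher.add("d", 5)); // null
console.log(batcher.tick(9)); // null (only 4 ms waited)
console.log(batcher.tick(15)); // [ 'd' ] (delay limit reached)
```

A smaller delay makes tick release batches sooner, which is the latency and throughput trade-off described above.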

Create Producer Code

When this code runs, it creates a new client, requests a stream object, and then creates a producer.

If you're using JavaScript, the code creates a jsc8 client. If you're using Python, it creates a C8Client.

const jsc8 = require("jsc8");

const BASE_URL = "https://play.paas.macrometa.io/";

// Declare the client explicitly instead of relying on an implicit global
const client = new jsc8({
  url: BASE_URL,
  apiKey: "xxxxxx",
  fabricName: "_system",
});

const streamName = "streamQuickstart";

async function main () {
  async function createStream() {
    if (await client.hasStream(streamName, false)) {
      console.log("This stream already exists!");
      console.log(`Existing Producer = c8globals.${streamName}`);
    } else {
      console.log("\nCreating global stream...");
      // To create a global stream, set the second parameter to false
      // There is an option to create a local stream, which is only accessible within the region
      const streamInfo = await client.createStream(streamName, false);
      console.log(`New Producer = ${streamInfo.result["stream-id"]}`);
    }
  }

  async function producer() {
    try {
      // Create stream only if stream does not exist
      await createStream();
      console.log("\nConnecting producer to global stream...");

      // Request stream object
      const stream = client.stream(streamName, false);
      // Request one-time password
      const producerOTP = await stream.getOtp();

      // ********** Producer Options **********
      // Create producer
      const producer = await stream.producer(BASE_URL, {
        otp: producerOTP,
        sendTimeoutMillis: 30000, // Default is 30000 ms
        batchingEnabled: true, // Default is false
        compressionType: 'LZ4', // Options: NONE, LZ4, ZLIB, ZSTD, SNAPPY -> default is NONE
        batchingMaxMessages: 100, // Default is 1000
        batchingMaxPublishDelayMs: 10 // Default is 10 ms
      });

      // Run producer - Open connection to server
      producer.on("open", () => {});

      // Send a message every 1000 ms
      setInterval(() => {
        // If your message is an object, convert the object to a string.
        // e.g. const message = JSON.stringify({message:'Hello World'});
        const message = `Hello Macrometa Stream! Here is your random message number ${Math.floor(
          Math.random() * 101
        )}`;
        // The payload is sent as a base64-encoded string
        const payloadObj = { payload: Buffer.from(message).toString("base64") };
        producer.send(JSON.stringify(payloadObj));
      }, 1000);

      producer.onclose = function () {
        console.log("Closed WebSocket:Producer connection for " + streamName);
      };
    } catch (e) {
      console.log("Error while creating stream publisher: " + e);
    }
  }

  producer();
}
main();
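
The message framing used inside the send loop above can be tried on its own, without a client or a network connection. The sketch below wraps it in two helpers, encodePayload and decodePayload; both names are made up for this illustration and are not SDK functions.

```javascript
// Encode a message the way the producer example does:
// text -> base64 -> JSON string with a "payload" field.
// Objects are converted to a JSON string first.
function encodePayload(message) {
  const text = typeof message === "string" ? message : JSON.stringify(message);
  return JSON.stringify({ payload: Buffer.from(text).toString("base64") });
}

// Reverse of encodePayload, useful for inspecting what would be sent.
function decodePayload(frame) {
  const { payload } = JSON.parse(frame);
  return Buffer.from(payload, "base64").toString("utf8");
}

const frame = encodePayload("Hello Macrometa Stream!");
console.log(frame);
console.log(decodePayload(frame)); // Hello Macrometa Stream!
```

Decoding the frame returns the original text, which is a quick way to check a message before passing it to producer.send.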