import ballerina/io;
import ballerina/log;
import ballerina/nats;

// Represents the escape character.
const string ESCAPE = "!q";

// Produces messages to a subject in the NATS Streaming server.
// Reads the subject once from the console, then keeps reading messages and
// publishing them until the escape sequence (`ESCAPE`) is entered. The
// connection is closed before the function returns.
public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

    // No arguments are supplied, so the nats module's default connection
    // settings are used.
    nats:Connection conn = new;

    nats:StreamingProducer publisher = new (conn);

    while (message != ESCAPE) {
        message = io:readln("Message : ");
        // The escape sequence itself must not be published, hence the
        // second check inside the loop.
        if (message != ESCAPE) {
            // Produces a message to the specified subject. The console input
            // is explicitly marked as untainted so that it can be passed on
            // as the payload.
            var publishResult = publisher->publish(subject, <@untainted>message);
            if (publishResult is nats:Error) {
                error e = publishResult;
                // Reports the publish failure (this used to reuse the
                // "closing the connection" text, which was misleading).
                log:printError("Error occurred while producing the message", e);
            } else {
                log:printInfo("GUID " + publishResult
                                        + " received for the produced message.");
            }
        }
    }
    // Closes the connection.
    var closeResult = conn.close();
    if (closeResult is error) {
        error e = closeResult;
        log:printError("Error occurred while closing the connection", e);
    }
}
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;

// Creates the NATS connection that the listener below is built on.
// No arguments are supplied, so whatever defaults the nats module defines
// for a connection apply.
nats:Connection conn = new;

// Initializes the NATS Streaming listener on top of that connection.
// Every service in this file is attached to this single listener.
listener nats:StreamingListener lis = new (conn);


// Binds the consumer to listen to the messages published to the 'demo' subject.
// Belongs to the queue group named "sample-queue-group".
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    queueName: "sample-queue-group"
}
service firstQueueGroupMember on lis {
    // Logs the payload of the incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // Converts the payload bytes into a string; the conversion yields
        // an error instead of a string when it fails.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            // Prints the incoming message in the console.
            log:printInfo("Message Received to first queue group member: "
                                                                + messageData);
        } else {
            // Hands the conversion error to the logger so that the cause of
            // the failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs the error value passed in; the message argument is not used.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}


// Binds the consumer to listen to the messages published to the 'demo' subject.
// Belongs to the queue group named "sample-queue-group".
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    queueName: "sample-queue-group"
}
service secondQueueGroupMember on lis {
    // Logs the payload of the incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // Converts the payload bytes into a string; the conversion yields
        // an error instead of a string when it fails.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            // Prints the incoming message in the console.
            log:printInfo("Message Received to second queue group member: "
                                                                + messageData);
        } else {
            // Hands the conversion error to the logger so that the cause of
            // the failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs the error value passed in; the message argument is not used.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}


// Binds the consumer to listen to the messages published to the 'demo' subject.
// Belongs to the queue group named "sample-queue-group".
// NOTE(review): the service name is misspelled ("thrid"); it is kept as-is
// here so that the identifier does not change.
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    queueName: "sample-queue-group"
}
service thridQueueGroupMember on lis {
    // Logs the payload of the incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // Converts the payload bytes into a string; the conversion yields
        // an error instead of a string when it fails.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            // Prints the incoming message in the console.
            log:printInfo("Message Received to third queue group member: "
                                                                + messageData);
        } else {
            // Hands the conversion error to the logger so that the cause of
            // the failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs the error value passed in; the message argument is not used.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

Queue Groups

This sample demonstrates the NATS built-in load balancing feature called “distributed queues”. All subscribers with the same queue name form a queue group. As messages on the registered subject are published, one member of the group is chosen randomly to receive each message. Although a queue group has multiple subscribers, each message is consumed by only one of them.

import ballerina/io;
import ballerina/log;
import ballerina/nats;
const string ESCAPE = "!q";

Represents the escape character.

public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

Produces a message to a subject in the NATS server.

    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);
    // Keeps reading messages from the console until the escape sequence
    // is entered; the escape sequence itself is never published.
    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                // Reports the publish failure (this used to reuse the
                // "closing the connection" text, which was misleading).
                log:printError("Error occurred while producing the message", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }

Produces a message to the specified subject.

    var result = conn.close();
    if (result is error) {
        error e = result;
        log:printError("Error occurred while closing the connection", e);
    }
}

Closes the connection.

# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run publisher.bal
Subject : demo
Message : First Message
GUID m2jS6SLLefK325DWTkkwBh received for the produced message.
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
nats:Connection conn = new;

Creates a NATS connection.

listener nats:StreamingListener lis = new (conn);

Initializes the NATS Streaming listener.

@nats:StreamingSubscriptionConfig {
    subject: "demo",
    queueName: "sample-queue-group"
}
service firstQueueGroupMember on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. The consumer belongs to the queue group named “sample-queue-group”.

        // Converts the payload bytes into a string; the conversion yields
        // an error instead of a string when it fails.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to first queue group member: "
                                                                + messageData);
        } else {
            // Hands the conversion error to the logger so that the cause of
            // the failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

Prints the incoming message in the console.

    // Logs the error value passed in; the message argument is not used.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error cause = errorVal;
        log:printError("Error occurred: ", cause);
    }
}
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    queueName: "sample-queue-group"
}
service secondQueueGroupMember on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. The consumer belongs to the queue group named “sample-queue-group”.

        // Converts the payload bytes into a string; the conversion yields
        // an error instead of a string when it fails.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to second queue group member: "
                                                                + messageData);
        } else {
            // Hands the conversion error to the logger so that the cause of
            // the failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

Prints the incoming message in the console.

    // Logs the error value passed in; the message argument is not used.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error cause = errorVal;
        log:printError("Error occurred: ", cause);
    }
}
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    queueName: "sample-queue-group"
}
service thridQueueGroupMember on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. The consumer belongs to the queue group named “sample-queue-group”.

        // Converts the payload bytes into a string; the conversion yields
        // an error instead of a string when it fails.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to third queue group member: "
                                                                + messageData);
        } else {
            // Hands the conversion error to the logger so that the cause of
            // the failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

Prints the incoming message in the console.

    // Logs the error value passed in; the message argument is not used.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error cause = errorVal;
        log:printError("Error occurred: ", cause);
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
# `queue-group.bal` contains three services belonging to the same
# queue group.
# When several messages are published, it can be noticed that
# each message is received by only one queue group member.
ballerina run queue-group.bal
Message Received to third queue group member: First Message
Message Received to second queue group member: Second Message
Message Received to first queue group member: Third Message