import ballerina/io;
import ballerina/log;
import ballerina/nats;

// Represents the escape character.
const string ESCAPE = "!q";

// Produces a message to a subject in the NATS sever.
public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

    nats:Connection conn = new;

    nats:StreamingProducer publisher = new (conn);

    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            // Produces a message to the specified subject.
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                log:printError("Error occurred while closing the connection", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }
    // Closes the connection.
    var result = conn.close();
    if (result is error) {
        error e = result;
        log:printError("Error occurred while closing the connection", e);
    }
}
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;

// Creates a NATS connection.
nats:Connection conn = new;

// Initializes the NATS Streaming listener.
listener nats:StreamingListener lis = new (conn, clientId = "c0");

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    durableName: "sample-name"
}
service demoService on lis {
    resource function onMessage(nats:StreamingMessage message) {
       // Prints the incoming message in the console.
       string|error messageData = strings:fromBytes(message.getData());
       if (messageData is string) {
            log:printInfo("Message Received: " + messageData);
       } else {
            log:printError("Error occurred while obtaining message data");
       }
    }

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

Durable Subscriptions

This sample demonstrates creating a durable subscription in the NATS Streaming server. Regular subscriptions remember their position while the client is connected. If the client disconnects, the position is lost. Durable subscriptions remember their position even if the client is disconnected.

import ballerina/io;
import ballerina/log;
import ballerina/nats;
const string ESCAPE = "!q";

Represents the escape character.

public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

Produces a message to a subject in the NATS sever.

    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);
    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                log:printError("Error occurred while closing the connection", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }

Produces a message to the specified subject.

    var result = conn.close();
    if (result is error) {
        error e = result;
        log:printError("Error occurred while closing the connection", e);
    }
}

Closes the connection.

# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run publisher.bal
Subject : demo
Message : First Message
GUID m2jS6SLLefK325DWTkkwBh received for the produced message.
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
nats:Connection conn = new;

Creates a NATS connection.

listener nats:StreamingListener lis = new (conn, clientId = "c0");

Initializes the NATS Streaming listener.

@nats:StreamingSubscriptionConfig {
    subject: "demo",
    durableName: "sample-name"
}
service demoService on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject.

       string|error messageData = strings:fromBytes(message.getData());
       if (messageData is string) {
            log:printInfo("Message Received: " + messageData);
       } else {
            log:printError("Error occurred while obtaining message data");
       }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run subscriber.bal
Received message : First Message
# Stop the subscriber and publish some messages while it is stopped.
# Run the subscriber again.
# All messages which had been published while the subscriber
# wasn't running should be received.
ballerina run subscriber.bal
Received message : Second Message