import ballerina/io;
import ballerina/log;
import ballerina/nats;
// Input that, when entered as a message, ends the publish loop in `main`.
const string ESCAPE = "!q";
# Reads a subject and then messages from the console, publishing each message
# to that subject until the `ESCAPE` string is entered, and finally closes the
# NATS connection.
public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

    // Open the NATS connection and create a streaming producer on top of it.
    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);

    // Keep publishing console input until the user types the escape string.
    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                // This is a publish failure, not a connection-close failure.
                log:printError("Error occurred while producing the message", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }

    // Close the connection once the user is done.
    var result = conn.close();
    if (result is error) {
        error e = result;
        log:printError("Error occurred while closing the connection", e);
    }
}
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
// Creates a NATS connection with the default configuration.
nats:Connection conn = new;
// Initializes the NATS Streaming listener on top of that connection.
listener nats:StreamingListener lis = new (conn);
// Subscribes this service to the "demo" subject.
@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demoService on lis {
    // Converts the payload of the incoming message to a string and logs it.
    resource function onMessage(nats:StreamingMessage message) {
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received: " + messageData);
        } else {
            // Hand the conversion error to the logger so the cause is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs the error passed in as `errorVal`.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

Basic Streaming Publisher and Subscriber

This sample demonstrates the basic usage of the NATS Streaming client to publish data to a subject and consume data from that subject. In order to run this sample, a NATS Streaming server should be running on the corresponding port used in the sample.

import ballerina/io;
import ballerina/log;
import ballerina/nats;
const string ESCAPE = "!q";

Defines the escape string (`!q`); entering it as a message ends the publish loop.

public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

Produces a message to a subject in the NATS server.

    // Open the NATS connection and create a streaming producer on top of it.
    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);
    // Keep publishing console input until the user types the escape string.
    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                // This is a publish failure, not a connection-close failure.
                log:printError("Error occurred while producing the message", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }

Produces a message to the specified subject.

    // Shut the connection down and log any error that close() reports.
    var closeResult = conn.close();
    if (closeResult is error) {
        error closeError = closeResult;
        log:printError("Error occurred while closing the connection", closeError);
    }
}

Closes the connection.

# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command.
$ ballerina run publisher.bal
Subject : demo
Message : Hello Ballerina!
GUID m2jS6SLLefK325DWTkkwBh received for the produced message.
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
nats:Connection conn = new;

Creates a NATS connection.

listener nats:StreamingListener lis = new (conn);

Initializes the NATS Streaming listener.

@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demoService on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the `demo` subject.

       string|error messageData = strings:fromBytes(message.getData());
       if (messageData is string) {
            log:printInfo("Message Received: " + messageData);
       } else {
            // Hand the conversion error to the logger so the cause is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
       }
    }

Prints the incoming message in the console.

    // Logs the error passed in as `errorVal`.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error cause = errorVal;
        log:printError("Error occurred: ", cause);
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command.
$ ballerina run subscriber.bal
Message Received: Hello Ballerina!