import ballerina/io;
import ballerina/log;
import ballerina/nats;

// Sentinel input that ends the publishing loop: when this exact string is
// entered at the "Message : " prompt, `main` stops reading messages and the
// string itself is not published.
const string ESCAPE = "!q";

// Produces a message to a subject in the NATS server.
// Interactively publishes messages to a NATS Streaming subject.
//
// Reads the target subject once from standard input, then keeps prompting for
// messages and publishes each one until the user enters the `ESCAPE` value.
// The connection is closed before the function returns.
public function main() {
    string subject = io:readln("Subject : ");

    // No arguments are passed, so the connection uses the module's defaults.
    nats:Connection conn = new;

    nats:StreamingProducer publisher = new (conn);

    while (true) {
        string message = io:readln("Message : ");
        if (message == ESCAPE) {
            // The escape value only terminates the loop; it is never published.
            break;
        }

        // Produces a message to the specified subject.
        var publishResult = publisher->publish(subject, <@untainted>message);
        if (publishResult is nats:Error) {
            error e = publishResult;
            // Report the publish failure as such (this branch previously logged
            // the "closing the connection" message by mistake).
            log:printError("Error occurred while producing the message", e);
        } else {
            log:printInfo("GUID " + publishResult
                                    + " received for the produced message.");
        }
    }

    // Closes the connection.
    var closeResult = conn.close();
    if (closeResult is error) {
        error e = closeResult;
        log:printError("Error occurred while closing the connection", e);
    }
}
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;

// Creates a NATS connection. No arguments are passed to the constructor, so
// the module's default connection settings apply.
nats:Connection conn = new;

// Initializes the NATS Streaming listener on top of the connection above.
// Services attach to it with `on lis`.
listener nats:StreamingListener lis = new (conn);

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demoService on lis {
    // Decodes the payload of the received message as a string and prints it
    // in the console through the logger.
    resource function onMessage(nats:StreamingMessage message) {
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received: " + messageData);
        } else {
            // Pass the conversion error to the logger so the cause of the
            // failure is not lost.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs the error value handed to this resource.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

Basic Streaming Publisher and Subscriber

This sample demonstrates the basic usage of the NATS Streaming client to publish data to a subject and consume data from that subject. To run this sample, a NATS Streaming server must already be running and reachable with the default connection settings, because the sample creates its connection without specifying a URL or port.

import ballerina/io;
import ballerina/log;
import ballerina/nats;
const string ESCAPE = "!q";

Represents the escape character.

public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

Produces a message to a subject in the NATS server.

    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);
    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                log:printError("Error occurred while closing the connection", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }

Produces a message to the specified subject.

    var result = conn.close();
    if (result is error) {
        error e = result;
        log:printError("Error occurred while closing the connection", e);
    }
}

Closes the connection.

# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run publisher.bal
Subject : demo
Message : Hello Ballerina!
GUID m2jS6SLLefK325DWTkkwBh received for the produced message.
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
nats:Connection conn = new;

Creates a NATS connection.

listener nats:StreamingListener lis = new (conn);

Initializes the NATS Streaming listener.

@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demoService on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject.

       string|error messageData = strings:fromBytes(message.getData());
       if (messageData is string) {
            log:printInfo("Message Received: " + messageData);
       } else {
            log:printError("Error occurred while obtaining message data");
       }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run subscriber.bal
Received message : Hello Ballerina!