import ballerina/io;
import ballerina/log;
import ballerina/nats;

// Sentinel input: entering this text at the "Message : " prompt ends the
// publish loop in `main`.
const string ESCAPE = "!q";

// Produces messages to a subject in the NATS server.
// Reads a subject and then a series of messages from standard input and
// publishes each message to that subject. The loop ends when the user enters
// the `ESCAPE` text, after which the connection is closed.
public function main() {
    string subject = io:readln("Subject : ");

    // The connection is created with no explicit arguments.
    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);

    while (true) {
        string message = io:readln("Message : ");
        // The escape text is a control input; it is never published.
        if (message == ESCAPE) {
            break;
        }

        // Produces a message to the specified subject.
        var publishResult = publisher->publish(subject, <@untainted>message);
        if (publishResult is nats:Error) {
            // A failed publish is logged and the loop continues, so the user
            // can try again with the next message.
            error e = publishResult;
            log:printError("Error occurred while producing the message", e);
        } else {
            log:printInfo("GUID " + publishResult
                + " received for the produced message.");
        }
    }

    // Closes the connection.
    var closeResult = conn.close();
    if (closeResult is error) {
        error e = closeResult;
        log:printError("Error occurred while closing the connection", e);
    }
}
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;

// Creates a NATS connection with no explicit arguments.
nats:Connection conn = new;

// Initializes the NATS Streaming listener on that connection. Every service
// in this file is attached to this listener.
listener nats:StreamingListener lis = new (conn);

// Binds the consumer to listen to the messages published to the 'demo' subject.
// No `startPosition` is configured, so by default only new messages are received.
@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service receiveNewOnly on lis {
    // Logs the content of each incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // `strings:fromBytes` yields `string|error`, so both outcomes are handled.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveNewOnly: "
                + messageData);
        } else {
            // Pass the error along so the log shows why the conversion failed.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs any error reported for this subscription.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

// Binds the consumer to listen to the messages published to the 'demo' subject.
// `startPosition: nats:FIRST` makes it receive all messages from the beginning.
// NOTE(review): the service identifier keeps its original spelling
// ("Begining"); only the logged text is corrected so that it matches the
// documented sample output ("receiveFromBeginning").
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: nats:FIRST
}
service receiveFromBegining on lis {
    // Logs the content of each incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // `strings:fromBytes` yields `string|error`, so both outcomes are handled.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveFromBeginning: "
                + messageData);
        } else {
            // Pass the error along so the log shows why the conversion failed.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs any error reported for this subscription.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

// Binds the consumer to listen to the messages published to the 'demo' subject.
// `startPosition: nats:LAST_RECEIVED` makes it receive messages starting from
// the last received message.
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: nats:LAST_RECEIVED
}
service receiveFromLastReceived on lis {
    // Logs the content of each incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // `strings:fromBytes` yields `string|error`, so both outcomes are handled.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveFromLastReceived: "
                + messageData);
        } else {
            // Pass the error along so the log shows why the conversion failed.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs any error reported for this subscription.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

// Start position expressed as a sequence number: here, sequence number 3.
[nats:SEQUENCE_NUMBER, int] sequenceNo = [nats:SEQUENCE_NUMBER, 3];

// Binds the consumer to listen to the messages published to the 'demo' subject.
// Receives messages starting from the provided sequence number.
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: sequenceNo
}
service receiveFromGivenIndex on lis {
    // Logs the content of each incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // `strings:fromBytes` yields `string|error`, so both outcomes are handled.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveFromGivenIndex: "
                + messageData);
        } else {
            // Pass the error along so the log shows why the conversion failed.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs any error reported for this subscription.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

// Start position expressed as a historical time delta of 5.
// NOTE(review): the sample's run instructions describe this as a 5-second
// offset from the current server date/time; confirm the unit against the
// nats module documentation.
[nats:TIME_DELTA_START, int] timeDelta = [nats:TIME_DELTA_START, 5];

// Binds the consumer to listen to the messages published to the 'demo' subject.
// Receives messages since the provided historical time delta.
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: timeDelta
}
service receiveSinceTimeDelta on lis {
    // Logs the content of each incoming message.
    resource function onMessage(nats:StreamingMessage message) {
        // `strings:fromBytes` yields `string|error`, so both outcomes are handled.
        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveSinceTimeDelta: "
                + messageData);
        } else {
            // Pass the error along so the log shows why the conversion failed.
            error e = messageData;
            log:printError("Error occurred while obtaining message data", e);
        }
    }

    // Logs any error reported for this subscription.
    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}

Historical Message Replay

This sample demonstrates the historical message replay feature of NATS Streaming. A new subscription may specify a starting position in the stream of messages stored for the channel of the subscribed subject. Message delivery may begin at: 1. the earliest message stored for this subject; 2. the most recently stored message for this subject prior to the start of the current subscription; 3. a historical offset from the current server date/time (e.g., the last 30 seconds); or 4. a specific message sequence number.

import ballerina/io;
import ballerina/log;
import ballerina/nats;
const string ESCAPE = "!q";

Represents the escape character.

public function main() {
    string message = "";
    string subject = io:readln("Subject : ");

Produces a message to a subject in the NATS server.

    nats:Connection conn = new;
    nats:StreamingProducer publisher = new (conn);
    while (message != ESCAPE) {
        message = io:readln("Message : ");
        if (message != ESCAPE) {
            var result = publisher->publish(subject, <@untainted>message);
            if (result is nats:Error) {
                error e = result;
                log:printError("Error occurred while closing the connection", e);
            } else {
                log:printInfo("GUID " + result
                                        + " received for the produced message.");
            }
        }
    }

Produces a message to the specified subject.

    var result = conn.close();
    if (result is error) {
        error e = result;
        log:printError("Error occurred while closing the connection", e);
    }
}

Closes the connection.

# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run publisher.bal
Subject : demo
[ballerina/nats] Connection established with server nats://localhost:4222
Message : First Message
GUID UBMEgrERHdxZRqUBP05PtD received for the produced message.
Message : Second Message
GUID UBMEgrERHdxZRqUBP05Puz received for the produced message.
Message : Third Message
GUID UBMEgrERHdxZRqUBP05Pwl received for the produced message.
Message : Forth Message
GUID UBMEgrERHdxZRqUBP05PyX received for the produced message.
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
nats:Connection conn = new;

Creates a NATS connection.

listener nats:StreamingListener lis = new (conn);

Initializes the NATS Streaming listener.

@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service receiveNewOnly on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. By default, only new messages are received.

        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveNewOnly: "
                                                                  + messageData);
        } else {
            log:printError("Error occurred while obtaining message data");
        }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: nats:FIRST
}
service receiveFromBegining on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives all messages from the beginning.

        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveFromBegining: "
                                                                  + messageData);
        } else {
            log:printError("Error occurred while obtaining message data");
        }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: nats:LAST_RECEIVED
}
service receiveFromLastReceived on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives messages starting from the last received message.

        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveFromLastReceived: "
                                                                  + messageData);
        } else {
            log:printError("Error occurred while obtaining message data");
        }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
[nats:SEQUENCE_NUMBER, int] sequenceNo = [nats:SEQUENCE_NUMBER, 3];
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: sequenceNo
}
service receiveFromGivenIndex on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives messages starting from the provided sequence number.

        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveFromGivenIndex: "
                                                                  + messageData);
        } else {
            log:printError("Error occurred while obtaining message data");
        }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
[nats:TIME_DELTA_START, int] timeDelta = [nats:TIME_DELTA_START, 5];
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: timeDelta
}
service receiveSinceTimeDelta on lis {
    resource function onMessage(nats:StreamingMessage message) {

Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives messages since the provided historical time delta.

        string|error messageData = strings:fromBytes(message.getData());
        if (messageData is string) {
            log:printInfo("Message Received to service receiveSinceTimeDelta: "
                                                                  + messageData);
        } else {
            log:printError("Error occurred while obtaining message data");
        }
    }

Prints the incoming message in the console.

    resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
        error e = errorVal;
        log:printError("Error occurred: ", e);
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
# When you start the subscriber after publishing several messages,
# you'll notice that:
# 1. `receiveSinceTimeDelta` service receives the messages if
#     the messages were sent within a historical offset of 5 seconds
#     from the current server date/time.
# 2. `receiveFromGivenIndex` service receives messages
#     starting from the third message published.
# 3. `receiveFromLastReceived` service receives messages starting
#     from the last published message.
# 4. `receiveFromBeginning` service receives all messages ever
#     published.
# 5. `receiveNewOnly` service receives only the messages that are
#     published after the subscriber starts.
ballerina run subscriber.bal
[ballerina/nats] Connection established with server nats://localhost:4222
Message Received to service receiveSinceTimeDelta: Third Message
Message Received to service receiveFromGivenIndex: Third Message
Message Received to service receiveFromLastReceived: Third Message
Message Received to service receiveFromBeginning: First Message
Message Received to service receiveFromBeginning: Second Message
Message Received to service receiveFromBeginning: Third Message
Message Received to service receiveFromGivenIndex: Forth Message
Message Received to service receiveFromLastReceived: Forth Message
Message Received to service receiveNewOnly: Forth Message
Message Received to service receiveSinceTimeDelta: Forth Message
Message Received to service receiveFromBeginning: Forth Message