import ballerina/io;
import ballerina/log;
import ballerina/nats;
// Represents the escape character.
const string ESCAPE = "!q";
// Produces a message to a subject in the NATS sever.
public function main() {
string message = "";
string subject = io:readln("Subject : ");
nats:Connection conn = new;
nats:StreamingProducer publisher = new (conn);
while (message != ESCAPE) {
message = io:readln("Message : ");
if (message != ESCAPE) {
// Produces a message to the specified subject.
var result = publisher->publish(subject, <@untainted>message);
if (result is nats:Error) {
error e = result;
log:printError("Error occurred while closing the connection", e);
} else {
log:printInfo("GUID " + result
+ " received for the produced message.");
}
}
}
// Closes the connection.
var result = conn.close();
if (result is error) {
error e = result;
log:printError("Error occurred while closing the connection", e);
}
}
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
// Creates a NATS connection.
nats:Connection conn = new;
// Initializes the NATS Streaming listener.
listener nats:StreamingListener lis = new (conn);
// Binds the consumer to listen to the messages published to the 'demo' subject.
// By default, only new messages are received.
@nats:StreamingSubscriptionConfig {
subject: "demo"
}
service receiveNewOnly on lis {
resource function onMessage(nats:StreamingMessage message) {
// Prints the incoming message in the console.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveNewOnly: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
// Binds the consumer to listen to the messages published to the 'demo' subject.
// Receives all messages from the beginning.
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: nats:FIRST
}
service receiveFromBegining on lis {
resource function onMessage(nats:StreamingMessage message) {
// Prints the incoming message in the console.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveFromBegining: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
// Binds the consumer to listen to the messages published to the 'demo' subject.
// Receives messages starting from the last received message.
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: nats:LAST_RECEIVED
}
service receiveFromLastReceived on lis {
resource function onMessage(nats:StreamingMessage message) {
// Prints the incoming message in the console.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveFromLastReceived: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
[nats:SEQUENCE_NUMBER, int] sequenceNo = [nats:SEQUENCE_NUMBER, 3];
// Binds the consumer to listen to the messages published to the 'demo' subject.
// Receives messages starting from the provided sequence number.
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: sequenceNo
}
service receiveFromGivenIndex on lis {
resource function onMessage(nats:StreamingMessage message) {
// Prints the incoming message in the console.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveFromGivenIndex: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
[nats:TIME_DELTA_START, int] timeDelta = [nats:TIME_DELTA_START, 5];
// Binds the consumer to listen to the messages published to the 'demo' subject.
// Receives messages since the provided historical time delta.
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: timeDelta
}
service receiveSinceTimeDelta on lis {
resource function onMessage(nats:StreamingMessage message) {
// Prints the incoming message in the console.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveSinceTimeDelta: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
Historical Message ReplayThis sample demonstrates leveraging the historical message replay feature of Streaming NATS. New subscriptions may specify a starting position in the stream of messages stored for the channel of the subscribed subject. Message delivery may begin at: 1. The earliest message stored for this subject 2. The most recently stored message for this subject prior to the start of the current subscription. 3. A historical offset from the current server date/time (e.g., the last 30 seconds). 4. A specific message sequence number |
import ballerina/io;
import ballerina/log;
import ballerina/nats;
const string ESCAPE = "!q";
Represents the escape character.
public function main() {
string message = "";
string subject = io:readln("Subject : ");
Produces a message to a subject in the NATS sever.
nats:Connection conn = new;
nats:StreamingProducer publisher = new (conn);
while (message != ESCAPE) {
message = io:readln("Message : ");
if (message != ESCAPE) {
var result = publisher->publish(subject, <@untainted>message);
if (result is nats:Error) {
error e = result;
log:printError("Error occurred while closing the connection", e);
} else {
log:printInfo("GUID " + result
+ " received for the produced message.");
}
}
}
Produces a message to the specified subject.
var result = conn.close();
if (result is error) {
error e = result;
log:printError("Error occurred while closing the connection", e);
}
}
Closes the connection.
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run publisher.bal
Subject : demo
[ballerina/nats] Connection established with server nats://localhost:4222
Message : First Message
GUID UBMEgrERHdxZRqUBP05PtD received for the produced message.
Message : Second Message
GUID UBMEgrERHdxZRqUBP05Puz received for the produced message.
Message : Third Message
GUID UBMEgrERHdxZRqUBP05Pwl received for the produced message.
Message : Forth Message
GUID UBMEgrERHdxZRqUBP05PyX received for the produced message.
import ballerina/lang.'string as strings;
import ballerina/log;
import ballerina/nats;
nats:Connection conn = new;
Creates a NATS connection.
listener nats:StreamingListener lis = new (conn);
Initializes the NATS Streaming listener.
@nats:StreamingSubscriptionConfig {
subject: "demo"
}
service receiveNewOnly on lis {
resource function onMessage(nats:StreamingMessage message) {
Binds the consumer to listen to the messages published to the ‘demo’ subject. By default, only new messages are received.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveNewOnly: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
Prints the incoming message in the console.
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: nats:FIRST
}
service receiveFromBegining on lis {
resource function onMessage(nats:StreamingMessage message) {
Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives all messages from the beginning.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveFromBegining: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
Prints the incoming message in the console.
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: nats:LAST_RECEIVED
}
service receiveFromLastReceived on lis {
resource function onMessage(nats:StreamingMessage message) {
Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives messages starting from the last received message.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveFromLastReceived: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
Prints the incoming message in the console.
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
[nats:SEQUENCE_NUMBER, int] sequenceNo = [nats:SEQUENCE_NUMBER, 3];
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: sequenceNo
}
service receiveFromGivenIndex on lis {
resource function onMessage(nats:StreamingMessage message) {
Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives messages starting from the provided sequence number.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveFromGivenIndex: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
Prints the incoming message in the console.
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
[nats:TIME_DELTA_START, int] timeDelta = [nats:TIME_DELTA_START, 5];
@nats:StreamingSubscriptionConfig {
subject: "demo",
startPosition: timeDelta
}
service receiveSinceTimeDelta on lis {
resource function onMessage(nats:StreamingMessage message) {
Binds the consumer to listen to the messages published to the ‘demo’ subject. Receives messages since the provided historical time delta.
string|error messageData = strings:fromBytes(message.getData());
if (messageData is string) {
log:printInfo("Message Received to service receiveSinceTimeDelta: "
+ messageData);
} else {
log:printError("Error occurred while obtaining message data");
}
}
Prints the incoming message in the console.
resource function onError(nats:StreamingMessage message, nats:Error errorVal) {
error e = errorVal;
log:printError("Error occurred: ", e);
}
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
# When you start the subscriber after publishing several messages,
# You'll notice that,
# 1. `receiveSinceTimeDelta` service receives the messages if
# the messages were sent within a historical offset of 5 seconds
# from the current server date/time
# 2. `receiveFromGivenIndex` service receives services messages
# starting from the third message published.
# 3. `receiveFromLastReceived` service receives messages starting
# from the last published message.
# 4. `receiveFromBeginning` service receives all messages ever
# published
# 5. `receiveNewOnly` service receives only the messages, which are
# published after the subscriber starts.
ballerina run subscriber.bal
[ballerina/nats] Connection established with server nats://localhost:4222
Message Received to service receiveSinceTimeDelta: Third Message
Message Received to service receiveFromGivenIndex: Third Message
Message Received to service receiveFromLastReceived: Third Message
Message Received to service receiveFromBeginning: First Message
Message Received to service receiveFromBeginning: Second Message
Message Received to service receiveFromBeginning: Third Message
Message Received to service receiveFromGivenIndex: Forth Message
Message Received to service receiveFromLastReceived: Forth Message
Message Received to service receiveNewOnly: Forth Message
Message Received to service receiveSinceTimeDelta: Forth Message
Message Received to service receiveFromBeginning: Forth Message