Module : nats

Module overview

This module provides the capability to connect to NATS and NATS Streaming servers, publish messages to them, and listen to incoming messages.

Basic Usage

Setting up the connection

The first step is setting up the connection with the NATS Basic/Streaming server. The following ways can be used to connect to a NATS Basic/Streaming server.

  1. Connect to a server using the URL
nats:Connection connection = new("nats://localhost:4222");
  2. Connect to one or more servers with a custom configuration
nats:Connection connection = new("nats://serverone:4222, nats://servertwo:4222", config);

Publishing messages

Publishing messages is handled differently in the NATS Basic server and Streaming server. The 'ballerina/nats' module provides different APIs to publish messages to each server.

Publishing messages to the NATS basic server

Once connected, publishing is accomplished via one of the following four methods.

  1. Publish with the subject and the message content.
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, "hello world");
  2. Publish as a request that expects a reply.
nats:Producer producer = new(connection);
nats:Message|nats:Error reqReply = producer->request(subject, "hello world", 5000);
  3. Publish messages with a replyTo subject
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, <@untainted>message, 
                         replyToSubject);
  4. Publish messages with a replyTo callback service
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, <@untainted>message, 
                         replyToService);
service replyToService =
@nats:SubscriptionConfig {
    subject: "replySubject"
}
service {

    resource function onMessage(nats:Message msg, string data) {
        // Prints the incoming message in the console.
        log:printInfo("Received reply message : " + data);
    }

    resource function onError(nats:Message msg, nats:Error err) {
        log:printError("Error occurred in data binding", err);
    }
};
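
Both publish and request return a union that includes nats:Error, so the outcome can be checked with a type test before continuing. The following is a minimal sketch built only from the calls shown above. It assumes a NATS server at nats://localhost:4222, and the subject name "demo" is hypothetical.

```ballerina
import ballerina/io;
import ballerina/nats;

public function main() {
    // Assumption: a NATS server is reachable at this URL.
    nats:Connection connection = new("nats://localhost:4222");
    nats:Producer producer = new(connection);

    // "demo" is a hypothetical subject chosen for illustration.
    string subject = "demo";

    // publish returns nats:Error?, so an error value signals a failure.
    nats:Error? result = producer->publish(subject, "hello world");
    if (result is nats:Error) {
        io:println("Error occurred while publishing the message.");
    } else {
        io:println("Message published.");
    }

    // request returns either the reply message or an error.
    // 5000 is the third argument used in the example above.
    nats:Message|nats:Error reqReply = producer->request(subject, "hello world", 5000);
    if (reqReply is nats:Error) {
        io:println("Error occurred while requesting a reply.");
    } else {
        io:println("Reply received on subject : " + reqReply.getSubject());
    }
}
```

The request call only yields a message if some subscriber replies on the subject, so the error branch is the expected path when no responder is running.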

Publishing messages to a NATS streaming server

Once connected to a streaming server, publishing messages is accomplished using the following method.

nats:StreamingProducer producer = new(connection);
string|error result = producer->publish(subject, "hello world");
if (result is error) {
   io:println("Error occurred while publishing the message.");
} else {
   io:println("GUID "+result+" received for the produced message.");
}

The publish API supports the following message types: byte[], boolean, string, int, float, decimal, xml, json, and record {}.
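
Since publish accepts more than strings, a structured payload can be passed directly. The sketch below publishes a json value to a streaming server. The server URL, the "orders" subject, and the payload fields are assumptions made for illustration.

```ballerina
import ballerina/io;
import ballerina/nats;

public function main() {
    // Assumption: a NATS Streaming server is reachable at this URL.
    nats:Connection connection = new("nats://localhost:4222");
    nats:StreamingProducer producer = new(connection);

    // Hypothetical json payload; json is one of the supported message types.
    json payload = { "id": 1, "item": "book" };

    // "orders" is a hypothetical subject name.
    string|error result = producer->publish("orders", payload);
    if (result is error) {
        io:println("Error occurred while publishing the message.");
    } else {
        io:println("GUID " + result + " received for the produced message.");
    }
}
```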

Listening to incoming messages

The Ballerina NATS module provides the following mechanisms to listen to messages. Similar to message publishing, listening to messages is also handled differently in NATS basic and streaming servers.

Listening to messages from a NATS server

import ballerina/io;
import ballerina/nats;

// Initializes a connection and the NATS listener.
nats:Connection connection = new("nats://localhost:4222");
listener nats:Listener subscription = new(connection);

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:SubscriptionConfig {
    subject: "demo"
}
service demo on subscription {

    resource function onMessage(nats:Message msg, string data) {
        // Prints the incoming message in the console.
        io:println("Subject : " + msg.getSubject());
        io:println("Message content : " + data);
    }

    resource function onError(nats:Message msg, nats:Error err) {
        io:println(err);
    }

}

Listening to messages from a Streaming server

import ballerina/io;
import ballerina/nats;

// Initializes a connection and the NATS Streaming listener.
nats:Connection conn = new("nats://localhost:4222");
listener nats:StreamingListener subscription = new(conn, "test-cluster", "c1");

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demo on subscription {

    resource function onMessage(nats:StreamingMessage msg, string data) {
        // Prints the incoming message in the console.
        io:println("Subject : " + msg.getSubject());
        io:println("Message content : " + data);
    }

    resource function onError(nats:StreamingMessage msg, nats:Error err) {
        io:println(err);
    }

}

Advanced Usage

Using the TLS protocol

The Ballerina NATS module allows the use of the tls:// protocol in its URLs. This setting expects a secure socket to be set in the connection configuration as shown below.

nats:ConnectionConfig config = {
    secureSocket : {
        trustStore : {
            path: "nats-basic/keyStore.p12",
            password: "xxxxx"
        }
    }
};

// Initializes a connection.
nats:Connection connection = new("tls://localhost:4222", config = config);

Note: The default thread pool size used in Ballerina is twice the number of available processors. You can configure the thread pool size by using the BALLERINA_MAX_POOL_SIZE environment variable.

Records

ConnectionConfig - The parameters used to create a NATS connection.
Detail - Record type to hold the details of an error.
PendingLimits - Parameters to set limits on the maximum number of messages, or the maximum size of messages, that this consumer will hold before it starts to drop new messages while waiting for the resource functions to drain the queue. Setting a value to anything less than or equal to 0 will disable this check.
SecureSocket - Provides configurations for facilitating secure communication with a remote NATS server.
StreamingConfig - Configuration related to establishing a streaming connection.
StreamingSubscriptionConfigData - The parameters used to create a NATS streaming subscription.
SubscriptionConfigData - NATS basic subscription parameters.

Objects

Connection

Represents a single network connection to the NATS server.

Message

Represents a message that is pushed from the NATS server to the consumer.

Clients

Producer

The NATS Producer acts as a basic client that publishes messages to the NATS server. A Producer requires a NATS Connection in order to be initialized.

StreamingMessage

Represents the message a NATS Streaming Server sends to a subscription service.

StreamingProducer

The NATS StreamingProducer acts as a client that publishes messages to the NATS streaming server. A StreamingProducer requires a NATS Connection in order to be initialized.

Listeners

Listener

Represents a connection which will be used for subscription.

StreamingListener

Represents the NATS Streaming Server connection to which a subscription service should be bound in order to receive messages of the corresponding subscription.

Constants

NATS_ERROR
NEW_ONLY
LAST_RECEIVED
FIRST
TIME_DELTA_START
SEQUENCE_NUMBER

Annotations

StreamingSubscriptionConfig

Streaming subscription configuration annotation.

SubscriptionConfig

Basic Subscription config annotation.

Types

Content
StartPosition

The position to start receiving messages.
NEW_ONLY - Specifies that message delivery should start with the messages, which are published after the subscription is created.
LAST_RECEIVED - Specifies that message delivery should start with the last (most recent) message stored for this subject.
TimeDeltaStart - Specifies that message delivery should start with a given historical time delta (from now).
SequenceNumber - Specifies that message delivery should start at the given sequence number.
FIRST - Specifies that message delivery should begin at the oldest available message for this subject.
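
A start position is chosen per subscription. The sketch below replays a subject from its oldest stored message by using the FIRST constant. The text above does not name the annotation field that carries the start position, so the field name startPosition is an assumption, as are the server URL and the "demo" subject.

```ballerina
import ballerina/io;
import ballerina/nats;

// Assumption: a NATS Streaming server is reachable at this URL.
nats:Connection conn = new("nats://localhost:4222");
listener nats:StreamingListener subscription = new(conn, "test-cluster", "c1");

// Assumption: the start position is set through a field named startPosition.
@nats:StreamingSubscriptionConfig {
    subject: "demo",
    startPosition: nats:FIRST
}
service demo on subscription {

    resource function onMessage(nats:StreamingMessage msg, string data) {
        // Prints each message, beginning at the oldest one stored for the subject.
        io:println("Message content : " + data);
    }

    resource function onError(nats:StreamingMessage msg, nats:Error err) {
        io:println(err);
    }
}
```

Under the same assumption, replacing nats:FIRST with nats:NEW_ONLY would deliver only the messages published after the subscription is created.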

Errors

Error