This API documentation is for Ballerina 1.2.0. View API documentation for the latest release.

Module : nats

Module Overview

This module provides the capability to connect with NATS and NATS Streaming servers and performs the below functionalities.

Basic Usage

Setting up the connection

First step is setting up the connection with the NATS Basic/Streaming server. The following ways can be used to connect to a NATS Basic/Streaming server.

  1. Connect to a server using the URL
nats:Connection connection = new("nats://localhost:4222");
  1. Connect to one or more servers with a custom configuration
nats:Connection connection = new("nats://serverone:4222, nats://servertwo:4222",  config);

Publishing messages

Publishing messages is handled differently in the NATS Basic server and Streaming server. The 'ballerina/nats' module provides different APIs to publish messages to each server.

Publishing messages to the NATS basic server

Once connected, publishing is accomplished via one of the below two methods.

  1. Publish with the subject and the message content.
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, "hello world");
  1. Publish as a request that expects a reply.
nats:Producer producer = new(connection);
nats:Message|nats:Error reqReply = producer->request(subject, "hello world", 5000);
  1. Publish messages with a replyTo subject
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, <@untainted>message, 
                         replyToSubject);
  1. Publish messages with a replyTo callback service
nats:Producer producer = new(connection);
nats:Error? result = producer->publish(subject, <@untainted>message, 
                         replyToService);
service replyToService =
@nats:SubscriptionConfig {
    subject: "replySubject"
}
service {

    resource function onMessage(nats:Message msg, string data) {
    }

    resource function onError(nats:Message msg, nats:Error err) {
    }
};
Publishing messages to a NATS streaming server

Once connected to a streaming server, publishing messages is accomplished using the following method.

nats:StreamingProducer producer = new(connection);
string|error result = producer->publish(subject, "hello world");

Publish api supports the byte[], boolean, string, int, float, decimal, xml, json, record {} message types.

Listening to incoming messages

The Ballerina NATS module provides the following mechanisms to listen to messages. Similar to message publishing, listening to messages is also handled differently in the NATS basic and streaming servers.

Listening to messages from a NATS server
// Initializes the NATS listener.
listener nats:Listener subscription = new(connection);

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:SubscriptionConfig {
    subject: "demo"
}
service demo on subscription {

    resource function onMessage(nats:Message msg, string data) {
    }

    resource function onError(nats:Message msg, nats:Error err) {
    }
}
Listening to messages from a Streaming server
// Initializes the NATS Streaming listener.
listener nats:StreamingListener subscription = new(conn, "test-cluster", "c1");

// Binds the consumer to listen to the messages published to the 'demo' subject.
@nats:StreamingSubscriptionConfig {
    subject: "demo"
}
service demo on subscription {

    resource function onMessage(nats:StreamingMessage msg, string data) {
    }

    resource function onError(nats:StreamingMessage msg, nats:Error err) {
    }

}

Advanced Usage

Using the TLS protocol

The Ballerina NATS module allows the use of the tls:// protocol in its URLs. This setting expects a secure socket to be set in the connection configuration as shown below.

nats:ConnectionConfig config = {
    secureSocket : {
        trustStore : {
            path: "nats-basic/keyStore.p12",
            password: "xxxxx"
        }
    }
};

// Initializes a connection.
nats:Connection connection = new("tls://localhost:4222", config = config);

Note: The default thread pool size used in Ballerina is the number of processors available * 2. You can configure the thread pool size by using the BALLERINA_MAX_POOL_SIZE environment variable.

For information on the operations, which you can perform with this module, see the below Functions.

For examples on the usage of the connector, see the following.

Records

ConnectionConfig

Configurations related to creating a NATS streaming subscription.

Detail

Holds the details of an error.

PendingLimits

The configurations to set limits on the maximum number of messages or maximum size of messages this consumer will hold before it starts to drop new messages waiting for the resource functions to drain the queue.

SecureSocket

Configurations related to facilitating a secure communication with a remote HTTP endpoint.

StreamingConfig

Configuration related to establishing a streaming connection.

StreamingSubscriptionConfigData

The configurations for the NATS streaming subscription.

SubscriptionConfigData

The configurations for the NATS basic subscription.

Objects

Connection

Represents a single network connection to the NATS server.

Message

Represents the message, which a NATS server sends to its subscribed services.

Clients

Producer

The producer provides the capability to publish messages to the NATS server.

StreamingMessage

Represents the message a NATS Streaming Server sends to its subscribed services.

StreamingProducer

The streaming producer provides the capability to publish messages to the NATS streaming server.

Listeners

Listener

Represents the NATS server connection to which a subscription service should be bound in order to receive messages of the corresponding subscription.

StreamingListener

Represents the NATS streaming server connection to which a subscription service should be bound in order to receive messages of the corresponding subscription.

Constants

DEFAULT_URL

Default URL for NATS connections.

NATS_ERROR

Represents the reason for the NATS module related errors.

NEW_ONLY

Specifies that message delivery should start with the messages, which are published after the subscription is created.

LAST_RECEIVED

Specifies that message delivery should start with the last (most recent) message stored for this subject.

FIRST

Specifies that message delivery should begin at the oldest available message for this subject.

TIME_DELTA_START

The key for the TimeDeltaStart type.

SEQUENCE_NUMBER

The key for the SequenceNumber type.

Annotations

StreamingSubscriptionConfig

The annotation, which is used to configure the streaming subscription.

SubscriptionConfig

The annotation, which is used to configure the basic subscription.

Types

Content

Data types supported when publishing and consuming messages.

SequenceNumber

Specifies that message delivery should start at the given sequence number.

StartPosition

Specifies the position to start receiving messages.

TimeDeltaStart

Specifies that message delivery should start with a given historical time delta (from now).

Errors

Error

Represents NATS module related errors.