Specification: Ballerina java.jms Library

Owners: @shafreenAnfar @ayeshLK
Reviewers: @shafreenAnfar
Created: 2023/08/15
Updated: 2024/02/02
Edition: Swan Lake

Introduction

This is the specification for the java.jms standard library of Ballerina language, which provides the functionality to send and receive messages by connecting to a JMS provider.

The java.jms library specification has evolved and may continue to evolve in the future. The released versions of the specification can be found under the relevant GitHub tag.

If you have any feedback or suggestions about the library, start a discussion via a GitHub issue or in the Discord server. Based on the outcome of the discussion, the specification and implementation can be updated. Community feedback is always welcome. Any accepted proposal which affects the specification is stored under /docs/proposals. Proposals under discussion can be found with the label type/proposal in Github.

The conforming implementation of the specification is released to Ballerina Central. Any deviation from the specification is considered a bug.

Content

  1. Overview
  2. Connection
  3. Session
  4. Message
  5. Message producer
  6. Message consumer
  7. Message listener

1. Overview

Java Message Service (JMS) is a Java-based API that provides a standardized way for applications to create, send, receive, and consume messages in a loosely coupled, reliable, and asynchronous manner. JMS is part of the Java EE (Enterprise Edition) specification and is commonly used for building distributed, decoupled, and scalable applications that need to exchange information or events between different components. This specification elaborates on the usage of JMS connection, session, message producer, and message consumer. These clients allow the writing of distributed applications and microservices that read, write, and process messages using a JMS provider.

Ballerina java.jms provides several core apis:

  • jms:Connection - represents a communication link between JMS client and JMS provider.
  • jms:Session - represents a single-threaded context for producing and consuming messages.
  • jms:MessageProducer - used to send messages to a specific destination (queue or a topic) within the JMS session.
  • jms:MessageConsumer - used to receive messages from a specific destination (queue or a topic) within the JMS session.
  • jms:Listener - used to receive messages from a specific destination (queue or a topic) in an asynchronous manner.

2. Connection

A JMS connection represents a communication link between a JMS client and a JMS provider (typically a messaging broker). It's established using a JMS connection factory. The connection manages underlying resources, such as network connections, and provides a context for creating JMS sessions.

2.1. Configuration

When initializing a jms:Connection, the following configurations can be provided.

# Configurations related to a JMS connection.
#
# + initialContextFactory - JMS provider-specific inital context factory
# + providerUrl - JMS provider specific provider URL used to configure a connection
# + connectionFactoryName - JMS connection factory to be used in creating JMS connections
# + username - Username for the JMS connection
# + password - Password for the JMS connection
# + properties - Additional properties use in initializing the initial context
public type ConnectionConfiguration record {|
    string initialContextFactory;
    string providerUrl;
    string connectionFactoryName = "ConnectionFactory";
    string username?;
    string password?;
    map<string> properties = {};
|};

2.2. Initialization

The jms:Connection can be initialized by providing the jms:ConnectionConfiguration.

# Initialize and starts a JMS connection.
# ```
# jms:Connection connection = check new (
#   initialContextFactory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
#   providerUrl = "tcp://localhost:61616"
# );
# ```
#
# + connectionConfig - The configurations to be used when initializing the JMS connection
# + return - The `jms:Connection` or an `jms:Error` if the initialization failed
public isolated function init(*jms:ConnectionConfiguration connectionConfig) returns jms:Error?;

2.3. Functions

To start (or restart) a connection's delivery of incoming messages for a connection, the 'start function can be used.

# Starts (or restarts) a connection's delivery of incoming messages.
# A call to start on a connection that has already been started is ignored.
# ```
# check connection->'start()
# ```
#
# + return - A `jms:Error` if there is an error while starting the connection
isolated remote function 'start() returns jms:Error?;

To temporarily stop the connection's delivery of incoming messages for a connection, the stop function can be used.

# Temporarily stops a connection's delivery of incoming messages.
# Delivery can be restarted using the connection's start method.
# ```
# check connection->stop();
# ```
#
# + return - A `jms:Error` if there is an error while stopping the connection
isolated remote function stop() returns jms:Error?;

To close the JMS connection, the close function can be used.

# Closes the connection.
# ```
# check connection->close();
# ```
#
# + return - A `jms:Error` if threre is an error while closing the connection
isolated remote function close() returns jms:Error?;

To initialize a new jms:Session, the createSession function can be used.

# Defines the JMS session acknowledgement modes.
public enum AcknowledgementMode {
    # Indicates that the session will use a local transaction which may subsequently 
    # be committed or rolled back by calling the session's `commit` or `rollback` methods. 
    SESSION_TRANSACTED = "SESSION_TRANSACTED",
    # Indicates that the session automatically acknowledges a client's receipt of a message 
    # either when the session has successfully returned from a call to `receive` or when 
    # the message listener the session has called to process the message successfully returns.
    AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE",
    # Indicates that the client acknowledges a consumed message by calling the 
    # MessageConsumer's or Caller's `acknowledge` method. Acknowledging a consumed message 
    # acknowledges all messages that the session has consumed.
    CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE",
    # Indicates that the session lazily acknowledges the delivery of messages. 
    # This is likely to result in the delivery of some duplicate messages if the JMS provider fails, 
    # so it should only be used by consumers that can tolerate duplicate messages. 
    # Use of this mode can reduce session overhead by minimizing the work the session does to prevent duplicates.
    DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE"
}

# Create a Session object, specifying transacted and acknowledgeMode.
# ```
# jms:Session session = check connection->createSession();
# ```
#
# + ackMode - Configuration indicating how messages received by the session will be acknowledged
# + return - Returns the Session or an error if it fails.
isolated remote function createSession(AcknowledgementMode ackMode = AUTO_ACKNOWLEDGE) returns jms:Session|jms:Error;

3. Session

A JMS session is a single-threaded context for producing and consuming messages. It's created from a JMS connection and provides the environment in which messages are sent and received. Sessions can be transactional or non-transactional. In a transactional session, multiple messages can be sent or received as part of a single transaction, and the transaction is committed or rolled back at the end. Non-transactional sessions handle messages one at a time without involving transactions.

3.1. Configuration

jms:Destination record corresponds to a JMS destination.

# Represent the JMS destination.
#
# + 'type - JMS destination types  
# + name - Name of the destination
public type Destination readonly & record {|
    DestinationType 'type;
    string name?;
|};

# Defines the supported JMS destination types.
public enum DestinationType {
    # Represents JMS Queue
    QUEUE = "QUEUE", 
    # Represents JMS Temporary Queue
    TEMPORARY_QUEUE = "TEMPORARY_QUEUE", 
    # Represents JMS Topic
    TOPIC = "TOPIC", 
    # Represents JMS Temporary Topic
    TEMPORARY_TOPIC = "TEMPORARY_TOPIC"
}

jms:ConsumerOptions record corresponds to the configurations related to a JMS message consumer.

# Defines the supported JMS message consumer types.
public enum ConsumerType {
    # Represents JMS durable subscriber
    DURABLE = "DURABLE", 
    # Represents JMS shared consumer
    SHARED = "SHARED", 
    # Represents JMS shared durable subscriber
    SHARED_DURABLE = "SHARED_DURABLE", 
    # Represents JMS default consumer
    DEFAULT = "DEFAULT"
}

# Message consumer listener configurations.
#
# + type - Message consumer type
# + destination - Name of the JMS destination
# + messageSelector - only messages with properties matching the message selector expression are added to the durable subscription. 
#                     An empty string indicates that there is no message selector for the durable subscription.
# + noLocal - if true then any messages published to the topic using this session's connection, or any other connection 
#             with the same client identifier, will not be added to the durable subscription.
# + subscriberName - the name used to identify the subscription 
public type ConsumerOptions record {|
    jms:ConsumerType 'type = DEFAULT;
    jms:Destination destination;
    string messageSelector = "";
    boolean noLocal = false;
    string subscriberName?;
|};

3.2. Functions

To unsubscribe a durable subscription that has been created by the JMS session, unsubscribe function can be used.

# Unsubscribe a durable subscription that has been created by this session.
# It is erroneous for a client to delete a durable subscription while there is an active (not closed) consumer
# for the subscription, or while a consumed message is part of a pending transaction or has not been
# acknowledged in the session.
# ```
# check session->unsubscribe("subscription-1");
# ```
#
# + subscriptionId - The name, which is used to identify the subscription
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function unsubscribe(string subscriptionId) returns jms:Error?;

To create a new jms:MessageProducer using the JMS session, createProducer function can be used.

# Creates a MessageProducer to send messages to the specified destination.
# ```
# jms:MessageProducer producer = check session.createProducer({
#   'type: jms:QUEUE,
#   name: "test-queue"
# });
# ```
#
# + destination - The Destination to send to, or nil if this is a producer which does not have a specified destination
# + return - Returns `jms:MessageProducer` or `jms:Error` if there is an error
public isolated function createProducer(jms:Destination? destination = ()) returns jms:MessageProducer|jms:Error;

To create a new jms:MessageConsumer using the JMS session, createConsumer function can be used.

# Creates a MessageConsumer for the specified destination.
# ```
# jms:MessageConsumer consumer = check session.createConsumer(destination = {
#   'type: jms:QUEUE,
#   name: "test-queue"
# });
# ```
#
# + consumerOptions - The relevant consumer configurations
# + return - Returns a `jms:MessageConsumer` or else `jms:Error` if there is an error
public isolated function createConsumer(*jms:ConsumerOptions consumerOptions) returns jms:MessageConsumer|jms:Error;

To commit all the messages sent/received in this transaction and release any locks currently held, 'commit function can be used.

# Commits all messages sent/received in this transaction and releases any locks currently held.
# ```
# check session->'commit();
# ```
# 
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function 'commit() returns jms:Error?;

To rollback all the messages sent/received in this transaction and release any locks currently held, 'rollback function can be used.

# Rolls back any messages sent/received in this transaction and releases any locks currently held.
# ```
# check session->'rollback();
# ```
# 
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function 'rollback() returns jms:Error?;

To close the current JMS session, close function can be used.

# Closes the current JMS session.
# ```ballerina
# check session->close();
# ```
# 
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function close() returns jms:Error?;

4. Message

A JMS message is a unit of data representing information that is exchanged between components using the Java Message Service (JMS) API. Messages can contain various types of data, such as text, binary data, maps, and objects. Ballerina java.jms library supports messages of type text, binary data, and maps.

# Represent the JMS Message used to send and receive content from the a JMS provider.
#
# + messageId - Unique identifier for a JMS message  
# + timestamp - Time a message was handed off to a provider to be sent 
# + correlationId - Id which can be used to correlate multiple messages 
# + replyTo - JMS destination to which a reply to this message should be sent
# + destination - JMS destination of this message 
# + deliveryMode - Delivery mode of this message  
# + redelivered - Indication of whether this message is being redelivered
# + jmsType - Message type identifier supplied by the client when the message was sent  
# + expiration - Message expiration time  
# + deliveredTime - The earliest time when a JMS provider may deliver the message to a consumer  
# + priority - Message priority level  
# + properties - Additional message properties
public type Message record {
    string messageId?;
    int timestamp?;
    string correlationId?;
    jms:Destination replyTo?;
    jms:Destination destination?;
    int deliveryMode?;
    boolean redelivered?;
    string jmsType?;
    int expiration?;
    int deliveredTime?;
    int priority?;
    map<anydata> properties?;
};

# Represent the JMS Text Message.
# 
# + content - Message content  
public type TextMessage record {|
    *jms:Message;
    string content;
|};

# Represent the JMS Map Message.
# 
# + content - Message content 
public type MapMessage record {|
    *jms:Message;
    map<anydata> content;
|};

# Represent the JMS Bytes Message.
# 
# + content - Message content 
public type BytesMessage record {|
    *jms:Message;
    byte[] content;
|};

5. Message producer

A JMS message producer is responsible for sending messages to a specific destination (queue or topic) within a JMS session.

5.1. Functions

To send a message to the pre-configured default destination of the JMS message producer, send function can be used.

# Sends a message to the JMS provider.
# ```
# check producer->send(message);
# ```
#
# + message - Message to be sent to the JMS provider
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function send(jms:Message message) returns jms:Error?;

To send a message to a given destination, sendTo function can be used.

# Sends a message to a given destination of the JMS provider.
# ```
# check producer->sendTo({ 'type: QUEUE, name: "test-queue" }, message);
# ```
#
# + destination - Destination used for the message sender
# + message - Message to be sent to the JMS provider
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function sendTo(jms:Destination destination, jms:Message message) returns jms:Error?;

To close the message producer, close function can be used.

# Closes the message producer.
# ``` 
# check producer->close();
# ```
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function close() returns jms:Error?;

6. Message consumer

A JMS message consumer is used to receive messages from a specific destination (queue or topic) within a JMS session. It's created from a JMS session and provides methods to receive messages synchronously.

6.1. Functions

To receive the next message arriving within the specified timeout interval, receive function can be used.

# Receives the next message that arrives within the specified timeout interval.
# ```
# jms:Message message = check consumer->receive(10000);
# ```
#
# + timeoutMillis - Message receive timeout
# + return - A `jms:JmsMessage` if there is a new message, `()` if there is no new message, 
# or else a `jsm:Error` if there is an error in the execution
isolated remote function receive(int timeoutMillis = 10000) returns jms:Message|jms:Error?;

To receive the next message if one is immediately available, receiveNoWait function can be used.

# Receives the next message if one is immediately available.
# ```
# jms:Message? message = check consumer->receiveNoWait();
# ```
# 
# + return - A `jms:JmsMessage` if there is a new message, `()` if there is no any new message, 
# or else a `jsm:Error` if there is an error in the execution
isolated remote function receiveNoWait() returns jms:Message|jms:Error?;

To mark a JMS message as received, the acknowledge function can be used.

# Mark a JMS message as received.
# ```
# check consumer->acknowledge(message);
# ```
#
# + message - JMS message record
# + return - A `jms:Error` if there is an error in the execution or else `()`
isolated remote function acknowledge(jms:Message message) returns jms:Error?;

To close the JMS consumer, close function can be used.

# Closes the message consumer.
# ```
# check consumer->close();
# ```
#
# + return - A `jms:Error` if there is an error or else `()`
isolated remote function close() returns jms:Error?;

7. Message listener

A JMS message listener is a mechanism for asynchronously receiving messages using event-driven programming. Instead of actively polling for messages, you can register a message listener. Upon the arrival of a message at the subscribed destination of the message listener, the callback method of the registered listener is triggered.

7.1. Configuration

When initializing a jms:Listener, the following configurations can be provided.

# Message listener configurations.
#
# + connectionConfig - Configurations related to the broker connection  
# + acknowledgementMode - Configuration indicating how messages received by the session will be acknowledged
# + consumerOptions - Underlying JMS message consumer configurations
public type MessageListenerConfigurations record {|
    ConnectionConfiguration connectionConfig;
    AcknowledgementMode acknowledgementMode = AUTO_ACKNOWLEDGE;
    ConsumerOptions consumerOptions;
|};

7.2. Initialization

The jms:Listener can be initialized by providing the jms:MessageListenerConfigurations.

# Creates a new `jms:Listener`.
# ```ballerina
# listener jms:Listener messageListener = check new(
#   connectionConfig = {
#       initialContextFactory: "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
#       providerUrl: "tcp://localhost:61616"
#   },
#   consumerOptions = {
#       destination: {
#           'type: jms:QUEUE,
#           name: "test-queue"
#       }
#   }
# );
# ```
# 
# + consumerConfig - Underlying JMS consumer configurations
# + return - The relevant JMS consumer or a `jms:Error` if there is any error
public isolated function init(*jms:MessageListenerConfigurations listenerConfig) returns jms:Error?;

7.3. Functions

To attach a service to the listener, attach function can be used.

# Attaches a message consumer service to a listener.
# ```
# check messageListener.attach(jmsService);
# ```
# 
# + 'service - The service instance
# + name - Name of the service
# + return - A `jms:Error` if there is an error or else `()`
public isolated function attach(jms:Service 'service, string[]|string? name = ()) returns jms:Error?;

To detach a service from the listener, detach function can be used.

# Detaches a message consumer service from the the listener.
# ```
# check messageListener.detach(jmsService);
# ```
#
# + 'service - The service to be detached
# + return - A `jms:Error` if there is an error or else `()`
public isolated function detach(jms:Service 'service) returns jms:Error?;

To start the listener, 'start function can be used.

# Starts the endpoint.
# ```
# check messageListener.'start();
# ```
#
# + return - A `jms:Error` if there is an error or else `()`
public isolated function 'start() returns jms:Error?;

To stop the listener gracefully, gracefulStop function can be used.

# Stops the JMS listener gracefully.
# ```
# check messageListener.gracefulStop();
# ```
#
# + return - A `jms:Error` if there is an error or else `()`
public isolated function gracefulStop() returns jms:Error?;

To stop the listener immediately, immediateStop function can be used.

# Stops the JMS listener immediately.
# ```
# check messageListener.immediateStop();
# ```
#
# + return - A `jms:Error` if there is an error or else `()`
public isolated function immediateStop() returns jms:Error?;

7.4. Caller

A jms:Caller can be used to mark JMS message as received. The caller is used in association with the jms:Listener.

7.4.1. Functions

To mark a JMS message as received, acknowledge function can be used.

# Mark a JMS message as received.
# ```
# check caller->acknowledge(message);
# ```
#
# + message - JMS message record
# + return - A `jms:Error` if there is an error in the execution or else `()`
isolated remote function acknowledge(jms:Message message) returns jms:Error?;

7.5. Usage

After initializing the jms:Listener a jms:Service must be attached to it.

service jms:Service "consumer-service" on messageListener {
    remote function onMessage(jms:Caller caller, jms:Message message) returns error? {
        // process results
    }
}