This API documentation is for Ballerina 1.2.0. View API documentation for the latest release.

Module : rabbitmq

Module Overview

RabbitMQ is one of the most popular open-source enterprise messaging systems modelled on the Advanced Message Queuing Protocol (AMQP) standard. This guide covers the Ballerina RabbitMQ module and its public API. This module contains the functionality to support messaging with the RabbitMQ broker (AMQP 0-9-1). It assumes that the most recent version of Ballerina is used and is built on top of the basics.

Key sections include:

Connecting to RabbitMQ

The core APIs are the rabbitmq:Connection, rabbitmq:Channel, and rabbitmq:Message representing an AMQP 0-9-1 Connection, a Channel, and a Ballerina RabbitMQ Message respectively. The following code connects to a RabbitMQ node using the given parameters (host name, port number, etc.):

   rabbitmq:Connection newConnection = new({ host: "localhost", 
                                            port: 5672, 
                                            username: “guest”, 
                                            password: “guest” });

The rabbitmq:Connection created above can then be used to open a rabbitmq:Channel.

   rabbitmq:Channel newChannel = new(connection);

The rabbitmq:Channel can now be used to send and receive messages as described in the subsequent sections.

Enabling TLS

It is possible to encrypt the communication between the Ballerina client and the broker by using TLS-enabled connections. Client and server authentication (peer verification) is also supported. To enable the TLS support in the RabbitMQ broker, the node has to be configured to know the location of the Certificate Authority bundle, the server's certificate file, and the server's key. A TLS listener should also be configured to know which port to listen for TLS-enabled client connections.

Connecting to a TLS-enabled RabbitMQ node using the Ballerina client can be done by passing a rabbitmq:SecureSocket record with the appropriate values to the rabbitmq:ConnectionConfiguration record when initializing the connection.

   rabbitmq:Connection connection = new({ host: "localhost", 
                                    	    port: 5671, 
                                            secureSocket: { trustStore: { path: "/path/to/trustStore",
                                                                        password: "rabbitstore" },
                                                          keystore: { path: "/path/to/client_key.p12",
                                                                      password: "MySecretPassword" },
                                                          verifyHostname: true }});

Disconnecting from RabbitMQ

To disconnect, simply close the open channels and the connections:

   rabbitmq:Error? closeResult = newChannel.close();
   rabbitmq:Error? closeResult = newConnection.close();

Note: Closing the Channel may be a good practice. However, it isn’t strictly necessary in this case as it will be done automatically when the underlying Connection is closed.

Using exchanges and queues

Client applications work with exchanges and queues, which are the high-level building blocks of the AMQP protocol. These must be declared before they can be used. The following code declares an exchange and a server-named queue and then binds them together.

   rabbitmq:Error? exchangeResult = newChannel->exchangeDeclare({ exchangeName: "MyExchange",
                                                      exchangeType: rabbitmq:DIRECT_EXCHANGE,
                                                      durable: true,
                                                      autoDelete: true });
   
   string|rabbitmq:Error? queueResult = newChannel->queueDeclare();
   if (queueResult is string) {
        rabbitmq:Error? bindResult = newChannel.queueBind(queueResult, "MyExchange", "routing-key");
   }

This sample code will declare,

Next, the above function is called to bind the queue to the exchange with the given routing key.

   rabbitmq:Error? exchangeResult = newChannel->exchangeDeclare({ exchangeName: "MyExchange",
                                                      exchangeType: rabbitmq:DIRECT_EXCHANGE,
                                                      durable: true,
                                                      autoDelete: true });
   
   string|rabbitmq:Error? queueResult = newChannel->queueDeclare({ queueName: "MyQueue", 
                                                durable: true,
                                                exclusive: false,
                                                autoDelete: false });

   rabbitmq:Error? bindResult = newChannel.queueBind("MyQueue", "MyExchange", "routing-key");

This sample code will declare,

Deleting entities and purging queues

   rabbitmq:Error? deleteResult = newChannel->queueDelete("MyQueue");
   rabbitmq:Error? deleteResult = newChannel->queueDelete("MyQueue", false, true);
   rabbitmq:Error? deleteResult = newChannel->queueDelete("MyQueue", true, false);
   rabbitmq:Error? deleteResult = newChannel->exchangeDelete("MyExchange");
   rabbitmq:Error? purgeResult = newChannel->queuePurge("MyQueue");

Publishing messages

To publish a message to an exchange, use the basicPublish() function as follows:

   rabbitmq:Error? sendResult = newChannel->basicPublish("Hello from Ballerina", "MyQueue");

Setting other properties of the message such as routing headers can be done by using the BasicProperties record with the appropriate values.

Consuming messages using consumer services

The most efficient way to receive messages is to set up a subscription using a Ballerina RabbitMQ rabbitmq:Listener and any number of consumer services. The messages will then be delivered automatically as they arrive rather than having to be explicitly requested.

Multiple consumer services can be bound to one Ballerina RabbitMQ rabbitmq:Listener. The queue to which the service is listening is configured in the rabbitmq:ServiceConfig annotation of the service.

listener rabbitmq:Listener channelListener= new(newConnection);

@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    }
}
service rabbitmqConsumer on channelListener {
    resource function onMessage(rabbitmq:Message message) {
        string|rabbitmq:Error? messageContent = message.getTextContent();
    }
}

The rabbitmq:Message object received can be used to retrieve its contents and for manual client acknowledgements.

Client acknowledgements

The message consuming is supported by mainly two types of acknowledgement modes, which are auto acknowledgements and client acknowledgements. Client acknowledgements can further be divided into to two different types as positive and negative acknowledgements. The default acknowledgement mode is auto-ack (messages are acknowledged immediately after consuming).

WARNING: To ensure the reliability of receiving messages, use the client-ack mode.

The negatively-acknowledged (rejected) messages can be re-queued.

Note: The default thread pool size used in Ballerina is the number of processors available * 2. You can configure the thread pool size by using the BALLERINA_MAX_POOL_SIZE environment variable.

For information on the operations, which you can perform with this module, see the below Functions.

For examples on the usage of the connector, see the following.

Records

BasicProperties

Basic properties of the message - routing headers etc.

ConnectionConfiguration

Configurations used to create a rabbitmq:Connection.

Detail

The details of an error.

ExchangeConfiguration

Configurations used to declare an exchange.

QueueConfiguration

Configurations used to declare a queue.

RabbitMQServiceConfig

Configurations required to create a subscription.

SecureSocket

Configurations for facilitating secure connections.

Objects

Connection

Represents a single network connection to the RabbitMQ broker.

Clients

Channel

The Ballerina interface to provide AMQP Channel related functionality.

Message

Provides the functionality to manipulate the messages received by the consumer services.

Listeners

Listener

Ballerina RabbitMQ Message Listener.

Constants

DIRECT_EXCHANGE

Constant for the RabbitMQ Direct Exchange type.

FANOUT_EXCHANGE

Constant for the RabbitMQ Fan-out Exchange type.

TOPIC_EXCHANGE

Constant for the RabbitMQ Topic Exchange type.

AUTO_ACK

Constant for the RabbitMQ auto acknowledgement mode.

CLIENT_ACK

Constant for the RabbitMQ client acknowledgement mode.

RABBITMQ_ERROR

Represents the reason for the RabbitMQ module related errors.

Annotations

ServiceConfig

The annotation, which is used to configure the subscription.

Types

AcknowledgementMode

Types of acknowledgement modes supported by the Ballerina RabbitMQ Connector.

MessageContent

Types of message content that can be published.

Errors

Error

Represents the RabbitMQ module related errors.