Module: rabbitmq

Module overview

This module provides functionality for messaging with a RabbitMQ broker (AMQP 0-9-1). RabbitMQ is one of the most popular open-source enterprise messaging systems and is modeled on the Advanced Message Queuing Protocol (AMQP) standard.

Samples

RabbitMQ Producer

The following program publishes messages to a RabbitMQ server:

import ballerina/io;
import ballerina/rabbitmq;

public function main() {
    // Creates a Ballerina RabbitMQ connection, which can be reused across channels.
    rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

    // Creates multiple ballerina RabbitMQ channels.
    rabbitmq:Channel newChannel1 = new(connection);
    rabbitmq:Channel newChannel2 = new(connection);

    // Declares the queue, MyQueue1.
    var queueResult1 = newChannel1->queueDeclare({ queueName: "MyQueue1" });
    if (queueResult1 is rabbitmq:Error) {
        io:println("An error occurred while creating the MyQueue1 queue.");
    }

    // Declares the queue, MyQueue2.
    var queueResult2 = newChannel2->queueDeclare({ queueName: "MyQueue2" });
    if (queueResult2 is rabbitmq:Error) {
        io:println("An error occurred while creating the MyQueue2 queue.");
    }

    // Publishing messages to an exchange using a routing key.
    // Publishes the message using newChannel1 and the routing key named MyQueue1.
    worker w1 {
        var sendResult = newChannel1->basicPublish("Hello from Ballerina", "MyQueue1");
        if (sendResult is rabbitmq:Error) {
            io:println("An error occurred while sending the message to " +
                     "MyQueue1 using newChannel1.");
        }
    }

    // Publishing messages to the same routing key using a different channel.
    // Publishes the message using newChannel2 and the same routing key named MyQueue1.
    worker w2 {
        var sendResult = newChannel2->basicPublish("Hello from Ballerina", "MyQueue1");
        if (sendResult is rabbitmq:Error) {
            io:println("An error occurred while sending the message to " +
                    "MyQueue1 using newChannel2.");
        }
    }

    // Publishing messages to different routing keys using the same channel.
    // Publishes the message using newChannel1 to a different routing key named MyQueue2.
    worker w3 {
        var sendResult = newChannel1->basicPublish("Hello from Ballerina", "MyQueue2");
        if (sendResult is rabbitmq:Error) {
            io:println("An error occurred while sending the message to " +
                    "MyQueue2 using newChannel1.");
        }
    }
    _ = wait {w1, w2, w3};
}
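
The producer above publishes through the default exchange, where the routing key is simply the queue name. As a minimal sketch of publishing through a named exchange instead, the program below declares a direct exchange with the DIRECT_EXCHANGE constant, binds a queue to it, and publishes with a binding key. The names MyExchange, MyQueue3, and MyKey are hypothetical. The `exchangeDeclare` and `queueBind` calls, the `exchangeName` and `exchangeType` fields, and the third argument of `basicPublish` are assumptions about the module version in use and should be checked against the Channel and ExchangeConfiguration API docs.

```ballerina
import ballerina/io;
import ballerina/rabbitmq;

public function main() {
    rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });
    rabbitmq:Channel newChannel = new(connection);

    // Declares a direct exchange. "MyExchange" is a hypothetical name.
    var exchangeResult = newChannel->exchangeDeclare({
        exchangeName: "MyExchange",
        exchangeType: rabbitmq:DIRECT_EXCHANGE
    });
    if (exchangeResult is rabbitmq:Error) {
        io:println("An error occurred while declaring the MyExchange exchange.");
        return;
    }

    // Declares the queue that will receive the routed messages.
    var queueResult = newChannel->queueDeclare({ queueName: "MyQueue3" });
    if (queueResult is rabbitmq:Error) {
        io:println("An error occurred while creating the MyQueue3 queue.");
        return;
    }

    // Binds the queue to the exchange using the binding key "MyKey".
    var bindResult = newChannel->queueBind("MyQueue3", "MyExchange", "MyKey");
    if (bindResult is rabbitmq:Error) {
        io:println("An error occurred while binding MyQueue3 to MyExchange.");
        return;
    }

    // Publishes to the named exchange; the routing key matches the binding key,
    // so the direct exchange routes the message to MyQueue3.
    var sendResult = newChannel->basicPublish("Hello from Ballerina", "MyKey", "MyExchange");
    if (sendResult is rabbitmq:Error) {
        io:println("An error occurred while sending the message to MyExchange.");
    }
}
```

With a direct exchange, a message reaches a queue only when its routing key equals the binding key, which is why the same key is used for both the bind and the publish.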

RabbitMQ Subscriber

The following program consumes messages from a RabbitMQ server:

import ballerina/log;
import ballerina/rabbitmq;

// Creates a Ballerina RabbitMQ connection, which can be reused across listeners and channels.
rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

listener rabbitmq:Listener channelListener = new(connection);

// The consumer service listens to the "MyQueue" queue.
// The default `ackMode` is rabbitmq:AUTO_ACK, in which messages are acknowledged
// immediately after they are consumed.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    }
}
// Attaches the service to the listener.
service rabbitmqConsumer on channelListener {

    // Gets triggered when a message is received by the queue.
    resource function onMessage(rabbitmq:Message message) {

        // Retrieves the text content of the message.
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }
    }
}

Note: By default, Ballerina sizes its thread pool at twice the number of available processors. You can configure the thread pool size by using the BALLERINA_MAX_POOL_SIZE environment variable.
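
The subscriber above relies on the default rabbitmq:AUTO_ACK mode. As a hedged sketch of the alternative, the service below sets `ackMode` to rabbitmq:CLIENT_ACK and acknowledges each message explicitly once it has been processed. The service name rabbitmqAckConsumer is hypothetical. The placement of `ackMode` in the ServiceConfig annotation and the `message->basicAck()` remote call are assumptions about the module version in use and should be checked against the ServiceConfig and Message API docs.

```ballerina
import ballerina/log;
import ballerina/rabbitmq;

rabbitmq:Connection connection = new({ host: "localhost", port: 5672 });

listener rabbitmq:Listener channelListener = new(connection);

// Switches the service from the default AUTO_ACK mode to CLIENT_ACK, so the
// broker keeps each message until the service acknowledges it.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK
}
service rabbitmqAckConsumer on channelListener {

    resource function onMessage(rabbitmq:Message message) {
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }

        // Acknowledges the message explicitly after processing.
        var ackResult = message->basicAck();
        if (ackResult is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }
}
```

Client acknowledgement is the safer choice when losing a message on a crash is unacceptable, since an unacknowledged message can be redelivered by the broker.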

Records

BasicProperties

Holds other properties of the message, such as routing headers.

ConnectionConfiguration

Holds the parameters used to create a RabbitMQ `Connection`.

Detail

Record type to hold the details of an error.

ExchangeConfiguration

Holds the parameters used to declare an exchange.

QueueConfiguration

Holds the parameters used to declare a queue.

RabbitMQServiceConfig

Represents the list of parameters required to create a subscription.

SecureSocket

Provides configurations for facilitating secure connections.

Objects

Connection

Public Ballerina API - Interface to an AMQP connection.

Clients

Channel

Public Ballerina API - Ballerina interface to an AMQP channel, providing channel-related functionality.

Message

Public Ballerina API - Ballerina RabbitMQ Message.

Listeners

Listener

Public Ballerina API - Ballerina RabbitMQ message listener, used to consume messages from RabbitMQ.

Constants

DIRECT_EXCHANGE

Constant for the RabbitMQ Direct Exchange type.

FANOUT_EXCHANGE

Constant for the RabbitMQ Fanout Exchange type.

TOPIC_EXCHANGE

Constant for the RabbitMQ Topic Exchange type.

AUTO_ACK

Constant for the RabbitMQ auto acknowledgement mode.

CLIENT_ACK

Constant for the RabbitMQ client acknowledgement mode.

RABBITMQ_ERROR

Annotations

ServiceConfig

Service descriptor data generated at compile time.

Types

AcknowledgementMode

Types of acknowledgement modes supported by the Ballerina RabbitMQ Connector.

MessageContent

Holds the types of message content that can be published.

Errors

Error