import ballerina/log;
import ballerina/rabbitmq;

// Creates a ballerina RabbitMQ connection that allows re-usability if necessary.
rabbitmq:Connection connection = new ({host: "localhost", port: 5672});

listener rabbitmq:Listener channelListener = new (connection);

// The consumer service listens to the "MyQueue" queue.
// ackMode is by default rabbitmq:AUTO_ACK which will automatically acknowledge
// all messages once consumed.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK
}
// Attaches the service to the listener.
service rabbitmqConsumerAck on channelListener {

    // Gets triggered when a message is received by the queue.
    resource function onMessage(rabbitmq:Message message) {

        // Retrieves the text content of the message.
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }

        // Positively acknowledges a single message.
        var result = message->basicAck();
        if (result is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }
}

Client Acknowledgements

In this example, the messages are consumed from an existing queue using the Ballerina RabbitMQ message listener. The received messages are acknowledged manually. By default, the ackMode is rabbitmq:AUTO_ACK, which will automatically acknowledge all messages once consumed.

import ballerina/log;
import ballerina/rabbitmq;
rabbitmq:Connection connection = new ({host: "localhost", port: 5672});

Creates a ballerina RabbitMQ connection that allows re-usability if necessary.

listener rabbitmq:Listener channelListener = new (connection);
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK
}

The consumer service listens to the “MyQueue” queue. ackMode is by default rabbitmq:AUTO_ACK which will automatically acknowledge all messages once consumed.

service rabbitmqConsumerAck on channelListener {

Attaches the service to the listener.

    resource function onMessage(rabbitmq:Message message) {

Gets triggered when a message is received by the queue.

        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }

Retrieves the text content of the message.

        var result = message->basicAck();
        if (result is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }
}

Positively acknowledges a single message.

# Make sure to have the RabbitMQ broker running.
#
# Navigate to the directory that contains the
# 'rabbitmq_consumer_with_client_acknowledgement.bal' file and issue the 'ballerina run' command as follows.
ballerina run rabbitmq_consumer_with_client_acknowledgement.bal
# The RabbitMQ consumer runs as a Ballerina service and listens to the subscribed queue.
2019-03-29 12:11:18,194 INFO [] - The message received: Hello from Ballerina