import ballerina/log;
import ballerinax/rabbitmq;

// The consumer service listens to the "MyQueue" queue.
@rabbitmq:ServiceConfig {
    queueName: "MyQueue",
    autoAck: false
}
// Attaches the service to the listener.
service /transactionConsumer on
    new rabbitmq:Listener(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT) {

    // Gets triggered when a message is received by the queue.
    remote function onMessage(rabbitmq:Message message,
                                rabbitmq:Caller caller) {

        string|error messageContent = 'string:fromBytes(message.content);
        if messageContent is string {
            log:printInfo("The message received: " + messageContent);
        }

        // Acknowledges a single message positively.
        // The acknowledgement gets committed upon successful execution of the transaction,
        // or will rollback otherwise.
        transaction {
            rabbitmq:Error? result = caller->basicAck();
            if (result is error) {
                log:printError(
                            "Error occurred while acknowledging the message.");
            }
            error? res = commit;
        }
    }
}

SASL Authentication - Consumer

In this example, the messages are consumed from an existing queue using the Ballerina RabbitMQ message listener and Ballerina transactions. Upon successful execution of the transaction block, the acknowledgement will commit or rollback in the case of any error. Messages will not be re-queued in the case of a rollback automatically unless negatively acknowledged by the user. For more information on the underlying module, see the RabbitMQ module.

import ballerina/log;
import ballerinax/rabbitmq;
@rabbitmq:ServiceConfig {
    queueName: "MyQueue",
    autoAck: false
}

The consumer service listens to the “MyQueue” queue.

service /transactionConsumer on
    new rabbitmq:Listener(rabbitmq:DEFAULT_HOST, rabbitmq:DEFAULT_PORT) {

Attaches the service to the listener.

    remote function onMessage(rabbitmq:Message message,
                                rabbitmq:Caller caller) {

Gets triggered when a message is received by the queue.

        string|error messageContent = 'string:fromBytes(message.content);
        if messageContent is string {
            log:printInfo("The message received: " + messageContent);
        }
        transaction {
            rabbitmq:Error? result = caller->basicAck();
            if (result is error) {
                log:printError(
                            "Error occurred while acknowledging the message.");
            }
            error? res = commit;
        }
    }
}

Acknowledges a single message positively. The acknowledgement gets committed upon successful execution of the transaction, or will rollback otherwise.

bal run rabbitmq_transaction_consumer.bal
[ballerina/http] started HTTP/WS listener 192.168.1.2:51642
[ballerinax/rabbitmq] Consumer service started for queue MyQueue
time = 2021-01-18 15:15:36,514 level = INFO  module = "" message = "The message received: Hello from Ballerina"