import ballerina/log;
import ballerina/rabbitmq;

// Creates a ballerina RabbitMQ connection that allows re-usability if necessary.
rabbitmq:Connection connection = new ({host: "localhost", port: 5672});

listener rabbitmq:Listener channelListener = new (connection);

@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK
}
// Attaches the service to the listener.
service dataBindingConsumer on channelListener {

    // Gets triggered when a message is received by the queue.
    resource function onMessage(rabbitmq:Message message, string stringMessage) {
        // Retrieves the text content of the message.
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }

        // Message content can be accessed using the second parameter
        // of the resource function.
        log:printInfo("The message received from data binding: " + stringMessage);

        // Acknowledges a single message positively.
        var result = message->basicAck();
        if (result is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }

    // Gets triggered when an error is encountered.
    resource function onError(rabbitmq:Message message, error err) {
        log:printError("Error from connector: " + err.reason() + " - "
                                                  + <string>err.detail()?.message);
    }
}

Data Binding

In this example, the messages are consumed from an existing queue using the Ballerina RabbitMQ message listener. The messages can be accessed using the rabbitmq:Message object and also the second parameter of the resource function. string, json, xml, byte[], record, float and int are supported as parameter types.

import ballerina/log;
import ballerina/rabbitmq;
rabbitmq:Connection connection = new ({host: "localhost", port: 5672});

Creates a ballerina RabbitMQ connection that allows re-usability if necessary.

listener rabbitmq:Listener channelListener = new (connection);
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK
}
service dataBindingConsumer on channelListener {

Attaches the service to the listener.

    resource function onMessage(rabbitmq:Message message, string stringMessage) {

Gets triggered when a message is received by the queue.

        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }

Retrieves the text content of the message.

        log:printInfo("The message received from data binding: " + stringMessage);

Message content can be accessed using the second parameter of the resource function.

        var result = message->basicAck();
        if (result is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }

Acknowledges a single message positively.

    resource function onError(rabbitmq:Message message, error err) {
        log:printError("Error from connector: " + err.reason() + " - "
                                                  + <string>err.detail()?.message);
    }
}

Gets triggered when an error is encountered.

# Make sure to have the RabbitMQ broker running.
#
# Navigate to the directory that contains the
# 'rabbitmq_consumer_with_data_binding.bal' file, and issue the 'ballerina run' command as follows.
ballerina run rabbitmq_consumer_with_data_binding.bal
# The RabbitMQ consumer runs as a Ballerina service and listens to the subscribed queue.
2019-06-13 13:15:35,611 INFO  [] - The message received: Hello from Ballerina
2019-06-13 13:15:35,612 INFO  [] - The message received from data binding: Hello from Ballerina