import ballerina/log;
import ballerina/rabbitmq;

// Creates a Ballerina RabbitMQ connection, which can be reused if necessary.
rabbitmq:Connection connection = new ({host: "localhost", port: 5672});

listener rabbitmq:Listener channelListener = new (connection);

// The consumer service listens to the "MyQueue" queue.
// Quality of service settings (prefetchCount and prefetchSize) can be
// set globally at the listener initialization or per consumer service.
// These settings limit the amount of data the server will deliver
// to consumers before requiring acknowledgements.
// Thus, they provide a means of consumer-initiated flow control.
@rabbitmq:ServiceConfig {
    queueConfig: {
        queueName: "MyQueue"
    },
    ackMode: rabbitmq:CLIENT_ACK,
    prefetchCount: 10
}
// Attaches the service to the listener.
service QosConsumer on channelListener {

    // Gets triggered when the consumer receives a message from the queue.
    resource function onMessage(rabbitmq:Message message) {
        var messageContent = message.getTextContent();
        if (messageContent is string) {
            log:printInfo("The message received: " + messageContent);
        } else {
            log:printError("Error occurred while retrieving the message content.");
        }
        // Acknowledges the message. The server keeps at most 10 (prefetchCount)
        // unacknowledged messages outstanding for this consumer, so each
        // acknowledgement allows the server to deliver a further message.
        var result = message->basicAck();
        if (result is error) {
            log:printError("Error occurred while acknowledging the message.");
        }
    }
}

QoS Settings

In this example, messages are consumed from an existing queue using the Ballerina RabbitMQ message listener, which requests specific “quality of service” (QoS) settings. These settings (i.e., prefetchCount and prefetchSize) limit the amount of data the server will deliver to a consumer before requiring acknowledgements. Thus, they provide a means of consumer-initiated flow control.

prefetchSize is the maximum amount of content (measured in octets) that the server will deliver. prefetchCount is the maximum number of messages that the server will deliver. If either setting is given as 0, it is treated as unlimited.

If the settings are specified at the listener initialization, they are applied to the entire connection as a global setting. To apply the settings per consumer service, specify them in the service config annotation.
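
The example on this page sets prefetchCount per consumer service through the service config annotation. For comparison, the fragment below is a minimal sketch of the listener-level (global) variant. It assumes that the listener initializer in your Ballerina version accepts optional prefetchCount and prefetchSize arguments after the connection; confirm this against the API documentation of the rabbitmq module before relying on it. The names sharedConnection and globalQosListener are hypothetical and used only for illustration.

```ballerina
import ballerina/rabbitmq;

// Hypothetical name: a connection that can be shared by several listeners.
rabbitmq:Connection sharedConnection = new ({host: "localhost", port: 5672});

// Assumption: the listener initializer takes an optional `prefetchCount`
// argument. Set here, the limit of 10 unacknowledged messages applies
// globally rather than to a single consumer service.
listener rabbitmq:Listener globalQosListener = new (sharedConnection, prefetchCount = 10);
```

A service attached to such a listener would then omit prefetchCount from its service config annotation and use the listener-level value.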

# Make sure to have the RabbitMQ broker running.
#
# Navigate to the directory that contains the
# 'rabbitmq_consumer_with_qos_settings.bal' file and issue the 'ballerina run' command as follows.
ballerina run rabbitmq_consumer_with_qos_settings.bal
# The RabbitMQ consumer runs as a Ballerina service and listens to the subscribed queue.
2019-03-29 12:11:18,194 INFO [] - The message received: Hello from Ballerina