import ballerinax/kafka;
import ballerina/log;

kafka:ConsumerConfiguration consumerConfigs = {
    // Uses two concurrent consumers to work as a group.
    concurrentConsumers: 2,

    // Identifies the consumer group that these consumers join.
    groupId: "group-id",
    // Subscribes to the `test-kafka-topic`.
    topics: ["test-kafka-topic"],

    // Polls the broker for new records every second.
    pollingInterval: 1
};

listener kafka:Listener kafkaListener =
            new (kafka:DEFAULT_URL, consumerConfigs);

service kafka:Service on kafkaListener {
    // This remote function executes when one or more messages are published
    // to the subscribed topic or topics.
    remote function onConsumerRecord(kafka:Caller caller,
                        kafka:ConsumerRecord[] records) returns error? {
        // The Kafka records received by the service are processed one
        // by one.
        foreach var kafkaRecord in records {
            check processKafkaRecord(kafkaRecord);
        }

    }
}

// Converts the value of a single Kafka record to a string and logs it.
function processKafkaRecord(kafka:ConsumerRecord kafkaRecord) returns error? {
    byte[] messageContent = kafkaRecord.value;
    // Converts the `byte[]` to a `string`.
    string message = check string:fromBytes(messageContent);

    // Prints the retrieved message.
    log:printInfo("Received Message: " + message);
}

Consumer Groups

This example shows a Kafka consumer used as a listener on one or more topics. The listener runs a group of concurrent consumers within the service. Record values arrive as byte arrays, and the service converts each one to a string before logging it. The example requires a running Kafka broker that is reachable at localhost:9092, the address that kafka:DEFAULT_URL refers to.

For more information on the underlying module, see the Kafka module.

bal run kafka_consumer_group_service.bal
[ballerinax/kafka] kafka servers: localhost:9092
[ballerinax/kafka] subscribed topics: test-kafka-topic
[ballerinax/kafka] started kafka listener