import ballerinax/kafka;
import ballerina/io;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    offsetReset: "earliest",
    // Subscribes to the topic `test-kafka-topic`.
    topics: ["test-kafka-topic"]
};

kafka:Consumer consumer = check new (kafka:DEFAULT_URL, consumerConfiguration);

public function main() returns error? {
    // Polls the consumer for messages.
    kafka:ConsumerRecord[] records = check consumer->poll(1);

    foreach var kafkaRecord in records {
        byte[] messageContent = kafkaRecord.value;
        // Converts the `byte[]` to a `string`.
        string message = check string:fromBytes(messageContent);

        // Prints the retrieved Kafka record.
        io:println("Received Message: " + message);
    }
}

Consumer Client

This example shows how to use a kafka:Consumer as a simple record consumer. Records from a subscribed topic are retrieved with the poll() function. The consumer uses the built-in byte array deserializer for both the key and the value, which is the default deserializer in kafka:Consumer. To run the example, a Kafka broker must be running and reachable at kafka:DEFAULT_URL.
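
Because the default deserializer hands each value over as a byte[], the decoding step can be tried on its own without a broker. The following is a minimal sketch under that assumption: the sample payload "Hello from Kafka" is made up for illustration and stands in for kafkaRecord.value.

```ballerina
import ballerina/io;

public function main() returns error? {
    // Hypothetical payload standing in for `kafkaRecord.value`;
    // a real record's bytes would come from `consumer->poll()`.
    byte[] messageContent = "Hello from Kafka".toBytes();

    // `string:fromBytes` returns `string|error`; `check` propagates the
    // error if the bytes are not valid UTF-8.
    string message = check string:fromBytes(messageContent);

    io:println("Received Message: " + message);
}
```

If a producer writes values that are not UTF-8 text (for example, a binary encoding), string:fromBytes may return an error, so the check in the consumer loop ends main at the first such record.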

For more information on the underlying module, see the Kafka module.

bal run kafka_consumer_client.bal
[ballerinax/kafka] kafka servers: localhost:9092
[ballerinax/kafka] subscribed topics: test-kafka-topic