import ballerinax/kafka;
import ballerina/io;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    offsetReset: "earliest",
    // Subscribes to the topic `test-kafka-topic`.
    topics: ["test-kafka-topic"]
};

public type Order record {|
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
|};

// Creates a subtype of `kafka:AnydataConsumerRecord` whose `value` is bound to `Order`.
public type OrderConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    Order value;
|};

kafka:Consumer orderConsumer =
        check new (kafka:DEFAULT_URL, consumerConfiguration);

public function main() returns error? {
    // Polls the consumer for order records, waiting up to 1 second.
    OrderConsumerRecord[] records = check orderConsumer->poll(1);

    check from OrderConsumerRecord orderRecord in records
        where orderRecord.value.isValid
        do {
            io:println(orderRecord.value.productName);
        };
}

Consumer client - poll consumer record

This example shows how to use a kafka:Consumer as a simple record consumer. Records from a subscribed topic are retrieved with the poll() function. The consumer uses the built-in byte array deserializer for both the key and the value, which is the default deserializer in the kafka:Consumer. The received records are then converted to the user-defined type through data binding. Running the example requires an active Kafka broker reachable at the address in kafka:DEFAULT_URL.
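
When only the payloads are needed, the same data binding can be applied to the record values alone. The following is a minimal sketch, not part of the original example. It assumes the installed ballerinax/kafka version provides the pollPayload() function, that a broker is reachable at kafka:DEFAULT_URL, and that test-kafka-topic carries messages matching the Order record. The variable name receivedOrder is illustrative.

```ballerina
import ballerina/io;
import ballerinax/kafka;

// Same shape as the `Order` record used in the example.
public type Order record {|
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
|};

public function main() returns error? {
    // Assumes a broker is running at `kafka:DEFAULT_URL`.
    kafka:Consumer orderConsumer = check new (kafka:DEFAULT_URL, {
        groupId: "group-id",
        offsetReset: "earliest",
        topics: ["test-kafka-topic"]
    });

    // Waits up to 1 second and binds each record value directly to `Order`.
    Order[] orders = check orderConsumer->pollPayload(1);

    foreach Order receivedOrder in orders {
        if receivedOrder.isValid {
            io:println(receivedOrder.productName);
        }
    }

    // Closes the consumer's connection to the broker.
    check orderConsumer->close();
}
```

Compared with poll(), this form drops the key, timestamp, and offset metadata, so poll() with a kafka:AnydataConsumerRecord subtype remains the better fit when those fields matter.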

For more information on the underlying module, see the Kafka module.

Run the program by executing the following command.

bal run kafka_consumer_client_poll_consumer_record.bal