import ballerinax/kafka;
import ballerina/io;

kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "group-id",
    offsetReset: "earliest",
    // Subscribes to the topic `test-kafka-topic`.
    topics: ["test-kafka-topic"]
};

public type Order record {|
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
|};

kafka:Consumer orderConsumer =
            check new (kafka:DEFAULT_URL, consumerConfiguration);

public function main() returns error? {
    // Polls the consumer for payloads with a timeout of 1 second and
    // binds each message value to the `Order` record type.
    Order[] orders = check orderConsumer->pollPayload(1);

    check from Order 'order in orders
        where 'order.isValid
        do {
            io:println('order.productName);
        };
}

Consumer client - poll payload

This example shows how to use a kafka:Consumer as a simple payload consumer for cases where the metadata of a message is not needed. The consumer uses the built-in byte array deserializer for the message value and converts that value to the user-defined type, here the Order record. To run the example, an active Kafka broker must be reachable at the address given by kafka:DEFAULT_URL.

For more information on the underlying module, see the Kafka module.
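
The conversion from a raw message value to the user-defined type can be illustrated without a broker. The following is a minimal sketch, not the Kafka module's actual implementation: it assumes the message values are UTF-8 encoded JSON, and the toOrder helper and the sample payloads are hypothetical names and data introduced only for illustration.

```ballerina
import ballerina/io;

public type Order record {|
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
|};

// Hypothetical helper (not part of the kafka module): decodes one raw
// message value as UTF-8 JSON text and binds it to the closed Order record.
function toOrder(byte[] rawValue) returns Order|error {
    string text = check string:fromBytes(rawValue);
    return text.fromJsonStringWithType(Order);
}

public function main() returns error? {
    // Made-up sample values standing in for what a broker would deliver.
    byte[][] rawValues = [
        "{\"orderId\": 1, \"productName\": \"Sport shoe\", \"price\": 27.5, \"isValid\": true}".toBytes(),
        "{\"orderId\": 2, \"productName\": \"Desk lamp\", \"price\": 12.0, \"isValid\": false}".toBytes()
    ];

    Order[] orders = [];
    foreach byte[] rawValue in rawValues {
        orders.push(check toOrder(rawValue));
    }

    // Same filter as the consumer example: print only the valid orders.
    check from Order 'order in orders
        where 'order.isValid
        do {
            io:println('order.productName);
        };
}
```

Because Order is a closed record, a payload with a missing or unexpected field makes the conversion return an error instead of a partially filled value. The same kind of failure should be expected from pollPayload when a message on the topic does not match the target type.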

With the Kafka broker running, run the example with the following command:

bal run kafka_consumer_client_poll_payload.bal