import ballerinax/kafka;
import ballerina/log;

kafka:ConsumerConfiguration consumerConfigs = {
    groupId: "group-id",
    // Subscribes to the topic `test-kafka-topic`.
    topics: ["test-kafka-topic"],

    pollingInterval: 1
};

public type Order record {|
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
|};

// Create a subtype of `kafka:AnydataConsumerRecord`
public type OrderConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    Order value;
|};

service on new kafka:Listener(kafka:DEFAULT_URL, consumerConfigs) {
    remote function onConsumerRecord(OrderConsumerRecord[] records)
                                                        returns error? {
        // The set of Kafka records received by the service are processed one by one.
        check from OrderConsumerRecord orderRecord in records
            where orderRecord.value.isValid
            do {
                log:printInfo("Received Valid Order: " +
                                    orderRecord.value.toString());
            };
    }
}

Consumer service

Here, a Kafka consumer is used as a listener to a service with manual offset commits. For this to work properly, an active Kafka broker should be present.

For more information on the underlying module, see the Kafka module.

import ballerinax/kafka;
import ballerina/log;
kafka:ConsumerConfiguration consumerConfigs = {
    groupId: "group-id",
    topics: ["test-kafka-topic"],

Subscribes to the topic test-kafka-topic.

    pollingInterval: 1
};
public type Order record {|
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
|};
public type OrderConsumerRecord record {|
    *kafka:AnydataConsumerRecord;
    Order value;
|};

Create a subtype of kafka:AnydataConsumerRecord

service on new kafka:Listener(kafka:DEFAULT_URL, consumerConfigs) {
    remote function onConsumerRecord(OrderConsumerRecord[] records)
                                                        returns error? {
        check from OrderConsumerRecord orderRecord in records
            where orderRecord.value.isValid
            do {
                log:printInfo("Received Valid Order: " +
                                    orderRecord.value.toString());
            };
    }
}

The set of Kafka records received by the service are processed one by one.

bal run kafka_consumer_service.bal