Transactional producer

This example shows how to produce messages transactionally: the kafka:Producer client sends messages to the Kafka brokers atomically, so they become visible to consumers only once the transaction commits. The example requires a running Kafka broker that is reachable at kafka:DEFAULT_URL.

For more information on the underlying module, see the Kafka module.

import ballerina/io;
import ballerinax/kafka;

// Creates a producer config with optional parameters.
kafka:ProducerConfiguration producerConfigs = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,

    // `enableIdempotence` must be set to `true` to make the producer transactional.
    enableIdempotence: true,

    // A `transactionalId` must be provided to make the producer transactional;
    // it enables transactional message production.
    transactionalId: "test-transactional-id"
};

kafka:Producer transactionalProducer =
                check new (kafka:DEFAULT_URL, producerConfigs);

public function main() returns error? {
    transaction {
        // Sends the message inside the transaction block.
        check transactionalProducer->send({
            topic: "test-kafka-topic",
            value: "Hello World Transaction Message"
        });

        check commit;
    }
    // Reached only if the commit succeeded; prints the output shown below.
    io:println("Transaction successful");
}


Run the example with the following command. The second line is the expected output.

bal run kafka_message_producer_transactional.bal
Transaction successful
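
The atomic behavior described above is easier to see when several messages share one transaction. The following is a minimal sketch under stated assumptions, not part of the original example: the messages array and the failure message are hypothetical, and the on fail clause reflects the Ballerina transaction statement as I understand it. If any send or the commit fails, the transaction is expected to roll back, so none of the messages should become visible to consumers that read only committed data.

```ballerina
import ballerina/io;
import ballerinax/kafka;

// Same configuration as in the example above.
kafka:ProducerConfiguration producerConfigs = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    enableIdempotence: true,
    transactionalId: "test-transactional-id"
};

kafka:Producer transactionalProducer =
                check new (kafka:DEFAULT_URL, producerConfigs);

public function main() returns error? {
    // Hypothetical payloads, used only for illustration.
    string[] messages = ["First message", "Second message"];

    transaction {
        // All sends belong to the same transaction.
        foreach string message in messages {
            check transactionalProducer->send({
                topic: "test-kafka-topic",
                value: message
            });
        }

        check commit;
    } on fail error e {
        // Assumption: a failed `check` inside the block rolls the
        // transaction back before control reaches this clause.
        io:println("Transaction failed: ", e.message());
        return e;
    }

    io:println("Transaction successful");
}
```

Like the original example, this sketch needs a running Kafka broker and is started with bal run.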