import ballerina/io;
import ballerinax/kafka;

kafka:ProducerConfiguration producerConfigs = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    // `enableIdempotence` must be set to `true` to make a producer transactional.
    enableIdempotence: true,

    // A `transactionalId` must be provided to make a producer transactional.
    transactionalId: "test-transactional-id"
};

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfigs);

public function main() {
    string message = "Hello World Transaction Message";
    byte[] serializedMessage = message.toBytes();
    // Sends the serialized message inside a transaction. The producer is
    // transactional because its configuration sets a `transactionalId`.
    kafkaAdvancedTransactionalProduce(serializedMessage);
}

function kafkaAdvancedTransactionalProduce(byte[] message) {
    transaction {
        kafka:Error? sendResult = kafkaProducer->send({
            topic: "test-kafka-topic",
            value: message,
            partition: 0
        });
        // Checks for an error and notifies if an error has occurred.
        if sendResult is kafka:Error {
            io:println("Error occurred when sending message ", sendResult);
        }

        var commitResult = commit;
        if commitResult is () {
            io:println("Transaction successful");
        } else {
            io:println("Transaction unsuccessful " + commitResult.message());
        }
    }
}

Transactional Producer

This example shows how to produce messages transactionally by sending them to Kafka brokers atomically using the kafka:Producer object. For this example to work, a Kafka broker must be running and reachable.

For more information on the underlying module, see the Kafka module.

import ballerina/io;
import ballerinax/kafka;
kafka:ProducerConfiguration producerConfigs = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    enableIdempotence: true,
    transactionalId: "test-transactional-id"
};

To make a producer transactional, enableIdempotence must be set to true and a transactionalId must be provided.

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfigs);
public function main() {
    string message = "Hello World Transaction Message";
    byte[] serializedMessage = message.toBytes();
    kafkaAdvancedTransactionalProduce(serializedMessage);
}

Serializes the message and passes it to the function that sends it inside a transaction. The transactionalId in the producer configuration is what enables transactional message production.

function kafkaAdvancedTransactionalProduce(byte[] message) {
    transaction {
        kafka:Error? sendResult = kafkaProducer->send({
            topic: "test-kafka-topic",
            value: message,
            partition: 0
        });
        if sendResult is kafka:Error {
            io:println("Error occurred when sending message ", sendResult);
        }

Checks for an error and notifies if an error has occurred.

        var commitResult = commit;
        if commitResult is () {
            io:println("Transaction successful");
        } else {
            io:println("Transaction unsuccessful " + commitResult.message());
        }
    }
}
bal run kafka_message_producer_transactional.bal
[ballerina/http] started HTTP/WS listener 192.168.1.2:57457
Transaction successful
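
The example above commits even when the send reports an error. As a minimal sketch of one alternative, the variant below rolls the transaction back when the send fails and commits only otherwise. The function name produceOrRollback is hypothetical and not part of the original example. The sketch assumes the same topic, broker URL, and producer configuration as above, and that the Kafka producer takes part in the rollback the same way it takes part in the commit.

```ballerina
import ballerina/io;
import ballerinax/kafka;

kafka:ProducerConfiguration producerConfigs = {
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    enableIdempotence: true,
    transactionalId: "test-transactional-id"
};

kafka:Producer kafkaProducer = check new (kafka:DEFAULT_URL, producerConfigs);

// Hypothetical helper (not in the original example): commits only if the
// send succeeded, and rolls the transaction back otherwise.
function produceOrRollback(byte[] message) returns error? {
    transaction {
        kafka:Error? sendResult = kafkaProducer->send({
            topic: "test-kafka-topic",
            value: message,
            partition: 0
        });
        if sendResult is kafka:Error {
            io:println("Send failed, rolling back: ", sendResult.message());
            rollback;
        } else {
            // A failed commit is returned to the caller as an error.
            check commit;
            io:println("Transaction successful");
        }
    }
}

public function main() returns error? {
    check produceOrRollback("Hello World Transaction Message".toBytes());
}
```

Every exit from the transaction block here ends in a commit, a rollback, or a failure, so the transaction is never left open.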