import ballerina/io;
import ballerina/kafka;

// Configuration record handed to the `kafka:Producer` created below.
kafka:ProducerConfiguration producerConfiguration = {
    // Keys are serialized with the built-in int serializer.
    keySerializerType: kafka:SER_INT,
    // Values are serialized with the built-in string serializer.
    valueSerializerType: kafka:SER_STRING,
    // `bootstrapServers` lists the remote server endpoints of the
    // Kafka brokers.
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    retryCount: 3,
    acks: "all"
};

// Module-level producer client, constructed from `producerConfiguration`
// and used by `main` to send and flush records.
kafka:Producer kafkaProducer = new (producerConfiguration);

// Sends one string message with the int key `1` to the `test-kafka-topic`
// topic, then flushes the producer. The outcome of each step is printed
// to the console.
public function main() {
    string message = "Hello World, Ballerina";
    // The result of `send` is checked for an error before reporting success.
    var sendResult = kafkaProducer->send(message, "test-kafka-topic", key = 1);
    if (sendResult is error) {
        io:println("Error occurred while sending data: " + sendResult.toString());
    } else {
        io:println("Message sent successfully.");
    }
    // The flush is attempted regardless of whether the send succeeded.
    var flushResult = kafkaProducer->flushRecords();
    if (flushResult is error) {
        io:println("Error occurred while flushing the data: " + flushResult.toString());
    } else {
        io:println("Records were flushed successfully.");
    }
}

Producer

This example shows how to send messages to a Kafka topic using a `kafka:Producer` object. The producer is configured to send string values and int keys. For this example to work, an active Kafka broker must be running at the address given in `bootstrapServers`.

import ballerina/io;
import ballerina/kafka;
kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,

`bootstrapServers` is the list of remote server endpoints of the Kafka brokers.

    valueSerializerType: kafka:SER_STRING,

Uses the built-in string serializer for the values.

    keySerializerType: kafka:SER_INT
};

Uses the built-in int serializer for the keys.

// Module-level producer client, constructed from `producerConfiguration`
// and used by `main` to send and flush records.
kafka:Producer kafkaProducer = new (producerConfiguration);
// Sends one string message with the int key `1` to the `test-kafka-topic`
// topic, then flushes the producer. The outcome of each step is printed
// to the console.
public function main() {
    string message = "Hello World, Ballerina";
    // The result of `send` is checked for an error before reporting success.
    var sendResult = kafkaProducer->send(message, "test-kafka-topic", key = 1);
    if (sendResult is error) {
        io:println("Error occurred while sending data: " + sendResult.toString());
    } else {
        io:println("Message sent successfully.");
    }
    // The flush is attempted regardless of whether the send succeeded.
    var flushResult = kafkaProducer->flushRecords();
    if (flushResult is error) {
        io:println("Error occurred while flushing the data: " + flushResult.toString());
    } else {
        io:println("Records were flushed successfully.");
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run kafka_message_producer_simple.bal
# If the producer sends the message successfully, the following is printed
# to the console.
Message sent successfully.
Records were flushed successfully.