import ballerina/io;
import ballerina/kafka;

// The `kafka:AuthenticationConfiguration` is used to provide authentication-related details.
kafka:AuthenticationConfiguration authConfig = {
    // Provide the authentication mechanism used by the Kafka server.
    mechanism: kafka:AUTH_SASL_PLAIN,
    // Username and password should be set here in order to authenticate the producer.
    // Check Ballerina `config` APIs to see how to use encrypted values instead of plain text values here.
    username: "ballerina",
    password: "ballerina-secret"
};

kafka:ProducerConfiguration producerConfigs = {
    bootstrapServers: "localhost:9092",
    valueSerializerType: kafka:SER_STRING,
    // Provide the relevant authentication configuration record to authenticate the producer.
    authenticationConfiguration: authConfig
};

kafka:Producer kafkaProducer = new (producerConfigs);

public function main() {
    var result = kafkaProducer->send("Hello from Ballerina", "topic-sasl");
    if (result is error) {
        io:println(result);
    } else {
        io:println("success");
    }
}

SASL Authentication - Producer

This example shows a Kafka producer that authenticates using SASL/PLAIN. To run it, a Kafka server must be running and configured to use the SASL/PLAIN authentication mechanism.
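
As a rough sketch of the broker-side requirement, the fragment below shows one way a Kafka `server.properties` file might enable SASL/PLAIN for the credentials used in this example. It assumes a single local broker, an unencrypted `SASL_PLAINTEXT` listener on port 9092, and a hypothetical `admin` / `admin-secret` account for inter-broker traffic. None of these settings come from the example itself, so adjust them to your environment and Kafka version.

```properties
# Assumption: expose a SASL listener without TLS on the port the producer connects to.
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092

# Enable the PLAIN mechanism for clients and for inter-broker communication.
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN

# JAAS configuration for the listener. `username`/`password` are the broker's own
# credentials (hypothetical values); each `user_<name>` entry defines an account
# that clients may log in with. `user_ballerina` matches the producer's credentials.
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_ballerina="ballerina-secret";
```

Because PLAIN sends credentials unencrypted, a listener without TLS such as this one is only suitable for local testing.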

import ballerina/io;
import ballerina/kafka;
kafka:AuthenticationConfiguration authConfig = {

The kafka:AuthenticationConfiguration is used to provide authentication-related details.

    mechanism: kafka:AUTH_SASL_PLAIN,

Provide the authentication mechanism used by the Kafka server.

    username: "ballerina",
    password: "ballerina-secret"
};

Username and password should be set here in order to authenticate the producer. Check Ballerina config APIs to see how to use encrypted values instead of plain text values here.

kafka:ProducerConfiguration producerConfigs = {
    bootstrapServers: "localhost:9092",
    valueSerializerType: kafka:SER_STRING,
    authenticationConfiguration: authConfig
};

Provide the relevant authentication configuration record to authenticate the producer.

kafka:Producer kafkaProducer = new (producerConfigs);
public function main() {
    var result = kafkaProducer->send("Hello from Ballerina", "topic-sasl");
    if (result is error) {
        io:println(result);
    } else {
        io:println("success");
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run kafka_authentication_sasl_plain_producer.bal
# If the producer is authenticated and the message is sent successfully,
# the following message is printed to the console.
success