import ballerinax/kafka;
import ballerina/log;

// Authentication details that the consumer supplies through a
// `kafka:AuthenticationConfiguration` record.
kafka:AuthenticationConfiguration authConfig = {
    // Credentials used to authenticate the consumer. They are written inline here for brevity.
    // To secure such values instead of using plain text, see [Writing Secure Ballerina Code](https://ballerina.io/learn/user-guide/security/writing-secure-ballerina-code/#securing-sensitive-data-using-configurable-variables).
    username: "ballerina",
    password: "ballerina-secret",
    // The authentication mechanism used by the Kafka server.
    mechanism: kafka:AUTH_SASL_PLAIN
};

// Consumer settings: the subscription details followed by the security options.
kafka:ConsumerConfiguration consumerConfig = {
    topics: ["topic-sasl"],
    groupId: "test-group",
    clientId: "sasl-consumer",
    offsetReset: "earliest",
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT,
    // The authentication configuration record used to authenticate the consumer.
    auth: authConfig
};

// SASL URL of the configured Kafka server; change it to match your setup.
const string SASL_URL = "localhost:9093";

// Listener created from the SASL URL and the consumer configuration.
listener kafka:Listener kafkaListener = new kafka:Listener(SASL_URL, consumerConfig);

service kafka:Service on kafkaListener {
    // Logs the value of every received consumer record as text.
    // Returns the conversion error, and stops processing the remaining
    // records, if a record value cannot be converted to a `string`.
    remote function onConsumerRecord(kafka:Caller caller,
                                kafka:ConsumerRecord[] records) returns error? {
        foreach kafka:ConsumerRecord rec in records {
            // Convert the record's `byte[]` value to a `string`.
            string|error decoded = string:fromBytes(rec.value);
            if decoded is error {
                // Propagate the failure to the caller, exactly as `check` would.
                return decoded;
            } else {
                log:printInfo(decoded);
            }
        }
    }
}

SASL Authentication - Consumer

This example shows a Kafka consumer that authenticates using SASL/PLAIN. For the example to work, a Kafka server must be running and configured to use the SASL/PLAIN authentication mechanism.

For more information on the underlying module, see the Kafka module.

import ballerinax/kafka;
import ballerina/log;
kafka:AuthenticationConfiguration authConfig = {

The kafka:AuthenticationConfiguration is used to provide authentication-related details.

    mechanism: kafka:AUTH_SASL_PLAIN,

Provide the authentication mechanism used by the Kafka server.

    username: "ballerina",
    password: "ballerina-secret"
};

Username and password should be set here in order to authenticate the consumer. For information on how to secure values instead of directly using plain text values, see Writing Secure Ballerina Code.

kafka:ConsumerConfiguration consumerConfig = {
    groupId: "test-group",
    clientId: "sasl-consumer",
    offsetReset: "earliest",
    topics: ["topic-sasl"],
    auth: authConfig,
    securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT
};

Provide the relevant authentication configuration record to authenticate the consumer.

const string SASL_URL = "localhost:9093";

Provide the relevant SASL URL of the configured Kafka server.

listener kafka:Listener kafkaListener = new(SASL_URL, consumerConfig);
service kafka:Service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller,
                                kafka:ConsumerRecord[] records) returns error? {
        foreach var consumerRecord in records {

Loops through the received consumer records.

            string messageContent = check
                                        string:fromBytes(consumerRecord.value);

Converts the byte[] to a string.

            log:printInfo(messageContent);
        }
    }
}
bal run kafka_authentication_sasl_plain_consumer.bal
[ballerinax/kafka] kafka servers: localhost:9093
[ballerinax/kafka] subscribed topics: topic-sasl
[ballerinax/kafka] started kafka listener