Back to Examples

Kafka consumer - SASL authentication

The kafka:Consumer connects to a Kafka server via SASL/PLAIN authentication and then, receives the payloads from the server. SASL/PLAIN authentication can be enabled by configuring the auth, which requires the authentication mechanism, username, and password. Further, the mode of security must be configured by setting the securityProtocol to kafka:PROTOCOL_SASL_PLAINTEXT. Use this to connect to a Kafka server secured with SASL/PLAIN.

import ballerinax/kafka;
import ballerina/io;

type Order readonly & record {
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
};

public function main() returns error? {
    kafka:Consumer orderConsumer = check new ("localhost:9093", {
        groupId: "order-group-id",
        topics: "order-topic",
        // Provide the relevant authentication configurations to authenticate the consumer
        // by the `kafka:AuthenticationConfiguration`.
        auth: {
            // Provide the authentication mechanism used by the Kafka server.
            mechanism: kafka:AUTH_SASL_PLAIN,
            // Username and password should be set here in order to authenticate the consumer.
            username: "alice",
            password: "alice@123"
        },
        securityProtocol: kafka:PROTOCOL_SASL_PLAINTEXT
    });

    // Polls the consumer for payload.
    Order[] orders = check orderConsumer->pollPayload(1);

    from Order 'order in orders
        where 'order.isValid
        do {
            io:println(string `Received valid order for ${'order.productName}`);
        };
}

Prerequisites

Run the program by executing the following command.

$ bal run kafka_client_consumer_sasl.balReceived valid order for Sport shoe

Tip: Run the Kafka client given in the Kafka producer - SASL authentication example to produce some messages to the topic.

Related links

PreviousSSL/TLS
NextSend/Receive