Kafka consumer - SSL/TLS

The kafka:Consumer connects to a Kafka server via SSL/TLS and then receives payloads from the server. To enable SSL/TLS, configure the secureSocket field, which requires a certificate and the protocol name. In addition, set securityProtocol to kafka:PROTOCOL_SSL to select the security mode. Use this to connect to a Kafka server secured with SSL/TLS.

import ballerinax/kafka;
import ballerina/io;

type Order readonly & record {
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
};

public function main() returns error? {
    kafka:Consumer orderConsumer = check new ("localhost:9094", {
        groupId: "order-group-id",
        topics: ["order-topic"],
        // Provide the relevant secure socket configurations by using `kafka:SecureSocket`.
        secureSocket: {
            cert: "./resources/path/to/public.crt",
            protocol: {
                // Provide the relevant security protocol.
                name: kafka:SSL
            }
        },
        // Provide the type of the security protocol to use in the broker connection.
        securityProtocol: kafka:PROTOCOL_SSL
    });

    // Polls the consumer for payloads, waiting up to 1 second for records to arrive.
    Order[] orders = check orderConsumer->pollPayload(1);

    from Order 'order in orders
        where 'order.isValid
        do {
            io:println(string `Received valid order for ${'order.productName}`);
        };
}

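Prerequisites

Start a Kafka server with SSL/TLS enabled that is reachable at localhost:9094, the address used in the code above.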

Run the program by executing the following command.

$ bal run kafka_client_consumer_sasl.bal
Received valid order for Sport shoe

Tip: Run the Kafka client given in the Kafka producer - SSL/TLS example to produce some messages to the topic.
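
For orientation, the producer side mentioned in the tip might look roughly like the sketch below. It is not copied from the Kafka producer - SSL/TLS example: it assumes the producer accepts the same secureSocket and securityProtocol settings as the consumer above, reuses the same placeholder certificate path and topic, and the orderId and price values are made up for illustration.

```ballerina
import ballerinax/kafka;

// Same record shape as the consumer example, so the payload binds cleanly.
type Order readonly & record {
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
};

public function main() returns error? {
    // Assumption: the broker address and SSL settings mirror the consumer example.
    kafka:Producer orderProducer = check new ("localhost:9094", {
        secureSocket: {
            cert: "./resources/path/to/public.crt",
            protocol: {
                name: kafka:SSL
            }
        },
        securityProtocol: kafka:PROTOCOL_SSL
    });

    // Illustrative order; only the product name matches the output shown above.
    Order 'order = {
        orderId: 1,
        productName: "Sport shoe",
        price: 27.5,
        isValid: true
    };

    // Publish the order to the topic the consumer subscribes to.
    check orderProducer->send({
        topic: "order-topic",
        value: 'order
    });
}
```

Because isValid is true in this record, the consumer's where clause would let it through and print the productName.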
