This API documentation is for Ballerina 1.2.0.

Module : kafka

Module Overview

This module is used to interact with Kafka brokers via Kafka Consumer and Kafka Producer clients. It supports Kafka versions 1.x.x and 2.0.0.

For information on the operations that you can perform with this module, see the Functions listed below. For examples of how to use these operations, see the following.

Basic Usages

Publishing Messages
  1. Initialize the Kafka message producer.
kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    valueSerializerType: kafka:SER_STRING,
    keySerializerType: kafka:SER_INT
};

kafka:Producer kafkaProducer = new (producerConfiguration);
  2. Use the kafka:Producer to publish messages.
string message = "Hello World, Ballerina";
kafka:ProducerError? result = kafkaProducer->send(message, "kafka-topic", key = 1);
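
The send call above returns either nil or a kafka:ProducerError, so the result is worth checking before assuming that the message was published. The following is a minimal sketch that wraps the two steps above in a main function and inspects the result. It assumes a broker is reachable at localhost:9092, and the printed messages are illustrative only.

```ballerina
import ballerina/io;
import ballerina/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    valueSerializerType: kafka:SER_STRING,
    keySerializerType: kafka:SER_INT
};

public function main() {
    kafka:Producer kafkaProducer = new (producerConfiguration);

    string message = "Hello World, Ballerina";
    // The key is an int because keySerializerType is kafka:SER_INT.
    kafka:ProducerError? result = kafkaProducer->send(message, "kafka-topic", key = 1);
    if (result is kafka:ProducerError) {
        // Illustrative handling: print the error instead of ignoring it.
        io:println("Failed to publish the message: ", result);
    } else {
        io:println("Message sent to kafka-topic");
    }
}
```

The same check appears in the Avro producer sample later on this page, which prints the kafka:ProducerError when the send fails.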

Consuming Messages
  1. Initialize the Kafka message consumer.
kafka:ConsumerConfiguration consumerConfiguration = {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    offsetReset: "earliest",
    topics: ["kafka-topic"]
};

kafka:Consumer consumer = new (consumerConfiguration);
  2. Use the kafka:Consumer as a simple record consumer.
kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
  3. Use the kafka:Consumer as a listener.
listener kafka:Consumer consumer = new (consumerConfiguration);

service kafkaService on consumer {
    // This resource will be executed when a message is published to the
    // subscribed topic/topics.
    resource function onMessage(kafka:Consumer kafkaConsumer,
            kafka:ConsumerRecord[] records) {
    }
}
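
When the kafka:Consumer is used as a simple record consumer, poll returns either an array of kafka:ConsumerRecord values or a kafka:ConsumerError. The following is a minimal sketch of handling both cases. It assumes a broker is reachable at localhost:9092 and that the topic already contains messages. The printed text is illustrative only.

```ballerina
import ballerina/io;
import ballerina/kafka;

kafka:ConsumerConfiguration consumerConfiguration = {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    offsetReset: "earliest",
    topics: ["kafka-topic"]
};

public function main() {
    kafka:Consumer consumer = new (consumerConfiguration);

    // The argument is the same value that is passed to poll in the step above.
    kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
    if (result is kafka:ConsumerError) {
        io:println("Polling failed: ", result);
    } else {
        // In this branch, the result is narrowed to kafka:ConsumerRecord[].
        foreach var kafkaRecord in result {
            io:println(kafkaRecord.value);
        }
    }
}
```

An empty array is a valid result when no messages arrive before the poll call returns, so an application would typically call poll repeatedly rather than once.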

Send Data Using Avro

The Ballerina Kafka module supports Avro serialization and deserialization.

To try this, create a new Ballerina project and two modules inside it.

Execute the below commands to do this.

ballerina new kafka_avro_sample
cd kafka_avro_sample
ballerina add producer
ballerina add consumer

Dependencies

To use Avro, you need to add the necessary dependencies to the Ballerina project you created. To do so, download the necessary dependencies and add them inside the resources directory. Also, add those dependencies to the Ballerina.toml file of your project. The following is a sample Ballerina.toml file with the dependencies.

    [[platform.libraries]]
    module = "producer"
    path = "./resources/kafka-avro-serializer-5.4.1.jar"
    artifactId = "kafka-avro-serializer"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/kafka-schema-registry-client-5.4.1.jar"
    artifactId = "kafka-schema-registry-client"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/common-config-5.4.1.jar"
    artifactId = "common-config"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/common-utils-5.4.1.jar"
    artifactId = "common-utils"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/avro-1.9.2.jar"
    artifactId = "avro"
    version = "1.9.2"
    groupId = "org.apache.avro"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-core-2.10.3.jar"
    artifactId = "jackson-core"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-databind-2.10.3.jar"
    artifactId = "jackson-databind"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-annotations-2.10.3.jar"
    artifactId = "jackson-annotations"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

Now, the directory structure will look like the following (some of the files are omitted).

├── Ballerina.toml
├── resources
│   ├── avro-1.9.2.jar
│   ├── common-config-5.4.1.jar
│   ├── common-utils-5.4.1.jar
│   ├── jackson-annotations-2.10.3.jar
│   ├── jackson-core-2.10.3.jar
│   ├── jackson-databind-2.10.3.jar
│   ├── kafka-avro-serializer-5.4.1.jar
│   └── kafka-schema-registry-client-5.4.1.jar
└── src
    ├── consumer
    │   └── main.bal
    └── producer
        └── main.bal

Avro Producer

The kafka:Producer can be configured to send data using Avro by providing the following configurations.

src/producer/main.bal:

import ballerina/io;
import ballerina/kafka;

public type Person record {
    string name;
    int age;
};

kafka:ProducerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    // Other configurations
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

string schema = "{\"type\" : \"record\"," +
                  "\"namespace\" : \"Thisaru\"," +
                  "\"name\" : \"person\"," +
                  "\"fields\" : [" + 
                    "{ \"name\" : \"name\", \"type\" : \"string\" }," +
                    "{ \"name\" : \"age\", \"type\" : \"int\" }" +
                  "]}";

public function main() {
    kafka:Producer producer = new(configs);

    Person person = {
        name: "Lahiru Perera",
        age: 28
    };

    kafka:AvroRecord avroRecord = {
        schemaString: schema,
        dataRecord: person
    };

    kafka:ProducerError? result = producer->send(avroRecord, "add-person");
    if (result is kafka:ProducerError) {
        io:println(result);    
    }
}

Avro Consumer

The Kafka implementation of Ballerina currently supports Avro deserialization only for generic records. The consumer returns the received data as a kafka:AvroGenericRecord.

The following is a sample consumer.

src/consumer/main.bal:

import ballerina/io;
import ballerina/kafka;

kafka:ConsumerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    groupId: "test-group",
    // Other configurations
    topics: ["add-person"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

listener kafka:Consumer consumer = new(configs);

service KafkaService on consumer {
    resource function onMessage(kafka:Consumer consumer, kafka:ConsumerRecord[] records) {
        io:println("Records received");
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                io:println(value);
            } else {
                io:println("Invalid record type");
            }
        }
    }
}

Now, execute the below command to run the consumer:

ballerina run consumer

This starts the Kafka listener service. You can verify that it has started by checking for the following messages, which are displayed on the console.

[ballerina/kafka] kafka servers: <KAFKA_BROKER_HOST_AND_PORT>
[ballerina/kafka] subscribed topics: add-person
[ballerina/kafka] started kafka listener

Then, open another terminal and execute the below command to run the producer:

ballerina run producer

Now, the consumer will receive the data, and the received data will be printed on the console as follows.

Records received
name=Lahiru Perera age=28

Records

AuthenticationConfiguration

Configurations related to Kafka authentication mechanisms.

AvroGenericRecord

Represents a generic Avro record.

AvroRecord

Defines a record to send data using Avro serialization.

ConsumerConfiguration

Configurations related to the consumer endpoint.

ConsumerRecord

Type related to the consumer record.

Detail

Represents the details of an error.

KeyStore

Configurations related to the KeyStore.

PartitionOffset

Represents the topic partition position in which the consumed record is stored.

ProducerConfiguration

Represents the Kafka Producer configuration.

Protocols

Configurations related to the SSL/TLS protocol and the versions to be used.

SecureSocket

Configurations for facilitating secure communication with the Kafka server.

TopicPartition

Represents a topic partition.

TrustStore

Configurations related to the TrustStore.

Objects

Deserializer

Represents a Kafka deserializer object.

Serializer

Represents a Kafka serializer object.

Clients

Consumer

Represents a Kafka consumer endpoint.

Producer

Represents a Kafka producer endpoint.

Constants

DES_BYTE_ARRAY

In-built Kafka byte array deserializer.

DES_STRING

In-built Kafka string deserializer.

DES_INT

In-built Kafka int deserializer.

DES_FLOAT

In-built Kafka float deserializer.

DES_CUSTOM

User-defined deserializer.

DES_AVRO

Apache Avro deserializer.

ISOLATION_COMMITTED

Configures the consumer to read the committed messages only in the transactional mode when poll() is called.

ISOLATION_UNCOMMITTED

Configures the consumer to read all the messages including the aborted ones.

ACKS_ALL

Producer acknowledgement type 'all'.

ACKS_NONE

Producer acknowledgement type '0'.

ACKS_SINGLE

Producer acknowledgement type '1'.

SER_BYTE_ARRAY

In-built Kafka byte array serializer.

SER_STRING

In-built Kafka string serializer.

SER_INT

In-built Kafka int serializer.

SER_FLOAT

In-built Kafka float serializer.

SER_CUSTOM

User-defined serializer.

SER_AVRO

Apache Avro serializer.

COMPRESSION_NONE

No compression.

COMPRESSION_GZIP

Kafka GZIP compression type.

COMPRESSION_SNAPPY

Kafka Snappy compression type.

COMPRESSION_LZ4

Kafka LZ4 compression type.

COMPRESSION_ZSTD

Kafka ZSTD compression type.

AUTH_SASL_PLAIN

Kafka SASL_PLAIN authentication mechanism.

PROTOCOL_SASL_PLAINTEXT

PROTOCOL_SASL_SSL

CONSUMER_ERROR

Used as the error reason for the kafka:ConsumerError type.

PRODUCER_ERROR

Used as the error reason for the kafka:ProducerError type.

AVRO_ERROR

Used as the error reason for the kafka:AvroError type.

Types

AuthenticationMechanism

Represents the supported Kafka SASL authentication mechanisms.

CompressionType

Kafka compression types to compress the messages.

DeserializerType

Kafka in-built deserializer types.

IsolationLevel

Kafka consumer isolation level type.

ProducerAcks

Kafka producer acknowledgement types.

SecurityProtocol

Represents the supported security protocols for Kafka clients.

SerializerType

Kafka in-built serializer types.

Errors

AvroError

Represents a Kafka Avro related error.

ConsumerError

Error type specific to the kafka:Consumer object functions.

ProducerError

Error type specific to the kafka:Producer object functions.