Module : kafka
Module Overview
This module is used to interact with Kafka brokers via Kafka Consumer and Kafka Producer clients. It supports Kafka versions 1.x.x and 2.0.0.
For information on the operations you can perform with this module, see the Functions section below. For examples of their usage, see the following:
- Producer Example
- Consumer Service Example
- Consumer Client Example
- Transactional Producer Example
- SASL Consumer Authentication Example
- SASL Producer Authentication Example
Basic Usages
Publishing Messages
- Initialize the Kafka message producer.
kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    valueSerializerType: kafka:SER_STRING,
    keySerializerType: kafka:SER_INT
};

kafka:Producer kafkaProducer = new (producerConfiguration);
- Use the kafka:Producer to publish messages.
string message = "Hello World, Ballerina";
kafka:ProducerError? result = kafkaProducer->send(message, "kafka-topic", key = 1);
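Putting the two steps above together, a minimal runnable producer might look like the following sketch (the broker address and topic name are the placeholder values from the snippets above):

```ballerina
import ballerina/io;
import ballerina/kafka;

kafka:ProducerConfiguration producerConfiguration = {
    // Replace with the host and port of your own broker.
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    valueSerializerType: kafka:SER_STRING,
    keySerializerType: kafka:SER_INT
};

public function main() {
    kafka:Producer kafkaProducer = new (producerConfiguration);
    string message = "Hello World, Ballerina";
    // send() returns an error on failure; check it rather than ignoring it.
    kafka:ProducerError? result = kafkaProducer->send(message, "kafka-topic", key = 1);
    if (result is kafka:ProducerError) {
        io:println(result);
    }
}
```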
Consuming Messages
- Initialize the Kafka message consumer.
kafka:ConsumerConfiguration consumerConfiguration = {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    offsetReset: "earliest",
    topics: ["kafka-topic"]
};

kafka:Consumer consumer = new (consumerConfiguration);
- Use the kafka:Consumer as a simple record consumer.
kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
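Since poll() returns either the fetched records or an error, the result should be type-checked before use. A minimal processing sketch (assuming the default/string value deserializer, so each record value is a string):

```ballerina
kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
if (result is kafka:ConsumerRecord[]) {
    foreach var kafkaRecord in result {
        anydata value = kafkaRecord.value;
        // With the string deserializer, each value is a string.
        if (value is string) {
            io:println(value);
        }
    }
} else {
    // Polling failed; log the error.
    io:println(result);
}
```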
- Use the kafka:Consumer as a listener.
listener kafka:Consumer consumer = new (consumerConfiguration);

service kafkaService on consumer {
    // This resource will be executed when a message is published to the
    // subscribed topic/topics.
    resource function onMessage(kafka:Consumer kafkaConsumer,
            kafka:ConsumerRecord[] records) {
    }
}
Send Data Using Avro
The Ballerina Kafka module supports Avro serialization and deserialization.
To try this, create a new Ballerina project with two modules inside it by executing the following commands.
ballerina new kafka_avro_sample
cd kafka_avro_sample
ballerina add producer
ballerina add consumer
Dependencies
To use Avro, you need to add the necessary dependencies to the Ballerina project you created.
To do so, download the dependencies and place them inside the resources directory.
Also, declare those dependencies in the Ballerina.toml file of your project.
The following is a sample Ballerina.toml file with the dependencies.
[[platform.libraries]]
module = "producer"
path = "./resources/kafka-avro-serializer-5.4.1.jar"
artifactId = "kafka-avro-serializer"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/kafka-schema-registry-client-5.4.1.jar"
artifactId = "kafka-schema-registry-client"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/common-config-5.4.1.jar"
artifactId = "common-config"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/common-utils-5.4.1.jar"
artifactId = "common-utils"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/avro-1.9.2.jar"
artifactId = "avro"
version = "1.9.2"
groupId = "org.apache.avro"
[[platform.libraries]]
module = "producer"
path = "./resources/jackson-core-2.10.3.jar"
artifactId = "jackson-core"
version = "2.10.3"
groupId = "com.fasterxml.jackson.core"
[[platform.libraries]]
module = "producer"
path = "./resources/jackson-databind-2.10.3.jar"
artifactId = "jackson-databind"
version = "2.10.3"
groupId = "com.fasterxml.jackson.core"
[[platform.libraries]]
module = "producer"
path = "./resources/jackson-annotations-2.10.3.jar"
artifactId = "jackson-annotations"
version = "2.10.3"
groupId = "com.fasterxml.jackson.core"
Now, the directory structure will look like the following (some files are omitted).
├── Ballerina.toml
├── resources
│   ├── avro-1.9.2.jar
│   ├── common-config-5.4.1.jar
│   ├── common-utils-5.4.1.jar
│   ├── jackson-annotations-2.10.3.jar
│   ├── jackson-core-2.10.3.jar
│   ├── jackson-databind-2.10.3.jar
│   ├── kafka-avro-serializer-5.4.1.jar
│   └── kafka-schema-registry-client-5.4.1.jar
└── src
    ├── consumer
    │   └── main.bal
    └── producer
        └── main.bal
Avro Producer
The kafka:Producer can be configured to send data using Avro by providing the following configurations.
- schemaString: The schema string, which defines the Avro schema.
- dataRecord: The data record you want to send through Kafka.
src/producer/main.bal:
import ballerina/io;
import ballerina/kafka;

public type Person record {
    string name;
    int age;
};

kafka:ProducerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    // Other configurations
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

string schema = "{\"type\" : \"record\"," +
    "\"namespace\" : \"Thisaru\"," +
    "\"name\" : \"person\"," +
    "\"fields\" : [" +
    "{ \"name\" : \"name\", \"type\" : \"string\" }," +
    "{ \"name\" : \"age\", \"type\" : \"int\" }" +
    "]}";

public function main() {
    kafka:Producer producer = new (configs);
    Person person = {
        name: "Lahiru Perera",
        age: 28
    };
    kafka:AvroRecord avroRecord = {
        schemaString: schema,
        dataRecord: person
    };
    kafka:ProducerError? result = producer->send(avroRecord, "add-person");
    if (result is kafka:ProducerError) {
        io:println(result);
    }
}
Avro Consumer
The Kafka implementation of Ballerina currently supports Avro deserialization only for generic records. The consumer will return a kafka:AvroGenericRecord with the data received from Avro. The following is a sample consumer.
src/consumer/main.bal:
import ballerina/io;
import ballerina/kafka;

kafka:ConsumerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    groupId: "test-group",
    // Other configurations
    topics: ["add-person"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

listener kafka:Consumer consumer = new (configs);

service KafkaService on consumer {
    resource function onMessage(kafka:Consumer consumer, kafka:ConsumerRecord[] records) {
        io:println("Records received");
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                io:println(value);
            } else {
                io:println("Invalid record type");
            }
        }
    }
}
Now, execute the following command to run the consumer:
ballerina run consumer
This starts the Kafka listener service. You can verify it by the following messages, which will be displayed on the screen.
[ballerina/kafka] kafka servers: <KAFKA_BROKER_HOST_AND_PORT>
[ballerina/kafka] subscribed topics: add-person
[ballerina/kafka] started kafka listener
Then, open another terminal and execute the following command to run the producer:
ballerina run producer
Now, the consumer will receive the data and print it on the console as follows.
Records received
name=Lahiru Perera age=28
Records

| Record | Description |
|---|---|
| AuthenticationConfiguration | Configurations related to Kafka authentication mechanisms. |
| AvroGenericRecord | Represents a generic Avro record. |
| AvroRecord | Defines a record to send data using Avro serialization. |
| ConsumerConfiguration | Configurations related to the consumer endpoint. |
| ConsumerRecord | Type related to the consumer record. |
| Detail | Represents the details of an error. |
| KeyStore | Configurations related to the KeyStore. |
| PartitionOffset | Represents the topic partition position in which the consumed record is stored. |
| ProducerConfiguration | Represents the Kafka Producer configuration. |
| Protocols | Configurations related to the SSL/TLS protocol and the versions to be used. |
| SecureSocket | Configurations for facilitating secure communication with the Kafka server. |
| TopicPartition | Represents a topic partition. |
| TrustStore | Configurations related to the TrustStore. |

Objects

| Object | Description |
|---|---|
| Deserializer | Represents a Kafka deserializer object. |
| Serializer | Represents a Kafka serializer object. |

Clients

| Client | Description |
|---|---|
| Consumer | Represents a Kafka consumer endpoint. |
| Producer | Represents a Kafka producer endpoint. |

Constants

| Constant | Description |
|---|---|
| DES_BYTE_ARRAY | In-built Kafka byte array deserializer. |
| DES_STRING | In-built Kafka string deserializer. |
| DES_INT | In-built Kafka int deserializer. |
| DES_FLOAT | In-built Kafka float deserializer. |
| DES_CUSTOM | User-defined deserializer. |
| DES_AVRO | Apache Avro deserializer. |
| ISOLATION_COMMITTED | Configures the consumer to read only the committed messages in the transactional mode when poll() is called. |
| ISOLATION_UNCOMMITTED | Configures the consumer to read all the messages including the aborted ones. |
| ACKS_ALL | Producer acknowledgement type 'all'. |
| ACKS_NONE | Producer acknowledgement type '0'. |
| ACKS_SINGLE | Producer acknowledgement type '1'. |
| SER_BYTE_ARRAY | In-built Kafka byte array serializer. |
| SER_STRING | In-built Kafka string serializer. |
| SER_INT | In-built Kafka int serializer. |
| SER_FLOAT | In-built Kafka float serializer. |
| SER_CUSTOM | User-defined serializer. |
| SER_AVRO | Apache Avro serializer. |
| COMPRESSION_NONE | No compression. |
| COMPRESSION_GZIP | Kafka GZIP compression type. |
| COMPRESSION_SNAPPY | Kafka Snappy compression type. |
| COMPRESSION_LZ4 | Kafka LZ4 compression type. |
| COMPRESSION_ZSTD | Kafka ZSTD compression type. |
| AUTH_SASL_PLAIN | Kafka SASL_PLAIN authentication mechanism. |
| PROTOCOL_SASL_PLAINTEXT | Kafka SASL_PLAINTEXT security protocol. |
| PROTOCOL_SASL_SSL | Kafka SASL_SSL security protocol. |
| CONSUMER_ERROR | Used as the error reason for the kafka:ConsumerError. |
| PRODUCER_ERROR | Used as the error reason for the kafka:ProducerError. |
| AVRO_ERROR | Used as the error reason for the kafka:AvroError. |

Types

| Type | Description |
|---|---|
| AuthenticationMechanism | Represents the supported Kafka SASL authentication mechanisms. |
| CompressionType | Kafka compression types to compress the messages. |
| DeserializerType | Kafka in-built deserializer types. |
| IsolationLevel | Kafka consumer isolation level type. |
| ProducerAcks | Kafka producer acknowledgement types. |
| SecurityProtocol | Represents the supported security protocols for Kafka clients. |
| SerializerType | Kafka in-built serializer types. |

Errors

| Error | Description |
|---|---|
| AvroError | Represents a Kafka Avro related error. |
| ConsumerError | Error type specific to the kafka:Consumer. |
| ProducerError | Error type specific to the kafka:Producer. |