Ballerina for Event-Driven Architecture (EDA)
Event-driven architecture provides loose coupling, scalability, responsiveness, extensibility, and fault tolerance. It enables real-time processing, integration, and autonomy, making it ideal for complex systems.
Ballerina can both produce and consume events over a range of brokers and protocols.
Passing around events has never been easier
Producing and consuming events from message brokers, and then exposing them to others over WebHook, WebSocket, GraphQL, gRPC, and so on, takes very little code in Ballerina. You focus on the business logic and let Ballerina handle the network logic.
import ballerina/log;
import ballerinax/kafka;

public type Order readonly & record {
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
};

listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {
    groupId: "order-group-id",
    topics: "order-topic"
});

service on orderListener {
    remote function onConsumerRecord(Order[] orders) returns error? {
        // The orders received by the service are processed one by one.
        check from Order 'order in orders
            where 'order.isValid
            do {
                log:printInfo(string `Received valid order for ${'order.productName}`);
            };
    }
}
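The listener above covers only the consuming side. As a minimal sketch of the producing side, the service below publishes each order it receives over HTTP to the same topic. The HTTP port, the `/orders` path, and the choice of an HTTP front end are assumptions made for illustration; a broker is assumed to be reachable at `kafka:DEFAULT_URL`.

```ballerina
import ballerina/http;
import ballerinax/kafka;

// Same shape as the `Order` record consumed by the listener.
public type Order readonly & record {
    int orderId;
    string productName;
    decimal price;
    boolean isValid;
};

// Assumption: port 9090 and the `orders` path are placeholders.
service / on new http:Listener(9090) {
    private final kafka:Producer orderProducer;

    function init() returns error? {
        // Connects to the broker at kafka:DEFAULT_URL when the service starts.
        self.orderProducer = check new (kafka:DEFAULT_URL);
    }

    // Each POSTed order becomes one event on "order-topic".
    resource function post orders(Order newOrder) returns http:Accepted|error {
        check self.orderProducer->send({
            topic: "order-topic",
            value: newOrder
        });
        return http:ACCEPTED;
    }
}
```

Keeping the producer as a field of the service means one client is created at startup and reused for every request, rather than reconnecting per order.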
WebHooks with WebSub
Ballerina combines HTTP and event-driven development, using WebSub to build event-driven applications on top of the HTTP ecosystem. The websub module covers the subscriber side, and the websubhub module covers the hub side.
import ballerina/io;
import ballerina/websub;

@websub:SubscriberServiceConfig {
    target: [
        "https://api.github.com/hub",
        "https://github.com/<YOUR_ORGANIZATION>/<REPOSITORY>/events/push.json"
    ],
    secret: "<YOUR_SECRET_KEY>",
    httpConfig: {
        auth: {
            token: "<YOUR_AUTH_TOKEN>"
        }
    }
}
service on new websub:Listener(9090) {
    remote function onEventNotification(websub:ContentDistributionMessage event) returns error? {
        // Get the expected JSON payload.
        json retrievedContent = check event.content.ensureType();
        // A `zen` field marks a ping event; a `ref` field marks a push event.
        if retrievedContent.zen is string {
            int hookId = check retrievedContent.hook_id;
            io:println(string `PingEvent received for webhook [${hookId}]`);
        } else if retrievedContent.ref is string {
            string repositoryName = check retrievedContent.repository.name;
            io:println(string `PushEvent received for [${repositoryName}]`);
        }
    }
}
import ballerina/websubhub;

// A WebSub hub skeleton: each remote function handles one hub operation.
service /hub on new websubhub:Listener(9000) {
    remote function onRegisterTopic(readonly & websubhub:TopicRegistration message)
            returns websubhub:TopicRegistrationSuccess|websubhub:TopicRegistrationError {
        // Add your logic here
        return websubhub:TOPIC_REGISTRATION_SUCCESS;
    }

    remote function onDeregisterTopic(readonly & websubhub:TopicDeregistration message)
            returns websubhub:TopicDeregistrationSuccess|websubhub:TopicDeregistrationError {
        // Add your logic here
        return websubhub:TOPIC_DEREGISTRATION_SUCCESS;
    }

    remote function onUpdateMessage(readonly & websubhub:UpdateMessage message)
            returns websubhub:Acknowledgement|websubhub:UpdateMessageError {
        // Add your logic here
        return websubhub:ACKNOWLEDGEMENT;
    }

    remote function onSubscriptionValidation(readonly & websubhub:Subscription message)
            returns websubhub:SubscriptionDeniedError? {
        // Add your logic here
    }

    remote function onSubscriptionIntentVerified(readonly & websubhub:VerifiedSubscription message)
            returns error? {
        // Add your logic here
    }

    remote function onUnsubscriptionValidation(readonly & websubhub:Unsubscription message)
            returns websubhub:UnsubscriptionDeniedError? {
        // Add your logic here
    }

    remote function onUnsubscriptionIntentVerified(
            readonly & websubhub:VerifiedUnsubscription message) returns error? {
        // Add your logic here
    }
}
WebSocket, TCP socket for the web
WebSocket is an excellent choice for message transmission, particularly on the web. With Ballerina, you can use WebSocket as it is, or develop custom subprotocols that match your organization's specific requirements.
import ballerina/io;
import ballerina/websocket;

// `taxiMgtListener` (a websocket:Listener) and the `broadcast` function are
// defined elsewhere in the application and are not shown here.

// This service is for drivers to register and send locations.
service /taxi on taxiMgtListener {
    resource isolated function get [string name]()
            returns websocket:Service|websocket:UpgradeError {
        return new DriverService(name);
    }
}

isolated service class DriverService {
    *websocket:Service;

    final string driverName;

    public isolated function init(string username) {
        self.driverName = username;
    }

    remote isolated function onOpen(websocket:Caller caller) returns websocket:Error? {
        string welcomeMsg =
            "Hi " + self.driverName + "! Your location will be shared with the riders";
        check caller->writeMessage(welcomeMsg);
        broadcast("Driver " + self.driverName + " ready for a ride");
    }

    // The `onMessage` remote function receives the location updates from drivers.
    remote function onMessage(websocket:Caller caller, string location) returns websocket:Error? {
        worker broadcast returns error? {
            // Broadcast the live locations to registered riders.
            string locationUpdateMsg = self.driverName + " updated the location " + location;
            broadcast(locationUpdateMsg);
        }
    }

    remote isolated function onClose(websocket:Caller caller, int statusCode, string reason) {
        io:println(self.driverName + " got disconnected");
    }
}
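The driver service above uses plain WebSocket. For the custom-subprotocol case, the following is a minimal sketch of how a service might advertise a subprotocol and check what was negotiated. The subprotocol name `taxi.v1`, the port, and the `LocationService` class are made up for illustration; the `subProtocols` annotation field and `getNegotiatedSubProtocol` are used here as they appear in the websocket module to the best of my knowledge, so check them against the module version you use.

```ballerina
import ballerina/websocket;

// Assumption: "taxi.v1" is a hypothetical subprotocol name.
@websocket:ServiceConfig {
    subProtocols: ["taxi.v1"]
}
service /taxi on new websocket:Listener(9091) {
    resource function get .() returns websocket:Service|websocket:UpgradeError {
        return new LocationService();
    }
}

// Hypothetical service class that only talks to clients using "taxi.v1".
service class LocationService {
    *websocket:Service;

    remote function onOpen(websocket:Caller caller) returns websocket:Error? {
        // Nil means no subprotocol was agreed on during the handshake.
        string? negotiated = caller.getNegotiatedSubProtocol();
        if negotiated != "taxi.v1" {
            // 1002 is the WebSocket "protocol error" close code.
            check caller->close(1002, "Unsupported subprotocol");
            return;
        }
        check caller->writeMessage("Connected using taxi.v1");
    }
}
```

A client opts in by sending the same name in its `Sec-WebSocket-Protocol` header during the handshake.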
GraphQL subscription
Ballerina simplifies GraphQL subscriptions: a subscription is a resource function that returns a stream, so events can be delivered to frontend applications and other popular GraphQL clients with little extra code.
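import ballerina/graphql;

// `publisherTable`, `userTable`, `produceNews`, `NewsStream`, and the record
// types are defined elsewhere in the application and are not shown here.

service /news on new graphql:Listener(9090) {
    // Mutation: publish a news update from a registered publisher.
    remote function publish(NewsUpdate & readonly update) returns NewsRecord|error {
        lock {
            if publisherTable.hasKey(update.publisherId) {
                return produceNews(update).cloneReadOnly();
            }
        }
        return error("Invalid publisher");
    }

    // Subscription: stream news from the given agency to a registered user.
    resource function subscribe news(string userId, Agency agency) returns stream<News>|error {
        stream<News> newsStream;
        lock {
            if userTable.hasKey(userId) {
                NewsStream newsStreamGenerator = check new (userId, agency);
                newsStream = new (newsStreamGenerator);
            } else {
                return error("User not registered");
            }
        }
        return newsStream;
    }
}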
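The subscription above builds its stream from a `NewsStream` generator object that is not shown. As a rough sketch of what such a generator can look like, the class below serves headlines from an in-memory list. `HeadlineStream`, `collectHeadlines`, and this simplified `News` record are hypothetical stand-ins, not the original definitions; a real generator would more likely pull from a broker or another event source.

```ballerina
import ballerina/io;

// Hypothetical payload type; the original `News` definition is not shown.
type News record {|
    string headline;
|};

// A stream generator is an object with a `next` method that returns
// `record {| T value; |}` for each item and `()` once the stream ends.
class HeadlineStream {
    private final string[] headlines;
    private int index = 0;

    function init(string[] headlines) {
        self.headlines = headlines;
    }

    public isolated function next() returns record {|News value;|}? {
        if self.index >= self.headlines.length() {
            return (); // No more items: the stream completes.
        }
        News news = {headline: self.headlines[self.index]};
        self.index += 1;
        return {value: news};
    }
}

// Drains a stream and returns the headlines in arrival order.
function collectHeadlines(stream<News> newsStream) returns string[] {
    return from News news in newsStream
        select news.headline;
}

public function main() {
    HeadlineStream generator = new (["Markets rally", "Rain expected"]);
    stream<News> newsStream = new (generator);
    io:println(collectHeadlines(newsStream));
}
```

Because the stream only calls `next` when a consumer asks for the next item, the generator decides when each event becomes available, which is what lets a subscription deliver events as they happen.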
RPC, why not gRPC?
Ballerina simplifies gRPC event streaming by bridging the gap between the service definition and the generated code. A server-streaming RPC in the proto file maps to a remote function that returns a stream.
Code
import ballerina/grpc;

// `ROOT_DESCRIPTOR` and `getDescriptorMap` come from the code generated for the
// proto file; `FEATURES` is assumed to be defined elsewhere in the application.

public type Rectangle record {|
    Point lo = {};
    Point hi = {};
|};

public type Feature record {|
    string name = "";
    Point location = {};
|};

public type Point record {|
    int latitude = 0;
    int longitude = 0;
|};

@grpc:ServiceDescriptor {descriptor: ROOT_DESCRIPTOR, descMap: getDescriptorMap()}
service "RouteGuide" on new grpc:Listener(9000) {
    remote function ListFeatures(Rectangle rectangle) returns stream<Feature, grpc:Error?>|error {
        // Normalize the rectangle so that the corner order does not matter.
        int left = int:min(rectangle.lo.longitude, rectangle.hi.longitude);
        int right = int:max(rectangle.lo.longitude, rectangle.hi.longitude);
        int top = int:max(rectangle.lo.latitude, rectangle.hi.latitude);
        int bottom = int:min(rectangle.lo.latitude, rectangle.hi.latitude);
        // Keep only the named features that fall inside the rectangle.
        Feature[] selectedFeatures = from var feature in FEATURES
            where feature.name != ""
            where feature.location.longitude >= left
            where feature.location.longitude <= right
            where feature.location.latitude >= bottom
            where feature.location.latitude <= top
            select feature;
        return selectedFeatures.toStream();
    }
}
Proto
message Rectangle {
    Point lo = 1;
    Point hi = 2;
}

message Feature {
    string name = 1;
    Point location = 2;
}

message Point {
    int32 latitude = 1;
    int32 longitude = 2;
}

service RouteGuide {
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
}
Files, files as events?
Ballerina's FTP listener reacts to file events on a server, so file transfers can drive your application like any other event source. SFTP and FTPS options keep the transfer secure.
import ballerina/ftp;
import ballerina/io;

configurable string username = ?;
configurable string password = ?;

listener ftp:Listener fileListener = check new ({
    host: "ftp.example.com",
    auth: {
        credentials: {
            username,
            password
        }
    },
    path: "/home/in",
    fileNamePattern: "(.*).txt"
});

service on fileListener {
    remote function onFileChange(ftp:WatchEvent & readonly event, ftp:Caller caller)
            returns error? {
        // For each newly added remote file, append the contents of a local file to it.
        foreach ftp:FileInfo addedFile in event.addedFiles {
            stream<io:Block, io:Error?> fileStream = check
                io:fileReadBlocksAsStream("./local/appendFile.txt", 7);
            check caller->append(addedFile.path, fileStream);
            check fileStream.close();
        }
    }
}
Automated responding to emails
Ballerina simplifies real-time email handling with abstractions and connectors for SMTP, IMAP, and POP3. It can also be integrated with ChatGPT for more advanced email automation workflows.
import ballerina/email;
import ballerina/log;

configurable string username = ?;
configurable string password = ?;

listener email:ImapListener emailListener = new ({
    host: "imap.email.com",
    username,
    password
});

service "observer" on emailListener {
    // When an email is successfully received, the `onMessage` method is called.
    remote function onMessage(email:Message email) {
        log:printInfo("Received an email", subject = email.subject, content = email?.body);
    }

    // When an error occurs during the email poll operations, the `onError` method is called.
    remote function onError(email:Error emailError) {
        log:printError(emailError.message(), stackTrace = emailError.stackTrace());
    }

    // When the listener is closed, the `onClose` method is called.
    remote function onClose(email:Error? closeError) {
        if closeError is email:Error {
            // A non-nil value means the listener closed because of a failure.
            log:printError(closeError.message(), stackTrace = closeError.stackTrace());
        }
    }
}