Back to EIP

Pattern: Publish-Subscribe Channel delivers a copy of a particular event to each receiver.
How Ballerina helps

Ballerina has a rich set of packages to interact with various messaging protocols. These include protocols that support publish-subscribe semantics such as Kafka, MQTT, and WebSocket.

Publish-Subscribe Channel · Message Channel · Message Endpoint · Message Router
publisher.bal
Copy
import ballerina/http;
import ballerinax/kafka;

# Payload accepted by the `/api` service and forwarded as the value of a Kafka message.
# This is a closed record, so only the four fields below are allowed.
#
# + matchId - Identifier of the match. NOTE(review): the publisher uses the URL path
#             segment, not this field, as the Kafka topic; confirm the two are expected to agree
# + time - Time of the event as a string; no format is enforced here (TODO confirm expected format)
# + event - Presumably the kind or name of the event; free-form string
# + description - Presumably a human-readable description of the event
type MatchEvent record {|
    string matchId;
    string time;
    string event;
    string description;
|};

# HTTP front end of the publisher: accepts match events over HTTP on port 8080
# and publishes each one to a Kafka topic named after the match.
service /api on new http:Listener(8080) {

    // Producer shared by every request handled by this service.
    private final kafka:Producer eventProducer;

    # Creates the Kafka producer against `kafka:DEFAULT_URL`.
    #
    # + return - An error if the producer cannot be created
    function init() returns error? {
        kafka:Producer producer = check new (kafka:DEFAULT_URL);
        self.eventProducer = producer;
    }

    # Publishes the posted event to the Kafka topic whose name is the `matchId`
    # path segment of the request URL.
    #
    # + event - The match event taken from the request
    # + return - The error reported by the Kafka producer if the send fails; otherwise nil
    resource function post cricket/matches/[string matchId]/event(MatchEvent event) returns error? {
        // The topic comes from the URL path, not from `event.matchId`.
        return self.eventProducer->send({topic: matchId, value: event});
    }
}
subscriber.bal
Copy
import ballerina/uuid;
import ballerina/websocket;
import ballerinax/kafka;

# WebSocket upgrade service on port 8081. Each connection to `/ws/<matchId>`
# gets its own `MatchUpdateService` bound to that match.
service /ws on new websocket:Listener(8081) {

    # Builds the per-connection service for the match named in the path.
    #
    # + return - The service that will handle the connection, or the error raised
    #            while constructing it
    resource function get [string matchId]() returns websocket:Service|error {
        MatchUpdateService updateService = check new (matchId);
        return updateService;
    }
}

# Per-connection WebSocket service that relays every message published on one
# match's Kafka topic to the connected client.
isolated service class MatchUpdateService {
    *websocket:Service;
    // Consumer owned by this connection; closed when streaming stops.
    private final kafka:Consumer kafkaConsumer;

    # Creates a Kafka consumer subscribed to the topic named `matchId`.
    #
    # The consumer group id gets a fresh UUID suffix, so every instance of this
    # service joins a group of its own rather than sharing one with other instances.
    #
    # + matchId - Name of the Kafka topic to subscribe to
    # + return - An error if the consumer cannot be created
    public function init(string matchId) returns error? {
        self.kafkaConsumer = check new (kafka:DEFAULT_URL, {
            groupId: string `realtime-web-ui-group-${uuid:createType1AsString()}`,
            topics: [matchId]
        });
    }

    # Streams Kafka payloads to the client until polling or writing fails.
    #
    # The loop has no normal exit: it ends only when `pollPayload` or
    # `writeMessage` returns an error. The consumer is closed before that error
    # is returned, so the connection's Kafka resources are not left open.
    #
    # + caller - The WebSocket client to push updates to
    # + return - The error that stopped the stream, or an error wrapping it if
    #            closing the consumer also failed
    isolated remote function onOpen(websocket:Caller caller) returns error? {
        do {
            while true {
                anydata[] matchDetails = check self.kafkaConsumer->pollPayload(1);
                // Plain loop: a failed write fails the enclosing `do` block directly.
                foreach anydata matchDetail in matchDetails {
                    check caller->writeMessage(matchDetail);
                }
            }
        } on fail error streamError {
            // Release the consumer; it was created for this connection only.
            kafka:Error? closeResult = self.kafkaConsumer->close();
            if closeResult is kafka:Error {
                // Keep the original failure as the cause so neither error is lost.
                return error(string `Failed to close the Kafka consumer: ${closeResult.message()}`, streamError);
            }
            return streamError;
        }
    }
}