// The order management HTTP service acting as a Ballerina WebSub Publisher brings up an internal Ballerina WebSub Hub
// at which it will publish updates.
import ballerina/http;
import ballerina/log;
import ballerina/websub;

// The HTTP listener on port 9090 to which the order management service is attached.
listener http:Listener httpListener = new (9090);

// The topic against which the publisher publishes updates. Subscribers need to
// subscribe to this topic to receive notifications when an order is placed.
final string ORDER_TOPIC = "http://localhost:9090/ordermgt/ordertopic";

// An in-memory `map` to which orders are added, keyed by the order ID.
map<json> orderMap = {};

// Invokes the function that starts up a Ballerina WebSub Hub and registers the topic
// against which updates will be published. The returned hub object is kept in this
// module-level variable so that the service can publish updates through it.
websub:Hub webSubHub = startHubAndRegisterTopic();

// The order management service, exposed under the `/ordermgt` base path on
// `httpListener`. It serves WebSub discovery requests and accepts new orders,
// publishing a notification to the hub for each accepted order.
@http:ServiceConfig {
    basePath: "/ordermgt"
}
service orderMgt on httpListener {

    // This resource accepts the discovery requests.
    // The response to a request received at this resource carries a Link header
    // indicating the topic to subscribe to and the hub(s) to subscribe at.
    @http:ResourceConfig {
        methods: ["GET", "HEAD"],
        path: "/order"
    }
    resource function discoverPlaceOrder(http:Caller caller, http:Request req) {
        http:Response response = new;
        // Adds a link header indicating the hub and topic.
        websub:addWebSubLinkHeader(response, [webSubHub.subscriptionUrl], ORDER_TOPIC);
        response.statusCode = 202;
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error responding on ordering", result);
        }
    }

    // This resource accepts order placement requests.
    // A request whose body is not valid JSON, or which does not carry an
    // `Order.ID` value, is rejected with `400 Bad Request`; nothing is stored or
    // published in that case.
    @http:ResourceConfig {
        methods: ["POST"],
        path: "/order"
    }
    resource function placeOrder(http:Caller caller, http:Request req) {
        var orderReq = req.getJsonPayload();
        if (orderReq is json) {
            // Field access on `json` yields an error when `Order` or `ID` is absent.
            // Check for it explicitly; otherwise the stringified error would end
            // up being used as the order ID.
            json|error orderIdField = orderReq.Order.ID;
            if (orderIdField is error || orderIdField is ()) {
                log:printError("Order ID missing in payload");
                respondBadRequest(caller, "Order.ID is required");
                return;
            }
            string orderId = orderIdField.toString();
            orderMap[orderId] = <@untainted>orderReq;

            // Creates the response message indicating successful order creation.
            http:Response response = new;
            response.statusCode = 202;
            var result = caller->respond(response);
            if (result is error) {
                log:printError("Error responding on ordering", result);
            }

            // Publishes the update to the Hub to notify the subscribers.
            string orderCreatedNotification = "New Order Added: " + orderId;
            log:printInfo(orderCreatedNotification);
            var updateResult = webSubHub.publishUpdate(ORDER_TOPIC,
                                                    orderCreatedNotification);
            if (updateResult is error) {
                log:printError("Error publishing update", updateResult);
            }
        } else {
            // A malformed payload is a client error: report it to the caller
            // instead of panicking inside the resource.
            error e = orderReq;
            log:printError("Error retrieving payload", e);
            respondBadRequest(caller, "Invalid JSON payload");
        }
    }

}

// Sends a `400 Bad Request` response with the given reason as a text payload
// and logs any failure to deliver the response.
function respondBadRequest(http:Caller caller, string reason) {
    http:Response response = new;
    response.statusCode = 400;
    response.setTextPayload(reason);
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error responding on ordering", result);
    }
}

// Brings up a Ballerina WebSub Hub on port 9191 (base path `/websub`, subscription
// resource path `/hub`) and registers `ORDER_TOPIC` at it.
// Panics if the hub fails to start. If a hub has already been started, that hub is
// reused. A topic registration failure is only logged.
// Returns the hub object through which updates are published.
function startHubAndRegisterTopic() returns websub:Hub {
    var startUpOutcome = websub:startHub(new http:Listener(9191), "/websub", "/hub");

    websub:Hub? startedHub = ();
    if (startUpOutcome is websub:HubStartupError) {
        panic startUpOutcome;
    } else if (startUpOutcome is websub:HubStartedUpError) {
        // A hub is already running; use the one carried by the error value.
        startedHub = startUpOutcome.startedUpHub;
    } else {
        startedHub = startUpOutcome;
    }

    websub:Hub orderHub = <websub:Hub>startedHub;
    var registration = orderHub.registerTopic(ORDER_TOPIC);
    if (registration is error) {
        log:printError("Error registering topic", registration);
    }
    return orderHub;
}
// The Ballerina WebSub Subscriber service, which subscribes to notifications at the Hub.
import ballerina/log;
import ballerina/websub;

// The WebSub listener (port 8181) to which the subscriber service is bound.
listener websub:Listener websubEP = new (8181);

// The annotation specifies the subscription parameters for the order management
// service. With `subscribeOnStartUp` set, a subscription request is sent to the hub
// for the topic discovered at the `target` resource URL.
@websub:SubscriberServiceConfig {
    path: "/ordereventsubscriber",
    subscribeOnStartUp: true,
    target: "http://localhost:9090/ordermgt/order",
    leaseSeconds: 3600,
    secret: "Kslk30SNF2AChs2"
}
service websubSubscriber on websubEP {
    // Handles content delivery requests: logs the notification's text payload, or
    // logs an error if the payload cannot be read as a string.
    resource function onNotification(websub:Notification notification) {
        var textPayload = notification.getTextPayload();
        if (textPayload is error) {
            log:printError("Error retrieving payload as string", textPayload);
        } else {
            log:printInfo("WebSub Notification Received: " + textPayload);
        }
    }
}

Service Integration Sample

Ballerina provides the capability to easily introduce webhooks via its implementation of the WebSub recommendation.

Ballerina services that act as WebSub publishers can start up a Ballerina WebSub Hub, to which they publish updates against topics when particular events occur.

Ballerina WebSub subscribers can subscribe at these hubs, by specifying the topics, to receive notifications on the occurrence of particular events.

Any of Ballerina's WebSub components (Publisher, Hub, or Subscriber) can be used with non-Ballerina components that are WebSub-compliant.

import ballerina/http;
import ballerina/log;
import ballerina/websub;

The order management HTTP service acting as a Ballerina WebSub Publisher brings up an internal Ballerina WebSub Hub at which it will publish updates.

listener http:Listener httpListener = new (9090);
final string ORDER_TOPIC = "http://localhost:9090/ordermgt/ordertopic";

The topic against which the publisher publishes updates, and to which subscribers need to subscribe in order to receive notifications when an order is placed.

map<json> orderMap = {};

An in-memory map to which orders will be added.

websub:Hub webSubHub = startHubAndRegisterTopic();

Invokes the function that starts up a Ballerina WebSub Hub, registers the topic against which updates will be published, and maintains a reference to the returned hub object to publish updates.

@http:ServiceConfig {
    basePath: "/ordermgt"
}
service orderMgt on httpListener {
    @http:ResourceConfig {
        methods: ["GET", "HEAD"],
        path: "/order"
    }
    resource function discoverPlaceOrder(http:Caller caller, http:Request req) {
        http:Response response = new;

This resource accepts the discovery requests. The response to a request received at this resource carries a Link header indicating the topic to subscribe to and the hub(s) to subscribe at.

        websub:addWebSubLinkHeader(response, [webSubHub.subscriptionUrl], ORDER_TOPIC);
        response.statusCode = 202;
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error responding on ordering", result);
        }
    }

Adds a link header indicating the hub and topic.

    @http:ResourceConfig {
        methods: ["POST"],
        path: "/order"
    }
    resource function placeOrder(http:Caller caller, http:Request req) {
        var orderReq = req.getJsonPayload();
        if (orderReq is json) {
            string orderId = orderReq.Order.ID.toString();
            orderMap[orderId] = <@untainted>orderReq;

This resource accepts order placement requests.

            http:Response response = new;
            response.statusCode = 202;
            var result = caller->respond(response);
            if (result is error) {
                log:printError("Error responding on ordering", result);
            }

Creates the response message indicating successful order creation.

            string orderCreatedNotification = "New Order Added: " + orderId;
            log:printInfo(orderCreatedNotification);
            var updateResult = webSubHub.publishUpdate(ORDER_TOPIC,
                                                    orderCreatedNotification);
            if (updateResult is error) {
                log:printError("Error publishing update", updateResult);
            }
        } else {
            error e = orderReq;
            log:printError("Error retrieving payload", e);
            panic e;
        }
    }

Publishes the update to the Hub to notify the subscribers.

}
function startHubAndRegisterTopic() returns websub:Hub {
    var hubStartUpResult = websub:startHub(new http:Listener(9191), "/websub", "/hub");

Starts up a Ballerina WebSub Hub on port 9191 and registers the topic against which updates will be published.

    websub:Hub? hubVar = ();
    if hubStartUpResult is websub:HubStartupError {
        panic hubStartUpResult;
    } else {
        hubVar = hubStartUpResult is websub:HubStartedUpError
                            ? hubStartUpResult.startedUpHub : hubStartUpResult;
    }
    websub:Hub internalHub = <websub:Hub>hubVar;
    var result = internalHub.registerTopic(ORDER_TOPIC);
    if (result is error) {
        log:printError("Error registering topic", result);
    }
    return internalHub;
}
# This sample requires starting up the Subscriber Service after the Publisher Service.
# To start the service, navigate to the directory that contains the
# `.bal` file, and use the `ballerina run` command.
ballerina run order_mgmt_service.bal
[ballerina/websub] Ballerina WebSub Hub started up.
[ballerina/websub] Publish URL: http://localhost:9191/websub/publish
[ballerina/websub] Subscription URL: http://localhost:9191/websub/hub
[ballerina/http] started HTTP/WS listener 0.0.0.0:9191
[ballerina/http] started HTTP/WS listener 0.0.0.0:9090
2019-11-01 14:15:05,814 INFO  [ballerina/websub] - Subscription request received for topic[http://localhost:9090/ordermgt/ordertopic] with callback[http://localhost:8181/ordereventsubscriber]
2019-11-01 14:15:05,858 INFO  [ballerina/websub] - Sending intent verification request to callback[http://localhost:8181/ordereventsubscriber] for topic[http://localhost:9090/ordermgt/ordertopic]
2019-11-01 14:15:06,037 INFO  [ballerina/websub] - Intent verification successful for mode: [subscribe], for callback URL: [http://localhost:8181/ordereventsubscriber]
2019-11-01 14:15:58,756 INFO  [] - New Order Added: 1001
curl -X POST -d '{ "Order": { "ID": "1001", "Name": "XYZ" } }' "http://localhost:9090/ordermgt/order" -H "Content-Type:application/json"

Invoke the service using this `curl` command to place an order after starting up the subscriber service.

import ballerina/log;
import ballerina/websub;

The Ballerina WebSub Subscriber service, which subscribes to notifications at the Hub.

listener websub:Listener websubEP = new (8181);

The endpoint to which the subscriber service is bound.

@websub:SubscriberServiceConfig {
    path: "/ordereventsubscriber",
    subscribeOnStartUp: true,
    target: "http://localhost:9090/ordermgt/order",
    leaseSeconds: 3600,
    secret: "Kslk30SNF2AChs2"
}
service websubSubscriber on websubEP {

The annotation specifies the subscription parameters for the order management service. A subscription request is sent to the hub for the topic discovered at the specified resource URL.

    resource function onNotification(websub:Notification notification) {
        var payload = notification.getTextPayload();
        if (payload is string) {
            log:printInfo("WebSub Notification Received: " + payload);
        } else {
            log:printError("Error retrieving payload as string", payload);
        }
    }
}

Defines the resource, which accepts the content delivery requests.

# To start the service, navigate to the directory that contains the
# `.bal` file, and use the `ballerina run` command.
ballerina run subscriber.bal
[ballerina/http] started HTTP/WS listener 0.0.0.0:8181
2019-11-01 14:15:05,823 INFO  [ballerina/websub] - Subscription Request successfully sent to Hub[http://localhost:9191/websub/hub], for Topic[http://localhost:9090/ordermgt/ordertopic], with Callback [http://localhost:8181/ordereventsubscriber]
ballerina: Intent Verification agreed - Mode [subscribe], Topic [http://localhost:9090/ordermgt/ordertopic], Lease Seconds [3600]
2019-11-01 14:15:58,892 INFO  [] - WebSub Notification Received: New Order Added: 1001