// The Ballerina WebSub Publisher brings up the internal Ballerina Hub,
// registers a topic at the hub, and publishes updates to the topic.
import ballerina/http;
import ballerina/io;
import ballerina/runtime;
import ballerina/websub;

// Entry point of the publisher: starts the internal Ballerina Hub, registers
// a topic at it, and publishes two updates to that topic with pauses in
// between so that a subscriber can subscribe and later unsubscribe.
public function main() {

    // The topic that is registered at the hub and to which updates are published.
    string topic = "http://websubpubtopic.com";

    // Starts the internal Ballerina Hub.
    io:println("Starting up the Ballerina Hub Service");
    websub:Hub webSubHub;
    var result = websub:startHub(new http:Listener(9191), "/websub", "/hub");
    if (result is websub:Hub) {
        webSubHub = result;
    } else if (result is websub:HubStartedUpError) {
        // Falls back to the hub carried by the error (`startedUpHub`).
        webSubHub = result.startedUpHub;
    } else {
        io:println("Hub start error:" + <string>result.detail()?.message);
        return;
    }

    // Registers the topic at the hub. A failure is only reported; the
    // publisher still goes on to attempt the publish calls below.
    var registrationResponse = webSubHub.registerTopic(topic);
    if (registrationResponse is error) {
        io:println("Error occurred registering topic: " +
                                <string>registrationResponse.detail()?.message);
    } else {
        io:println("Topic registration successful!");
    }

    // Makes the publisher wait until the subscriber subscribes at the hub.
    runtime:sleep(5000);

    // First update: published directly to the internal Ballerina Hub.
    publishToInternalHub(webSubHub, topic);

    // Makes the publisher wait until the subscriber unsubscribes at the hub.
    runtime:sleep(5000);

    // Second update: published after the subscriber has had time to unsubscribe.
    publishToInternalHub(webSubHub, topic);

    // Makes the publisher wait until subscribers are notified.
    runtime:sleep(2000);
}

// Publishes the sample JSON update for `topic` directly to the given hub and
// prints whether the hub accepted the update.
function publishToInternalHub(websub:Hub hub, string topic) {
    var publishResponse = hub.publishUpdate(topic,
                            {"action": "publish", "mode": "internal-hub"});
    if (publishResponse is error) {
        io:println("Error notifying hub: " +
                                    <string>publishResponse.detail()?.message);
    } else {
        io:println("Update notification successful!");
    }
}
// The Ballerina WebSub Subscriber service, which represents the callback registered at the Hub.
import ballerina/log;
import ballerina/websub;

// The WebSub listener endpoint (created with port 8181) to which the
// subscriber service below is bound.
listener websub:Listener websubEP = new (8181);

// Annotations specifying the subscription parameters.
// `subscribeOnStartUp` is omitted from the annotation so that a subscription request is not sent
// automatically on start up.
// Also, the exclusion of the onIntentVerification resource will result in auto intent-verification.
@websub:SubscriberServiceConfig {
    path: "/websub",
    target: ["http://localhost:9191/websub/hub", "http://websubpubtopic.com"],
    secret: "Kslk30SNF2AChs2"
}
service websubSubscriber on websubEP {

    // Handles content delivery requests: logs the notification's text
    // payload, or logs an error if the payload cannot be read as a string.
    resource function onNotification(websub:Notification notification) {
        string|error textPayload = notification.getTextPayload();
        if (textPayload is error) {
            log:printError("Error retrieving payload as string", textPayload);
        } else {
            log:printInfo("WebSub Notification Received: " + textPayload);
        }
    }
}
// The Ballerina main program, which demonstrates the usage of the Hub client endpoint to subscribe/unsubscribe to notifications.
import ballerina/io;
import ballerina/runtime;
import ballerina/websub;

// The Hub client endpoint, created with the hub URL, through which `main`
// sends its subscription and unsubscription requests.
websub:SubscriptionClient websubHubClientEP =
                            new ("http://localhost:9191/websub/hub");

// Entry point of the subscription-change client: subscribes the subscriber
// service's callback to the topic via the hub client endpoint, waits, and
// then unsubscribes it again, printing the outcome of each request.
public function main() {

    // Builds the subscription request on behalf of the subscriber service
    // and sends it through the hub client endpoint.
    websub:SubscriptionChangeRequest subscribeReq = {
        topic: "http://websubpubtopic.com",
        callback: "http://localhost:8181/websub",
        secret: "Kslk30SNF2AChs2"
    };
    var subscribeResult = websubHubClientEP->subscribe(subscribeReq);

    if (subscribeResult is error) {
        io:println("Error occurred with Subscription Request: " +
                    <string>subscribeResult.detail()?.message);
    } else {
        io:println("Subscription Request successful at Hub [" +
                    subscribeResult.hub + "] for Topic [" +
                    subscribeResult.topic + "]");
    }

    // Waits for the initial notification before unsubscribing.
    runtime:sleep(5000);

    // Builds the unsubscription request for the same topic and callback and
    // sends it through the hub client endpoint.
    websub:SubscriptionChangeRequest unsubscribeReq = {
        topic: "http://websubpubtopic.com",
        callback: "http://localhost:8181/websub"
    };
    var unsubscribeResult = websubHubClientEP->unsubscribe(unsubscribeReq);

    if (unsubscribeResult is error) {
        io:println("Error occurred with Unsubscription Request: " +
                    <string>unsubscribeResult.detail()?.message);
    } else {
        io:println("Unsubscription Request successful at Hub [" +
                    unsubscribeResult.hub + "] for Topic [" +
                    unsubscribeResult.topic + "]");
    }
}

Hub Client Sample

Ballerina provides the capability to easily introduce publishers and subscribers that are WebSub-compliant. Ballerina WebSub subscribers can specify the topic they wish to subscribe to and the hub they wish to subscribe at, to receive notifications. If specified, the subscription process would be initiated at the startup. The subscription/unsubscription process could also be initiated via the Ballerina WebSub Hub Client Endpoint.

This example demonstrates the usage of the WebSub Hub Client Endpoint to subscribe and unsubscribe to updates for a particular topic at the hub. It also demonstrates auto intent verification, in which intent verification for subscription or unsubscription requests happens automatically (against the annotated topic) if the onIntentVerification resource is not included in the subscriber service.

The Hub Client Endpoint can be used by subscribers to subscribe/unsubscribe at a Hub, and by publishers to register topics and publish updates for topics at the Hub.

import ballerina/http;
import ballerina/io;
import ballerina/runtime;
import ballerina/websub;

The Ballerina WebSub Publisher brings up the internal Ballerina Hub, registers a topic at the hub, and publishes updates to the topic.

public function main() {
    io:println("Starting up the Ballerina Hub Service");
    websub:Hub webSubHub;
    var result = websub:startHub(new http:Listener(9191), "/websub", "/hub");
    if (result is websub:Hub) {
        webSubHub = result;
    } else if (result is websub:HubStartedUpError) {
        webSubHub = result.startedUpHub;
    } else {
        io:println("Hub start error:" + <string>result.detail()?.message);
        return;
    }

Starts the internal Ballerina Hub.

    var registrationResponse = webSubHub.registerTopic(
                                            "http://websubpubtopic.com");
    if (registrationResponse is error) {
        io:println("Error occurred registering topic: " +
                                <string>registrationResponse.detail()?.message);
    } else {
        io:println("Topic registration successful!");
    }

Registers a topic at the hub.

    runtime:sleep(5000);

Makes the publisher wait until the subscriber subscribes at the hub.

    var publishResponse = webSubHub.publishUpdate("http://websubpubtopic.com",
                            {"action": "publish", "mode": "internal-hub"});
    if (publishResponse is error) {
        io:println("Error notifying hub: " +
                                    <string>publishResponse.detail()?.message);
    } else {
        io:println("Update notification successful!");
    }

Publishes directly to the internal Ballerina Hub.

    runtime:sleep(5000);

Makes the publisher wait until the subscriber unsubscribes at the hub.

    publishResponse = webSubHub.publishUpdate("http://websubpubtopic.com",
                            {"action": "publish", "mode": "internal-hub"});
    if (publishResponse is error) {
        io:println("Error notifying hub: " +
                                    <string>publishResponse.detail()?.message);
    } else {
        io:println("Update notification successful!");
    }

Publishes directly to the internal Ballerina Hub.

    runtime:sleep(2000);
}

Makes the publisher wait until subscribers are notified.

# This sample requires running the subscriber client main program after the
# Publisher starts up the hub and registers the topic, and the subscriber
# service (representing the callback) is started.
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command.
ballerina run publisher.bal
Starting up the Ballerina Hub Service
[ballerina/websub] Ballerina WebSub Hub started up.
[ballerina/websub] Publish URL: http://localhost:9191/websub/publish
[ballerina/websub] Subscription URL: http://localhost:9191/websub/hub
[ballerina/http] started HTTP/WS listener 0.0.0.0:9191
Topic registration successful!
2019-11-01 14:06:38,035 INFO  [ballerina/websub] - Subscription request received for topic[http://websubpubtopic.com] with callback[http://localhost:8181/websub]
2019-11-01 14:06:38,255 INFO  [ballerina/websub] - Sending intent verification request to callback[http://localhost:8181/websub] for topic[http://websubpubtopic.com]
2019-11-01 14:06:38,781 INFO  [ballerina/websub] - Intent verification successful for mode: [subscribe], for callback URL: [http://localhost:8181/websub]
Update notification successful!
2019-11-01 14:06:40,572 INFO  [ballerina/websub] - Subscription request received for topic[http://websubpubtopic.com] with callback[http://localhost:8181/websub]
2019-11-01 14:06:40,576 INFO  [ballerina/websub] - Sending intent verification request to callback[http://localhost:8181/websub] for topic[http://websubpubtopic.com]
2019-11-01 14:06:40,627 INFO  [ballerina/websub] - Intent verification successful for mode: [subscribe], for callback URL: [http://localhost:8181/websub]
Update notification successful!
2019-11-01 14:06:45,628 INFO  [ballerina/websub] - Subscription request received for topic[http://websubpubtopic.com] with callback[http://localhost:8181/websub]
2019-11-01 14:06:45,637 INFO  [ballerina/websub] - Sending intent verification request to callback[http://localhost:8181/websub] for topic[http://websubpubtopic.com]
2019-11-01 14:06:45,695 INFO  [ballerina/websub] - Intent verification successful for mode: [unsubscribe], for callback URL: [http://localhost:8181/websub]
import ballerina/log;
import ballerina/websub;

The Ballerina WebSub Subscriber service, which represents the callback registered at the Hub.

listener websub:Listener websubEP = new (8181);

The endpoint to which the subscriber service is bound.

@websub:SubscriberServiceConfig {
    path: "/websub",
    target: ["http://localhost:9191/websub/hub", "http://websubpubtopic.com"],
    secret: "Kslk30SNF2AChs2"
}
service websubSubscriber on websubEP {

Annotations specifying the subscription parameters. subscribeOnStartUp is omitted from the annotation so that a subscription request is not sent automatically on start up. Also, the exclusion of the onIntentVerification resource will result in auto intent-verification.

    resource function onNotification(websub:Notification notification) {
        var payload = notification.getTextPayload();
        if (payload is string) {
            log:printInfo("WebSub Notification Received: " + payload);
        } else {
            log:printError("Error retrieving payload as string", payload);
        }
    }
}

This resource accepts content delivery requests.

# The update published after unsubscription is not received by the subscriber service.
# To start the service, navigate to the directory that contains the
# `.bal` file, and use the `ballerina run` command.
ballerina run subscriber_service.bal
[ballerina/http] started HTTP/WS listener 0.0.0.0:8181
2019-11-01 14:06:38,171 INFO  [ballerina/websub] - Subscription Request successfully sent to Hub[http://localhost:9191/websub/hub], for Topic[http://websubpubtopic.com], with Callback [http://localhost:8181/websub]
ballerina: Intent Verification agreed - Mode [subscribe], Topic [http://websubpubtopic.com], Lease Seconds [86400]
2019-11-01 14:06:40,196 INFO  [] - WebSub Notification Received: {"action":"publish", "mode":"internal-hub"}
ballerina: Intent Verification agreed - Mode [subscribe], Topic [http://websubpubtopic.com], Lease Seconds [86400]
2019-11-01 14:06:45,052 INFO  [] - WebSub Notification Received: {"action":"publish", "mode":"internal-hub"}
ballerina: Intent Verification agreed - Mode [unsubscribe], Topic [http://websubpubtopic.com]
import ballerina/io;
import ballerina/runtime;
import ballerina/websub;

The Ballerina main program, which demonstrates the usage of the Hub client endpoint to subscribe/unsubscribe to notifications.

websub:SubscriptionClient websubHubClientEP =
                            new ("http://localhost:9191/websub/hub");
public function main() {
    websub:SubscriptionChangeRequest subscriptionRequest = {
        topic: "http://websubpubtopic.com",
        callback: "http://localhost:8181/websub",
        secret: "Kslk30SNF2AChs2"
    };

Sends the subscription request for the subscriber service.

    var response = websubHubClientEP->subscribe(subscriptionRequest);
    if (response is websub:SubscriptionChangeResponse) {
        io:println("Subscription Request successful at Hub [" + response.hub +
                    "] for Topic [" + response.topic + "]");
    } else {
        error err = response;
        string errCause = <string>err.detail()?.message;
        io:println("Error occurred with Subscription Request: " + errCause);
    }
    runtime:sleep(5000);

Waits for the initial notification before unsubscribing.

    websub:SubscriptionChangeRequest unsubscriptionRequest = {
        topic: "http://websubpubtopic.com",
        callback: "http://localhost:8181/websub"
    };

Sends the unsubscription request for the subscriber service to the hub.

    response = websubHubClientEP->unsubscribe(unsubscriptionRequest);
    if (response is websub:SubscriptionChangeResponse) {
        io:println("Unsubscription Request successful at Hub [" + response.hub +
                    "] for Topic [" + response.topic + "]");
    } else {
        error err = response;
        string errCause = <string>err.detail()?.message;
        io:println("Error occurred with Unsubscription Request: " + errCause);
    }
}
# To run this sample, navigate to the directory that contains the
# `.bal` file, and execute the `ballerina run` command below.
ballerina run subscription_change_client.bal
Subscription Request successful at Hub [http://localhost:9191/websub/hub] for Topic [http://websubpubtopic.com]
Unsubscription Request successful at Hub [http://localhost:9191/websub/hub] for Topic [http://websubpubtopic.com]