Module : websub
Module Overview
This module contains an implementation of the W3C WebSub recommendation, which facilitates a push-based content delivery/notification mechanism between publishers and subscribers.
This implementation supports introducing all WebSub components:
- Subscriber - A party interested in receiving update notifications for particular topics.
- Publisher - A party that advertises topics to which interested parties subscribe in order to receive notifications on occurrence of events.
- Hub - A party that accepts subscription requests from subscribers and delivers content to the subscribers when the topic is updated by the topic's publisher.
Basic flow with WebSub
- The subscriber discovers, from the publisher, the topic it needs to subscribe to and the hub(s) that deliver notifications on updates of the topic.
- The subscriber sends a subscription request to one or more of the discovered hub(s), specifying the discovered topic along with other subscription parameters such as:
  - The callback URL to which content is expected to be delivered.
  - (Optional) The lease period (in seconds) for which the subscriber wants the subscription to stay active.
  - (Optional) A secret to use for authenticated content distribution.
- The hub sends an intent verification request to the specified callback URL. If the subscriber's response indicates verification (by echoing a challenge specified in the request), the subscription is added for the topic at the hub.
- The publisher notifies the hub of updates to the topic, and the content to deliver is identified.
- The hub delivers the identified content to the subscribers of the topic.
Features
Subscriber
This module allows introducing a WebSub Subscriber Service with onIntentVerification, which accepts HTTP GET requests for intent verification, and onNotification, which accepts HTTP POST requests for notifications. The WebSub Subscriber Service provides the following capabilities:
- When the service is started, a subscription request is sent for a hub/topic combination, either specified as annotations or discovered based on the resource URL specified as an annotation.
- If onIntentVerification is not specified, intent verification is done automatically against the topic specified as an annotation or discovered based on the resource URL specified as an annotation.
- If a secret is specified for the subscription, signature validation is done for authenticated content distribution.
Sends subscription request on service startup and explicit intent verification
When subscribeOnStartUp is set to true in the Subscriber Service, a subscription request is sent to the specified hub for the specified topic, with the specified lease seconds value and the specified secret for authenticated content distribution.
Since an onIntentVerification resource function is not included, intent verification for subscription and unsubscription requests happens automatically.
@websub:SubscriberServiceConfig {
    path: "/websub",
    subscribeOnStartUp: true,
    target: ["<HUB_URL>", "<TOPIC_URL>"],
    leaseSeconds: 3600,
    secret: "<SECRET>"
}
service websubSubscriber on websubEP {
    resource function onNotification(websub:Notification notification) {
        //...
    }
}
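The subscriber snippet leaves the listener and the notification handling open. A minimal sketch of a complete subscriber could look like the following. It assumes a listener on port 8181 (an arbitrary choice, not from the original) and assumes that websub:Notification exposes getTextPayload() returning string|error. Check both against the API documentation of the Ballerina version in use.

```ballerina
import ballerina/log;
import ballerina/websub;

// Assumption: port 8181 is arbitrary; any free port works.
listener websub:Listener websubEP = new(8181);

@websub:SubscriberServiceConfig {
    path: "/websub",
    subscribeOnStartUp: true,
    target: ["<HUB_URL>", "<TOPIC_URL>"],
    leaseSeconds: 3600,
    secret: "<SECRET>"
}
service websubSubscriber on websubEP {
    // Invoked for each content delivery (HTTP POST) from the hub.
    resource function onNotification(websub:Notification notification) {
        var payload = notification.getTextPayload();
        if (payload is string) {
            log:printInfo("WebSub notification received: " + payload);
        } else {
            log:printError("Error retrieving payload as string", payload);
        }
    }
}
```

Because a secret is configured, the signature of each delivery is validated before onNotification is invoked, as described in the capabilities list above.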
Explicit intent verification can be done by introducing an onIntentVerification resource function.
resource function onIntentVerification(websub:Caller caller, websub:IntentVerificationRequest request) {
    http:Response response = new;
    // Insert logic to build subscription/unsubscription intent verification response.
    error? result = caller->respond(response);
}
Functions are made available on the websub:IntentVerificationRequest to build a subscription or unsubscription verification response, specifying the topic to verify intent against:
http:Response response = request.buildSubscriptionVerificationResponse("<TOPIC_TO_VERIFY_FOR>");
http:Response response = request.buildUnsubscriptionVerificationResponse("<TOPIC_TO_VERIFY_FOR>");
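Putting the pieces together, an explicit intent verification resource might be sketched as follows. This is an illustration under assumptions rather than the definitive usage: the listener port is arbitrary, the 202 status check assumes that a successful verification response carries HTTP 202 Accepted, and the <@untainted> cast is added in case taint checking rejects a response derived from the incoming request.

```ballerina
import ballerina/http;
import ballerina/log;
import ballerina/websub;

// Assumption: port 8181 is arbitrary.
listener websub:Listener websubEP = new(8181);

@websub:SubscriberServiceConfig {
    path: "/websub",
    subscribeOnStartUp: true,
    target: ["<HUB_URL>", "<TOPIC_URL>"]
}
service websubSubscriber on websubEP {
    resource function onIntentVerification(websub:Caller caller,
                                           websub:IntentVerificationRequest request) {
        // Build the response that echoes the challenge if the topic matches.
        http:Response response = request.buildSubscriptionVerificationResponse("<TOPIC_URL>");
        // Assumption: 202 Accepted signals that intent was verified.
        if (response.statusCode == 202) {
            log:printInfo("Intent verified for subscription request");
        } else {
            log:printWarn("Intent verification denied for subscription request");
        }
        var result = caller->respond(<@untainted> response);
        if (result is error) {
            log:printError("Error responding to intent verification request", result);
        }
    }

    resource function onNotification(websub:Notification notification) {
        // Handle content delivery here.
    }
}
```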
Hub
A WebSub-compliant hub based on the Ballerina Message Broker is also available. It can be used as a remote hub, or by publishers who want to run their own internal hub. Ballerina's WebSub hub honors specified lease periods and supports authenticated content distribution.
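As a rough sketch, starting a hub and handling the possible outcomes could look like the following. The port (9191) and base path ("/websub") are chosen to match the publish URL used elsewhere in this document, and "/hub" is an assumed subscription resource path. The sketch also assumes that websub:Hub exposes a publishUrl field and that websub:HubStartedUpError carries the running hub in a startedUpHub field.

```ballerina
import ballerina/http;
import ballerina/io;
import ballerina/websub;

public function main() {
    // Assumption: base path "/websub" and subscription resource path "/hub".
    var result = websub:startHub(new http:Listener(9191), "/websub", "/hub");
    if (result is websub:Hub) {
        io:println("Hub started. Publish URL: ", result.publishUrl);
    } else if (result is websub:HubStartedUpError) {
        // A hub was already started; reuse it instead of failing.
        io:println("A hub is already running. Publish URL: ", result.startedUpHub.publishUrl);
    } else {
        // Remaining case: the hub could not be started.
        io:println("Hub failed to start: ", result.reason());
    }
}
```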
Enabling Basic Auth support for the hub
The Ballerina WebSub Hub can be secured by enforcing authentication (Basic Authentication) and (optionally) authorization.
The AuthProvider and authConfig need to be specified for the hub listener and service respectively. If the authStoreProvider of the AuthProvider is set as "http:CONFIG_AUTH_STORE", usernames and passwords for authentication and scopes for authorization are read from a config TOML file.
A user can specify the AuthProvider as follows and set it in the hubListenerConfig record, which is passed when starting the hub.
http:BasicAuthHandler basicAuthHandler = new(new auth:InboundBasicAuthProvider());
http:ServiceEndpointConfiguration hubListenerConfig = {
    auth: {
        authHandlers: [basicAuthHandler]
    },
    secureSocket: {
        keyStore: {
            path: config:getAsString("b7a.home") + "bre/security/ballerinaKeystore.p12",
            password: "ballerina"
        }
    }
};
var val = websub:startHub(new http:Listener(9191, hubListenerConfig));
In addition to the BasicAuthHandler for the listener, a user also has to specify the authConfig properties at the service or resource levels.
They can be set by passing arguments for the serviceAuth, subscriptionResourceAuth, or publisherResourceAuth parameters when starting up the hub.
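A possible way to pass these parameters is sketched below. The parameter names come from the text above, but the shape of their values is an assumption: the sketch assumes each one is a record with an enabled flag and an optional scopes list, and it reuses the scope name scope1 from the user configuration in this section. Verify the record fields against the http module documentation before relying on this.

```ballerina
import ballerina/auth;
import ballerina/config;
import ballerina/http;
import ballerina/websub;

public function main() {
    http:BasicAuthHandler basicAuthHandler = new(new auth:InboundBasicAuthProvider());
    http:ServiceEndpointConfiguration hubListenerConfig = {
        auth: {
            authHandlers: [basicAuthHandler]
        },
        secureSocket: {
            keyStore: {
                path: config:getAsString("b7a.home") + "bre/security/ballerinaKeystore.p12",
                password: "ballerina"
            }
        }
    };
    // Assumption: `enabled` and `scopes` are the relevant fields of the auth records.
    var val = websub:startHub(new http:Listener(9191, hubListenerConfig),
        serviceAuth = {enabled: true},
        subscriptionResourceAuth = {enabled: true, scopes: ["scope1"]},
        publisherResourceAuth = {enabled: true, scopes: ["scope1"]});
}
```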
Recognized users can be specified in a .toml file, which can be passed as a configuration file when running the program.
[b7a.users]
[b7a.users.tom]
password="1234"
scopes="scope1"
Once the hub is secured using basic auth, a subscriber should provide the relevant auth config in the hubClientConfig field of the subscriber service annotation.
auth:OutboundBasicAuthProvider basicAuthProvider = new({
    username: "tom",
    password: "1234"
});
http:BasicAuthHandler basicAuthHandler = new(basicAuthProvider);
@websub:SubscriberServiceConfig {
    path: "/ordereventsubscriber",
    hubClientConfig: {
        auth: {
            authHandler: basicAuthHandler
        }
    }
}
Enabling data persistence for the hub
The Ballerina WebSub Hub supports persistence of topic and subscription data that needs to be restored when the hub is restarted.
Users can introduce their own persistence implementation by introducing an object type that is structurally equivalent to the websub:HubPersistenceStore abstract object.
Persistence can be enabled by setting a suitable websub:HubPersistenceStore value for the hubPersistenceStore field in the HubConfiguration record, which is passed to the websub:startHub() function.
Any subscriptions added at the hub will be available even after the hub is restarted.
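Wiring a persistence store into the hub might be sketched as follows. The function name startPersistentHub is hypothetical, the store implementation itself is left out, and the sketch assumes that the HubConfiguration record is passed to websub:startHub() through a parameter named hubConfiguration.

```ballerina
import ballerina/http;
import ballerina/io;
import ballerina/websub;

// Hypothetical helper: `store` is any value whose type is structurally
// equivalent to `websub:HubPersistenceStore`; its implementation is not shown.
function startPersistentHub(websub:HubPersistenceStore store) {
    websub:HubConfiguration hubConfig = {
        hubPersistenceStore: store
    };
    // Assumption: the configuration parameter is named `hubConfiguration`.
    var result = websub:startHub(new http:Listener(9191), hubConfiguration = hubConfig);
    if (result is websub:Hub) {
        io:println("Hub started with persistence enabled.");
    } else {
        io:println("Hub was not started by this call.");
    }
}
```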
Publisher
Ballerina WebSub publishers can use utility functions to add WebSub link headers indicating the hub and topic URLs, which facilitates WebSub discovery.
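For instance, a publisher could expose a discovery resource that advertises the hub and topic through link headers. The sketch below uses the addWebSubLinkHeader function listed in this module; the service name, resource name, port, and the 202 status code are illustrative assumptions, and the argument order (response, hub URLs, topic URL) should be checked against the function's documentation.

```ballerina
import ballerina/http;
import ballerina/log;
import ballerina/websub;

// Assumption: port 8080 is arbitrary.
listener http:Listener publisherEP = new(8080);

service publisher on publisherEP {
    // Hypothetical resource that subscribers call to discover the hub and topic.
    resource function discover(http:Caller caller, http:Request req) {
        http:Response response = new;
        // Add the WebSub link headers advertising the hub(s) and the topic.
        websub:addWebSubLinkHeader(response, ["<HUB_URL>"], "<TOPIC_URL>");
        response.statusCode = 202;
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error responding to discovery request", result);
        }
    }
}
```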
A hub client endpoint is also made available to publishers and subscribers to perform the following:
- Publishers
  - Register a topic at the Hub
    websub:PublisherClient websubHubClientEP = new ("http://localhost:9191/websub/publish");
    error? registrationResponse = websubHubClientEP->registerTopic("http://websubpubtopic.com");
  - Publish to the hub indicating an update of the topic
    websub:PublisherClient websubHubClientEP = new ("http://localhost:9191/websub/publish");
    error? publishResponse = websubHubClientEP->publishUpdate("http://websubpubtopic.com", {"action": "publish", "mode": "internal-hub"});
- Subscribers
  - Subscribe/Unsubscribe to/from topics at a hub
    websub:SubscriptionClient websubHubClientEP = new("<HUB_URL>");
    // Send subscription request for a subscriber service.
    websub:SubscriptionChangeRequest subscriptionRequest = {
        topic: "<TOPIC_URL>",
        callback: "<CALLBACK_URL>",
        secret: "<SECRET>"
    };
    websub:SubscriptionChangeResponse|error subscriptionChangeResponse = websubHubClientEP->subscribe(subscriptionRequest);
    // Send unsubscription request for the subscriber service.
    websub:SubscriptionChangeRequest unsubscriptionRequest = {
        topic: "<TOPIC_URL>",
        callback: "<CALLBACK_URL>"
    };
    websub:SubscriptionChangeResponse|error unsubscriptionChangeResponse = websubHubClientEP->unsubscribe(unsubscriptionRequest);
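The result of a subscription request can be inspected before proceeding. The following sketch assumes that websub:SubscriptionChangeResponse exposes hub and topic string fields; treat those field names as assumptions to confirm against the record's documentation.

```ballerina
import ballerina/log;
import ballerina/websub;

public function main() {
    websub:SubscriptionClient websubHubClientEP = new("<HUB_URL>");
    websub:SubscriptionChangeRequest subscriptionRequest = {
        topic: "<TOPIC_URL>",
        callback: "<CALLBACK_URL>",
        secret: "<SECRET>"
    };
    var response = websubHubClientEP->subscribe(subscriptionRequest);
    if (response is websub:SubscriptionChangeResponse) {
        // Assumption: `hub` and `topic` are fields of the response record.
        log:printInfo("Subscription request accepted at hub [" + response.hub
            + "] for topic [" + response.topic + "]");
    } else {
        log:printError("Subscription request failed", response);
    }
}
```

Note that an accepted request only means the hub received it; the subscription becomes active after the hub completes intent verification against the callback URL.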
Introducing Specific Subscriber Services (Webhook Callback Services)
Ballerina's WebSub subscriber service listener can be extended to introduce specific Webhooks.
Instead of the single onNotification resource, you can introduce multiple resources to accept content delivery requests using specific subscriber services. These resources correspond to the content delivery requests that are delivered with respect to a particular topic.
For example, assume a scenario in which you receive notifications either when an issue is opened or when an issue is closed by subscribing to a particular topic in an issue tracking system. With a custom subscriber service listener, which extends the generic WebSub subscriber service listener, you can allow two resources to accept content delivery requests (e.g., onIssueOpened and onIssueClosed) instead of the onNotification resource.
These resources accept two parameters:
- The generic websub:Notification record as the first parameter
- A custom record corresponding to the expected (JSON) payload of the notification (e.g., IssueOpenedEvent, IssueClosedEvent)
You can introduce such a specific service by extending the generic subscriber service listener, specifying a mapping between the expected notifications and the resources that requests need to be dispatched to.
The mapping can be based on one of the following indicators of a notification request. (Requests will then be dispatched based on the value of the indicator in the request and a pre-defined mapping.)
- A request header
  Dispatching will be based on the value of the request header specified as topicHeader.
  websub:ExtensionConfig extensionConfig = {
      topicIdentifier: websub:TOPIC_ID_HEADER,
      topicHeader: "<HEADER_TO_CONSIDER>",
      headerResourceMap: {
          "issueOpened": ["onIssueOpened", IssueOpenedEvent],
          "issueClosed": ["onIssueClosed", IssueClosedEvent]
      }
  };
  The "issueOpened": ["onIssueOpened", IssueOpenedEvent] entry indicates that when the value of the <HEADER_TO_CONSIDER> header is issueOpened, dispatching should happen to a resource named onIssueOpened.
  The first parameter of this resource will be the generic websub:Notification record, and the second parameter will be a custom IssueOpenedEvent record mapping the JSON payload received when an issue is opened.
- The payload: the value of a particular key in the JSON payload
  Dispatching will be based on the value in the request payload of one of the map keys specified in the payloadKeyResourceMap map.
  websub:ExtensionConfig extensionConfig = {
      topicIdentifier: websub:TOPIC_ID_PAYLOAD_KEY,
      payloadKeyResourceMap: {
          "<PAYLOAD_KEY_TO_CONSIDER>": {
              "issueOpened": ["onIssueOpened", IssueOpenedEvent],
              "issueClosed": ["onIssueClosed", IssueClosedEvent]
          }
      }
  };
  The "issueOpened": ["onIssueOpened", IssueOpenedEvent] entry indicates that when the value for the JSON payload key <PAYLOAD_KEY_TO_CONSIDER> is issueOpened, dispatching should happen to a resource named onIssueOpened.
  The first parameter of this resource will be the generic websub:Notification record, and the second parameter will be a custom IssueOpenedEvent record mapping the JSON payload received when an issue is opened.
- A request header and the payload (combination of the above two)
  Dispatching will be based on both a request header and the payload as specified in the headerAndPayloadKeyResourceMap. Also, you can introduce a headerResourceMap and/or a payloadKeyResourceMap as additional mappings.
  websub:ExtensionConfig extensionConfig = {
      topicIdentifier: websub:TOPIC_ID_HEADER_AND_PAYLOAD,
      topicHeader: "<HEADER_TO_CONSIDER>",
      headerAndPayloadKeyResourceMap: {
          "issue": {
              "<PAYLOAD_KEY_TO_CONSIDER>": {
                  "opened": ["onIssueOpened", IssueOpenedEvent],
                  "closed": ["onIssueClosed", IssueClosedEvent]
              }
          }
      }
  };
  The "opened": ["onIssueOpened", IssueOpenedEvent] entry indicates that when the value of the <HEADER_TO_CONSIDER> header is issue and the value of the <PAYLOAD_KEY_TO_CONSIDER> JSON payload key is opened, dispatching should happen to a resource named onIssueOpened.
  The first parameter of this resource will be the generic websub:Notification record, and the second parameter will be a custom IssueOpenedEvent record mapping the JSON payload received when an issue is opened.
The Specific Subscriber Service
In order to introduce a specific subscriber service, a new Ballerina listener needs to be introduced. This listener should wrap the generic ballerina/websub:Listener and include the extension configuration described above.
The following example is for a service provider that:
- allows registering webhooks to receive notifications when an issue is opened or assigned
- includes a header named "Event-Header" in each content delivery request indicating what event the notification is for (e.g., "issueOpened" when an issue is opened and "issueAssigned" when an issue is assigned)
import ballerina/lang.'object as objects;
import ballerina/websub;

// Introduce a record mapping the JSON payload received when an issue is opened.
public type IssueOpenedEvent record {
    int id;
    string title;
    string openedBy;
};

// Introduce a record mapping the JSON payload received when an issue is assigned.
public type IssueAssignedEvent record {
    int id;
    string assignedTo;
};

// Introduce a new `listener` wrapping the generic `ballerina/websub:Listener`
public type WebhookListener object {
    *objects:Listener;
    private websub:Listener websubListener;

    public function __init(int port) {
        // Introduce the extension config, based on the mapping details.
        websub:ExtensionConfig extensionConfig = {
            topicIdentifier: websub:TOPIC_ID_HEADER,
            topicHeader: "Event-Header",
            headerResourceMap: {
                "issueOpened": ["onIssueOpened", IssueOpenedEvent],
                "issueAssigned": ["onIssueAssigned", IssueAssignedEvent]
            }
        };
        // Set the extension config in the generic `websub:Listener` config.
        websub:SubscriberListenerConfiguration sseConfig = {
            extensionConfig: extensionConfig
        };
        // Initialize the wrapped generic listener.
        self.websubListener = new(port, sseConfig);
    }

    public function __attach(service s, string? name = ()) returns error? {
        return self.websubListener.__attach(s, name);
    }

    public function __start() returns error? {
        return self.websubListener.__start();
    }

    public function __detach(service s) returns error? {
        return self.websubListener.__detach(s);
    }

    public function __immediateStop() returns error? {
        return self.websubListener.__immediateStop();
    }

    public function __gracefulStop() returns error? {
        return self.websubListener.__gracefulStop();
    }
};
A service can be introduced for the above service provider as follows.
import ballerina/io;
import ballerina/log;
import ballerina/websub;

@websub:SubscriberServiceConfig {
    path: "/subscriber",
    subscribeOnStartUp: false
}
service specificSubscriber on new WebhookListener(8080) {
    resource function onIssueOpened(websub:Notification notification, IssueOpenedEvent issueOpened) {
        log:printInfo(io:sprintf("Issue opened: ID: %s, Title: %s", issueOpened.id, issueOpened.title));
    }
    resource function onIssueAssigned(websub:Notification notification, IssueAssignedEvent issueAssigned) {
        log:printInfo(io:sprintf("Issue ID %s assigned to %s", issueAssigned.id, issueAssigned.assignedTo));
    }
}
For a step-by-step guide on introducing custom subscriber services, see the "Create Webhook Callback Services" section of "How to Extend Ballerina".
For information on the operations that you can perform with this module, see the Functions listed below. For examples of the usage of the operations, see the following.
- Internal Hub Sample Example
- Remote Hub Sample Example
- Hub Client Sample Example
- Service Integration Sample Example
Records
Detail | Holds the details of a WebSub error.
ExtensionConfig | The extension configuration to introduce custom subscriber services.
HubConfiguration | Record representing hub specific configurations.
HubStartedUpError | Error to represent that a WebSubHub is already started up, encapsulating the started up Hub.
RemotePublishConfig | Record representing remote publishing allowance.
SubscriberDetails | Record to represent Subscriber Details.
SubscriberListenerConfiguration | Represents the configuration for the WebSub Subscriber Service Listener.
SubscriberServiceConfiguration | Configuration for a WebSubSubscriber service.
SubscriptionChangeRequest | Record representing a WebSub subscription change request.
SubscriptionChangeResponse | Record representing subscription/unsubscription details if a subscription/unsubscription request is successful.
SubscriptionDetails | Record to represent persisted Subscription Details retrieved.
WebSubContent | Record to represent a WebSub content delivery.
Objects
Hub | Represents the Ballerina WebSub Hub.
HubPersistenceStore | Represents the hub persistence configuration and functions.
IntentVerificationRequest | Object representing an intent verification request received.
Notification | Represents the WebSub Content Delivery Request received.
Clients
Caller | The caller remote functions to respond to client requests.
PublisherClient | The HTTP based client for WebSub topic registration and unregistration, and notifying the hub of new updates.
SubscriptionClient | The HTTP based client for WebSub subscription and unsubscription.
Listeners
Listener | Represents the WebSubSubscriber Service Listener.
Functions
addWebSubLinkHeader | Function to add link headers to a response to allow WebSub discovery.
extractTopicAndHubUrls | Retrieves hub and topic URLs from the
startHub | Starts up the Ballerina Hub.
Constants
WEBSUB_ERROR_CODE | The constant used to represent error code of WebSub module.
PUBLISH_MODE_DIRECT |
PUBLISH_MODE_FETCH |
SHA1 | The constant used to represent SHA-1 cryptographic hash algorithm
SHA256 | The constant used to represent SHA-256 cryptographic hash algorithm
TOPIC_ID_HEADER |
TOPIC_ID_PAYLOAD_KEY |
TOPIC_ID_HEADER_AND_PAYLOAD |
FAILURE_REASON_SUBSCRIPTION_GONE | Represents subscription delivery failure due to the subscriber's callback URL replying with a HTTP 410 Gone status code.
FAILURE_REASON_FAILURE_STATUS_CODE | Represents subscription delivery failures related to HTTP failure status codes except 410 Gone.
FAILURE_REASON_DELIVERY_FAILURE | Represents subscription delivery failures related to network issues.
LISTENER_STARTUP_ERROR | Represents the reason string for the
HUB_STARTUP_ERROR_REASON | Represents the reason string for the
Annotations
SpecificSubscriber | Annotation to declare that the service represents a specific webhook.
SubscriberServiceConfig | WebSub Subscriber Configuration for the service, indicating subscription related parameters.
Types
FailureReason | Represents subscription delivery failure reasons.
RemotePublishMode | The identifier to be used to identify the mode in which update content should be identified.
SignatureMethod | The identifier to be used to identify the cryptographic hash algorithm.
TopicIdentifier | The identifier to be used to identify the topic for dispatching with custom subscriber services.
Errors
HubStartupError | Represents a hub startup error.
ListenerStartupError | Represents a listener startup error.