import ballerina/http;
import ballerina/log;
import ballerina/math;
import ballerina/transactions;
// Initiator of the distributed transaction. Listens on port 8080 and serves
// a single GET resource at "/".
@http:ServiceConfig {
    basePath: "/"
}
service InitiatorService on new http:Listener(8080) {

    // Runs the business call inside a `transaction` block and reports the
    // outcome to the client through the HTTP status code (200 or 500).
    @http:ResourceConfig {
        methods: ["GET"],
        path: "/"
    }
    resource function init(http:Caller conn, http:Request req) {
        http:Response res = new;
        log:printInfo("Initiating transaction...");
        transaction {
            log:printInfo("Started transaction: " +
                             transactions:getCurrentTransactionId());
            boolean successful = callBusinessService();
            if (successful) {
                res.statusCode = http:OK_200;
            } else {
                // Set the failure status before aborting; the response is
                // only sent after the transaction block has finished.
                res.statusCode = http:INTERNAL_SERVER_ERROR_500;
                abort;
            }
        } committed {
            log:printInfo("Initiated transaction committed");
        } aborted {
            log:printInfo("Initiated transaction aborted");
        }

        // Respond to the client regardless of the transaction outcome.
        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to client",
                            err = result);
        } else {
            log:printInfo("Sent response back to client");
        }
    }
}

// Posts a stock quote update for "GOOG" with a random price to the
// participant service at http://localhost:8889/stockquote/update.
//
// Returns `true` only when the participant answered with HTTP 200. Returns
// `false` when the call failed or any other status code came back.
function callBusinessService() returns boolean {
    http:Client participantEP = new("http://localhost:8889/stockquote/update");

    // Integer part from randomInRange(200, 250) plus a random fraction.
    float price = math:randomInRange(200, 250) + math:random();
    json bizReq = { symbol: "GOOG", price: price };
    http:Request req = new;
    req.setJsonPayload(bizReq);
    var result = participantEP->post("", req);
    if (result is error) {
        // Log the failure instead of claiming that a response arrived.
        log:printError("Could not get a response from bizservice",
                        err = result);
        return false;
    } else {
        log:printInfo("Got response from bizservice");
        return (result.statusCode == http:OK_200);
    }
}
// NOTE(review): this import appears to start a second source file
// (the participant listing) concatenated after the initiator - confirm.
import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/transactions;
// Participant side of the distributed transaction. Listens on port 8889
// under the base path "/stockquote".
@http:ServiceConfig {
    basePath: "/stockquote"
}
service ParticipantService on new http:Listener(8889) {

    // Accepts a stock quote update. The `Participant` annotation registers
    // the commit and abort callbacks that are defined at file level.
    @http:ResourceConfig {
        path: "/update"
    }
    @transactions:Participant {
        oncommit:printParticipantCommit,
        onabort:printParticipantAbort
    }
    resource function updateStockQuote(http:Caller conn, http:Request req) {
        log:printInfo("Received update stockquote request");
        http:Response reply = new;
        log:printInfo("Joined transaction: " +
                       transactions:getCurrentTransactionId());

        var payload = untaint req.getJsonPayload();
        if (payload is error) {
            // The body could not be read as JSON: answer 500 with the reason.
            reply.statusCode = http:INTERNAL_SERVER_ERROR_500;
            reply.setPayload(payload.reason());
            log:printError("Payload error occurred!", err = payload);
        } else {
            // The format string deliberately spans two lines; the embedded
            // line break and indentation are part of the logged message.
            string summary =
                io:sprintf("Update stock quote request received.
                            symbol:%s, price:%s",
                            payload.symbol,
                            payload.price);
            log:printInfo(summary);

            json ack = { "message": "updating stock" };
            reply.statusCode = http:OK_200;
            reply.setJsonPayload(ack);
        }

        // Send the reply to the initiator and log whether that worked.
        var sendOutcome = conn->respond(reply);
        if (sendOutcome is error) {
            log:printError("Could not send response back to initiator",
                                 err = sendOutcome);
        } else {
            log:printInfo("Sent response back to initiator");
        }
    }
}
// Logs that the transaction identified by `transactionId` was aborted.
function printParticipantAbort(string transactionId) {
    string notice = "Participated transaction: " + transactionId + " aborted";
    log:printInfo(notice);
}
// Logs that the transaction identified by `transactionId` was committed.
function printParticipantCommit(string transactionId) {
    string notice = "Participated transaction: " + transactionId + " committed";
    log:printInfo(notice);
}

Distributed Transactions

To improve the reliability of microservice-based applications, a series of microservice invocations can be bound into a single unit of work. The underlying mechanisms are the infection and agreement protocols. This example demonstrates the Ballerina distributed transactions protocol in action. Ballerina transactions are at an experimental stage; use the `--experimental` flag to enable them.

import ballerina/http;
import ballerina/log;
import ballerina/math;
import ballerina/transactions;
@http:ServiceConfig {
    basePath: "/"
}
service InitiatorService on new http:Listener(8080) {

This is the initiator of the distributed transaction.

    @http:ResourceConfig {
        methods: ["GET"],
        path: "/"
    }
    resource function init(http:Caller conn, http:Request req) {
        http:Response res = new;
        log:printInfo("Initiating transaction...");
        transaction {

When the transaction statement starts, a distributed transaction context is created.

            log:printInfo("Started transaction: " +
                             transactions:getCurrentTransactionId());

Print the current transaction ID

            boolean successful = callBusinessService();
            if (successful) {
                res.statusCode = http:OK_200;
            } else {
                res.statusCode = http:INTERNAL_SERVER_ERROR_500;
                abort;
            }

When a participant is called, the transaction context is propagated, and that participant gets infected and joins the distributed transaction.

        } committed {
            log:printInfo("Initiated transaction committed");
        } aborted {
            log:printInfo("Initiated transaction aborted");
        }

As soon as the transaction block ends, the 2-phase commit coordination protocol will run. All participants are prepared and, depending on the joint outcome, either a notify commit or a notify abort will be sent to the participants.

        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to client",
                            err = result);
        } else {
            log:printInfo("Sent response back to client");
        }
    }
}
// Posts a stock quote update for "GOOG" with a random price to the
// participant service at http://localhost:8889/stockquote/update.
//
// Returns `true` only when the participant answered with HTTP 200. Returns
// `false` when the call failed or any other status code came back.
function callBusinessService() returns boolean {
    http:Client participantEP = new("http://localhost:8889/stockquote/update");
    // Integer part from randomInRange(200, 250) plus a random fraction.
    float price = math:randomInRange(200, 250) + math:random();
    json bizReq = { symbol: "GOOG", price: price };
    http:Request req = new;
    req.setJsonPayload(bizReq);
    var result = participantEP->post("", req);
    if (result is error) {
        // Log the failure instead of claiming that a response arrived.
        log:printError("Could not get a response from bizservice",
                        err = result);
        return false;
    } else {
        log:printInfo("Got response from bizservice");
        return (result.statusCode == http:OK_200);
    }
}
# To start the `initiator` service, navigate to the directory that contains the
# `.bal` file and use the `ballerina run` command.
$ ballerina run --experimental initiator.bal
Initiating service(s) in 'initiator.bal'
[ballerina/http] started HTTP/WS endpoint 10.100.1.182:53871
[ballerina/http] started HTTP/WS endpoint localhost:8080
import ballerina/http;
import ballerina/io;
import ballerina/log;
import ballerina/transactions;
@http:ServiceConfig {
    basePath: "/stockquote"
}
service ParticipantService on new http:Listener(8889) {

This service is a participant in the distributed transaction. It will get infected when it receives a transaction context from the initiator. The transaction context, in the HTTP case, will be passed in as custom HTTP headers.

    @http:ResourceConfig {
        path: "/update"
    }
    @transactions:Participant {
        oncommit:printParticipantCommit,
        onabort:printParticipantAbort
    }
    resource function updateStockQuote(http:Caller conn, http:Request req) {
        log:printInfo("Received update stockquote request");
        http:Response res = new;

When the participant resource is invoked, since a transaction context has been received, this service will register with the initiator as a participant.

        log:printInfo("Joined transaction: " +
                       transactions:getCurrentTransactionId());

Print the current transaction ID

        var updateReq = untaint req.getJsonPayload();
        if (updateReq is json) {
            string msg =
                io:sprintf("Update stock quote request received.
                            symbol:%s, price:%s",
                            updateReq.symbol,
                            updateReq.price);
            log:printInfo(msg);
            json jsonRes = { "message": "updating stock" };
            res.statusCode = http:OK_200;
            res.setJsonPayload(jsonRes);
        } else {
            res.statusCode = http:INTERNAL_SERVER_ERROR_500;
            res.setPayload(updateReq.reason());
            log:printError("Payload error occurred!", err = updateReq);
        }
        var result = conn->respond(res);
        if (result is error) {
            log:printError("Could not send response back to initiator",
                                 err = result);
        } else {
            log:printInfo("Sent response back to initiator");
        }
    }
}
// Logs that the transaction identified by `transactionId` was aborted.
function printParticipantAbort(string transactionId) {
    string notice = "Participated transaction: " + transactionId + " aborted";
    log:printInfo(notice);
}

The participant function that will get called when the distributed transaction is aborted

// Logs that the transaction identified by `transactionId` was committed.
function printParticipantCommit(string transactionId) {
    string notice = "Participated transaction: " + transactionId + " committed";
    log:printInfo(notice);
}

The participant function that will get called when the distributed transaction is committed

# To start the `participant` service, navigate to the directory that contains the
# `.bal` file and use the `ballerina run` command.
# Run this command to start the `participant` service.
$ ballerina run  --experimental participant.bal
Initiating service(s) in 'participant.bal'
[ballerina/http] started HTTP/WS endpoint 10.100.1.182:54774
[ballerina/http] started HTTP/WS endpoint localhost:8889
# Run this curl command to invoke the services.
$ curl -v localhost:8080
Output similar to the following should be printed by the initiator and the participant.
Output from initiator:
2018-12-04 19:38:12,557 INFO  [ballerina/log] - Initiating transaction...
2018-12-04 19:38:12,580 INFO  [ballerina/log] - Created transaction: a958b772-be82-4d90-8044-6fb8be7f9a65
2018-12-04 19:38:12,589 INFO  [ballerina/log] - Started transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1
2018-12-04 19:38:13,111 INFO  [ballerina/log] - register called......
2018-12-04 19:38:13,131 INFO  [ballerina/log] - Registered remote participant: 46d070c5-a010-4b9a-81f0-aadaf59cc476:$anon$.$1 for transaction: a958b772-be82-4d90-8044-6fb8be7f9a65
2018-12-04 19:38:13,289 INFO  [ballerina/log] - Got response from bizservice
2018-12-04 19:38:13,293 INFO  [ballerina/log] - Running 2-phase commit for transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1
2018-12-04 19:38:13,297 INFO  [ballerina/log] - Preparing remote participant: 46d070c5-a010-4b9a-81f0-aadaf59cc476:$anon$.$1
2018-12-04 19:38:13,317 INFO  [ballerina/log] - Remote participant: 46d070c5-a010-4b9a-81f0-aadaf59cc476:$anon$.$1 prepared
2018-12-04 19:38:13,318 INFO  [ballerina/log] - Notify(commit) remote participant: http://192.168.8.100:53311/balcoordinator/participant/2pc/$anon$.$1
2018-12-04 19:38:13,328 INFO  [ballerina/log] - Remote participant: 46d070c5-a010-4b9a-81f0-aadaf59cc476:$anon$.$1 committed
2018-12-04 19:38:13,330 INFO  [ballerina/log] - Initiated transaction committed
2018-12-04 19:38:13,332 INFO  [ballerina/log] - Sent response back to client
Output from participant:
2018-12-04 19:38:12,997 INFO  [ballerina/log] - register at: http://192.168.8.100:53310/balcoordinator/initiator/$anon$.$1/register
2018-12-04 19:38:13,004 INFO  [ballerina/log] - Registering for transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1 with coordinator: http://192.168.8.100:53310/balcoordinator/initiator/$anon$.$1/register
2018-12-04 19:38:13,005 INFO  [ballerina/log] - registering..
2018-12-04 19:38:13,144 INFO  [ballerina/log] - Registered with coordinator for transaction: a958b772-be82-4d90-8044-6fb8be7f9a65
2018-12-04 19:38:13,146 INFO  [ballerina/log] - Received update stockquote request
2018-12-04 19:38:13,147 INFO  [ballerina/log] - Joined transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1
2018-12-04 19:38:13,267 INFO  [ballerina/log] - Update stock quote request received.
                            symbol:GOOG, price:227.96048357982156
2018-12-04 19:38:13,287 INFO  [ballerina/log] - Sent response back to initiator
2018-12-04 19:38:13,311 INFO  [ballerina/log] - Prepare received for transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1
2018-12-04 19:38:13,312 INFO  [ballerina/log] - Prepared transaction: a958b772-be82-4d90-8044-6fb8be7f9a65
2018-12-04 19:38:13,324 INFO  [ballerina/log] - Notify(commit) received for transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1
2018-12-04 19:38:13,325 INFO  [ballerina/log] - Participated transaction: a958b772-be82-4d90-8044-6fb8be7f9a65:$anon$.$1 committed