import ballerina/grpc;
import ballerina/log;map<grpc:Caller> consMap = {};service Chat on new grpc:Listener(9090) {    resource function  chat(grpc:Caller caller,
                                stream<ChatMessage, error> clientStream) {
        log:printInfo(string `${caller.getId()} connected to chat`);
        consMap[caller.getId().toString()] = <@untainted>caller;
        error? e = clientStream.forEach(function(ChatMessage chatMsg) {
            grpc:Caller ep;
            string msg = string `${chatMsg.name}: ${chatMsg.message}`;
            log:printInfo("Server received message: " + msg);
            foreach var [callerId, connection] in consMap.entries() {
                ep = connection;
                grpc:Error? err = ep->send(msg);
                if (err is grpc:Error) {
                    log:printError("Error from Connector: " + err.message());
                } else {
                    log:printInfo("Server message to caller " + callerId
                                                     + " sent successfully.");
                }
            }
        });
        if (e is grpc:EOS) {
            string msg = string `${caller.getId()} left the chat`;
            log:printInfo(msg);
            var v = consMap.remove(caller.getId().toString());
            foreach var [callerId, connection] in consMap.entries() {
                grpc:Caller ep = connection;
                grpc:Error? err = ep->send(msg);
                if (err is grpc:Error) {
                    log:printError("Error from Connector: " + err.message());
                } else {
                    log:printInfo("Server message to caller " + callerId
                                                      + " sent successfully.");
                }
            }
        } else if (e is error) {
            log:printError("Error from Connector: " + e.message());
        }    }
}
import ballerina/grpc;
import ballerina/io;
import ballerina/runtime;int total = 0;
public function main() {
    ChatClient chatEp = new ("http://localhost:9090");    grpc:StreamingClient ep;
    var res = chatEp->chat(ChatMessageListener);    if (res is grpc:Error) {
        io:println("Error from Connector: " + res.message());
        return;
    } else {
        io:println("Initialized connection sucessfully.");
        ep = res;
    }
    ChatMessage mes = {name: "Sam", message: "Hi "};
    grpc:Error? connErr = ep->send(mes);    if (connErr is grpc:Error) {
        io:println("Error from Connector: " + connErr.message());
    }
    runtime:sleep(6000);
    grpc:Error? result = ep->complete();
    if (result is grpc:Error) {
        io:println("Error in sending complete message", result);
    }
}service ChatMessageListener = service {
    resource function onMessage(string message) {
        io:println("Response received from server: " + message);
    }
    resource function onError(error err) {
        io:println("Error reported from server: " + err.message());
    }
    resource function onComplete() {
        io:println("Server Complete Sending Responses.");
    }
};

Bidirectional Streaming

The gRPC Server Connector exposes the gRPC service over http2. This sample demonstrates how the gRPC bidirectional streaming service and the non blocking client operate when each of them sends a sequence of messages using a read-write stream. In such scenarios, the two streams operate independently. Therefore, clients and servers can read and write in any order.

For more information on the underlying module, see the GRPC module.

syntax = "proto3";
import "google/protobuf/wrappers.proto";
service Chat {
	rpc chat (stream ChatMessage)
			returns (stream google.protobuf.StringValue);
}
message ChatMessage {
	string name = 1;
	string message = 2;
}

This is the service definition for the bidirectional streaming scenario.

# Create new Protocol Buffers definition file `grpc_bidirectional_streaming.proto` and add service definition.
# Run the command below in the Ballerina tools distribution for stub generation.
ballerina grpc --input grpc_bidirectional_streaming.proto  --output stubs
# Once you run the command, `grpc_bidirectional_streaming_pb.bal` file is generated inside stubs directory.
# Please refer example `Proto To Ballerina` to get information on how to use Ballerina Protocol Buffers tool.
import ballerina/grpc;
import ballerina/log;

This is the server implementation for the bidirectional streaming scenario.

map<grpc:Caller> consMap = {};
service Chat on new grpc:Listener(9090) {
    resource function  chat(grpc:Caller caller,
                                stream<ChatMessage, error> clientStream) {
        log:printInfo(string `${caller.getId()} connected to chat`);
        consMap[caller.getId().toString()] = <@untainted>caller;
        error? e = clientStream.forEach(function(ChatMessage chatMsg) {
            grpc:Caller ep;
            string msg = string `${chatMsg.name}: ${chatMsg.message}`;
            log:printInfo("Server received message: " + msg);
            foreach var [callerId, connection] in consMap.entries() {
                ep = connection;
                grpc:Error? err = ep->send(msg);
                if (err is grpc:Error) {
                    log:printError("Error from Connector: " + err.message());
                } else {
                    log:printInfo("Server message to caller " + callerId
                                                     + " sent successfully.");
                }
            }
        });

Read and process each message in the client stream

        if (e is grpc:EOS) {
            string msg = string `${caller.getId()} left the chat`;
            log:printInfo(msg);
            var v = consMap.remove(caller.getId().toString());
            foreach var [callerId, connection] in consMap.entries() {
                grpc:Caller ep = connection;
                grpc:Error? err = ep->send(msg);
                if (err is grpc:Error) {
                    log:printError("Error from Connector: " + err.message());
                } else {
                    log:printInfo("Server message to caller " + callerId
                                                      + " sent successfully.");
                }
            }

Once the client sends a notification to indicate the end of the stream, ‘grpc:EOS’ is returned by the stream

        } else if (e is error) {
            log:printError("Error from Connector: " + e.message());
        }

If the client sends an error to the server, the stream closes and returns the error

    }
}
# Create a Ballerina project and a module inside it.
# Copy generated stub file `grpc_bidirectional_streaming_pb.bal` to the module.
# For example, if you create a module named `service`, copy the stub file to the `service` module.
# Add new Ballerina file `grpc_bidirectional_streaming.bal` inside the `service` module and add service implementation.
# Execute the command below to build the 'service' module.
ballerina build service
# Run the service using the command below.
ballerina run target/bin/service.jar
import ballerina/grpc;
import ballerina/io;
import ballerina/runtime;

This is client implementation for bidirectional streaming scenario.

int total = 0;
public function main() {
    ChatClient chatEp = new ("http://localhost:9090");

Client endpoint configuration.

    grpc:StreamingClient ep;
    var res = chatEp->chat(ChatMessageListener);

Executes unary non-blocking call registering server message listener.

    if (res is grpc:Error) {
        io:println("Error from Connector: " + res.message());
        return;
    } else {
        io:println("Initialized connection sucessfully.");
        ep = res;
    }
    ChatMessage mes = {name: "Sam", message: "Hi "};
    grpc:Error? connErr = ep->send(mes);

Sends multiple messages to the server.

    if (connErr is grpc:Error) {
        io:println("Error from Connector: " + connErr.message());
    }
    runtime:sleep(6000);
    grpc:Error? result = ep->complete();
    if (result is grpc:Error) {
        io:println("Error in sending complete message", result);
    }
}

Once all messages are sent, client send complete message to notify the server, I’m done.

service ChatMessageListener = service {
    resource function onMessage(string message) {
        io:println("Response received from server: " + message);
    }

Resource registered to receive server messages.

    resource function onError(error err) {
        io:println("Error reported from server: " + err.message());
    }

Resource registered to receive server error messages.

    resource function onComplete() {
        io:println("Server Complete Sending Responses.");
    }
};

Resource registered to receive server completed message.

# Create a Ballerina project and a module inside it.
# Copy generated stub file `grpc_bidirectional_streaming_pb.bal` to the module.
# For example, if you create a module named `client`, copy the stub file to the `client` module.
# Add new Ballerina file `grpc_bidirectional_streaming_client.bal` inside the `client` module and add client implementation.
# Execute the command below to build the 'client' module.
ballerina build client
# Run the client using the command below.
ballerina run target/bin/client.jar