import ballerina/io;

// Alias for the built-in `error` type.
type Error error;

// A stream of `string` values whose completion type is `Error?`.
type LineStream stream<string, Error?>;

// Closed record with a single `value` field; `LineGenerator.next()` returns it.
type ValueRecord record {|
    string value;
|};

// Upper bound on how many lines `LineGenerator` yields before returning `()`.
const SAMPLE_LINE_COUNT = 5;

// Generator object backing a `LineStream`. It yields `SAMPLE_LINE_COUNT`
// values: the configured text at even positions and an empty string at odd
// positions.
class LineGenerator {
    // Zero-based position of the most recently produced value (-1 before the
    // first call to `next`).
    int i = -1;
    // Text returned for every even position.
    string inputString;

    // Stores the text that the generator emits on even positions.
    public function init(string str) {
        self.inputString = str;
    }

    // Advances the position and returns the next value wrapped in a
    // `ValueRecord`, or `()` once `SAMPLE_LINE_COUNT` values have been produced.
    public isolated function next() returns ValueRecord|Error? {
        self.i += 1;
        if (self.i >= SAMPLE_LINE_COUNT) {
            // All values have been produced.
            return ();
        }
        string line = (self.i % 2 == 0) ? self.inputString : "";
        return {value: line};
    }
}

// Returns a stream that yields only the lines of `lines` which still contain
// characters after trimming.
function strip(LineStream lines) returns LineStream {
    // The `stream` keyword in front of `from` makes the query expression
    // produce a stream rather than a list.
    return stream from string line in lines
        where line.trim() != ""
        select line;
}

// Consumes `lines` and returns how many values it yielded. If the query action
// results in an error, `check` returns that error to the caller.
function count(LineStream lines) returns int|Error {
    int total = 0;
    // Iterate the stream with a query action (`from` ... `do`), incrementing
    // the counter once per value.
    var _ = check from var _ in lines
        do {
            total = total + 1;
        };
    return total;
}

// Entry point: builds a stream from a `LineGenerator`, filters out the blank
// lines with `strip`, counts the remaining ones with `count`, and prints both
// the input and the non-blank line counts.
public function main() {
    LineGenerator generator = new ("Everybody can dance");
    LineStream inputLineStream = new (generator);

    LineStream strippedStream = strip(inputLineStream);

    int|Error nonBlankCount = count(strippedStream);

    if (nonBlankCount is int) {
        io:println("Input line count: ", SAMPLE_LINE_COUNT.toString());
        io:println("Non blank line count: ", nonBlankCount.toString());
    } else {
        // Report the failure instead of silently exiting with no output.
        io:println("Failed to count lines: ", nonBlankCount.message());
    }
}

Querying with streams

If a stream terminates with an error, the result of the query expression is an error. You cannot use `foreach` on a stream type whose termination type allows an error. Instead, use `from` with a `do` clause; the result is a subtype of `error?`. Use the `stream` keyword in front of `from` to create a stream that is lazily evaluated. The failure of a `check` within the query will cause the stream to produce an error termination value.

import ballerina/io;
// Alias for the built-in `error` type.
type Error error;
// A stream of `string` values whose completion type is `Error?`.
type LineStream stream<string, Error?>;
// Closed record with a single `value` field; `LineGenerator.next()` returns it.
type ValueRecord record {|
    string value;
|};
// Upper bound on how many lines `LineGenerator` yields before returning `()`.
const SAMPLE_LINE_COUNT = 5;
// Generator object backing a `LineStream`. It yields `SAMPLE_LINE_COUNT`
// values: the configured text at even positions and an empty string at odd
// positions.
class LineGenerator {
    // Zero-based position of the most recently produced value (-1 before the
    // first call to `next`).
    int i = -1;
    // Text returned for every even position.
    string inputString;
    // Stores the text that the generator emits on even positions.
    public function init(string str) {
        self.inputString = str;
    }
    // Advances the position and returns the next value wrapped in a
    // `ValueRecord`, or `()` once `SAMPLE_LINE_COUNT` values have been produced.
    public isolated function next() returns ValueRecord|Error? {
        self.i += 1;
        if (self.i >= SAMPLE_LINE_COUNT) {
            // All values have been produced.
            return ();
        }
        string line = (self.i % 2 == 0) ? self.inputString : "";
        return {value: line};
    }
}
function strip(LineStream lines) returns LineStream {

This function strips the blank lines.

    LineStream res = stream from var line in lines
             where line.trim().length() > 0
             select line;

Creates a stream from the query expression.

    return res;
}
function count(LineStream lines) returns int|Error {
    int nLines = 0;
    var _ = check from var _ in lines
              do {
                  nLines += 1;
              };

Counts the number of lines by iterating the stream in a query action.

    return nLines;
}
// Entry point: builds a stream from a `LineGenerator`, filters out the blank
// lines with `strip`, counts the remaining ones with `count`, and prints both
// the input and the non-blank line counts.
public function main() {
    LineGenerator generator = new ("Everybody can dance");
    LineStream inputLineStream = new (generator);
    LineStream strippedStream = strip(inputLineStream);
    int|Error nonBlankCount = count(strippedStream);
    if (nonBlankCount is int) {
        io:println("Input line count: ", SAMPLE_LINE_COUNT.toString());
        io:println("Non blank line count: ", nonBlankCount.toString());
    } else {
        // Report the failure instead of silently exiting with no output.
        io:println("Failed to count lines: ", nonBlankCount.message());
    }
}
bal run querying_with_streams.bal
Input line count: 5
Non blank line count: 3