Extracting, transforming, and loading (ETL) data often involves intricate, time-consuming processes that lead to high costs and operational inefficiencies.

Ballerina simplifies ETL with its programmable approach. It excels at complex data operations and integrations, and its deployment flexibility keeps ETL pipelines scalable and adaptable.

Data extraction (E)

Drive data interactions with models

Ballerina's persist feature generates a type-safe client from your data model, so you can connect to databases, retrieve data, and kickstart your ETL process.

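import ballerina/io;
import ballerina/persist;

// dbClient is the type-safe client generated by bal persist from the data model;
// OrderData, CustomerData, and OrderWithCustomer are types from that model.
function getOrderData() returns error? {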
    stream<OrderData, persist:Error?> orders = dbClient->/orderdata();
    check from var orderData in orders
        do {
            io:println(orderData);
        };
}

function getCustomerData() returns error? {
    stream<CustomerData, persist:Error?> customers = dbClient->/customers();
    check from var customerData in customers
        do {
            io:println(customerData);
        };
}

function getOrderWithCustomer(string orderId) returns error? {
    OrderWithCustomer orderWithCustomer = check dbClient->/orderdata/[orderId];
    io:println(orderWithCustomer);
}

Make data extraction smarter

Ballerina's AI integrations can extract information from unstructured data such as emails, documents, and comments, unlocking the large amount of information they hold for the extraction process.

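import ballerina/http;
import ballerina/io;
import ballerinax/openai.chat;

configurable string openAIKey = ?;

// Request payload: the list of reviews to summarize.
type SummaryRequest record {|
    string[] reviews;
|};

final chat:Client openAiChat = check new ({auth: {token: openAIKey}});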

isolated service /api/reviews on new http:Listener(8080) {
    resource function post summary(SummaryRequest summaryRequest) returns error? {
        chat:CreateChatCompletionRequest request = {
            model: "gpt-3.5-turbo",
            messages: [
                {
                    role: "user",
                    content: string `
                        Extract the following details in JSON from the reviews given.
                            {
                                good_points: string,
                                bad_points: string,
                                improvement_points: string
                            }
                        The fields should contain points extracted from all reviews
                        Here are the reviews:
                        ${string:'join(",", ...summaryRequest.reviews)}
                    `
                }
            ]
        };
        chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
        if summary.choices.length() > 0 {
            string content = check summary.choices[0].message?.content.ensureType();
            io:println(content);
        }
    }
}
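The prompt asks the model to reply with JSON, so a natural next step is to turn that reply into a typed record before transformation. The sketch below assumes the model returns bare JSON with exactly the three requested fields; ReviewSummary and parseReviewSummary are hypothetical names, not part of the connector.

```ballerina
// Hypothetical record matching the JSON shape requested in the prompt.
type ReviewSummary record {|
    string good_points;
    string bad_points;
    string improvement_points;
|};

// Converts the model's JSON reply into a typed record.
// Returns an error if the reply is not valid JSON or does not match ReviewSummary.
function parseReviewSummary(string content) returns ReviewSummary|error {
    return content.fromJsonStringWithType(ReviewSummary);
}
```

In the service, io:println(content) could then become ReviewSummary reviewSummary = check parseReviewSummary(content);. Models sometimes surround JSON with extra text, so a production pipeline may need to strip that before parsing.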

Unleash data from APIs

Ballerina offers HTTP and GraphQL clients as well as a wide range of prebuilt connectors, making it easy to pull data from APIs into your ETL pipeline.

import ballerina/io;
import ballerinax/github;

configurable string githubAccessToken = ?;
configurable string repoOwner = "ballerina-platform";
configurable string repoName = "ballerina-lang";

final github:Client githubClient = check new ({
    auth: {
        token: githubAccessToken
    }
});

public function main() returns error? {
    github:IssueFilters filters = {
        labels: ["Type/NewFeature", "Priority/High"],
        states: [github:ISSUE_OPEN]
    };
    stream<github:Issue, github:Error?> openFeatures
            = check githubClient->getIssues(repoOwner, repoName, filters);
    check from var feature in openFeatures
        do {
            io:println(feature.title);
        };
}

Data transformation (T)

Eliminating redundancy for cleaner data

Ballerina simplifies duplicate removal through query expressions, such as grouping records by their key fields, and other intuitive constructs.

Sample 1: Remove duplicates based on composite fields

type SalesOrder record {|
    string itemId;
    string customerId;
    string itemName;
    int quantity;
    int date;
|};

// Orders that share the same itemId, customerId, and itemName are treated as duplicates;
// the quantity and date of the first order in each group are kept.
function removeDuplicates(SalesOrder[] orders) returns SalesOrder[] {
    return from var {itemId, customerId, itemName, quantity, date} in orders
        group by itemId, customerId, itemName
        select {
            itemId,
            customerId,
            itemName,
            quantity: [quantity][0],
            date: [date][0]
        };
}

Sample 2: Remove duplicates based on approximation

public type Customer record {|
    string name;
    string city;
    string phone;
|};

// Splits customers into [unique, duplicates]. A customer counts as a duplicate when its name,
// city, and phone are all within small edit distances of a customer already kept.
// getEditDistance is a string edit-distance helper that is not shown here.
function findDuplicates(Customer[] customers) returns [Customer[], Customer[]] {
    Customer[] duplicates = [];
    Customer[] uniqueCustomers = [];
    foreach Customer customer in customers {
        boolean isDuplicate = false;
        foreach Customer uniqueCustomer in uniqueCustomers {
            if getEditDistance(customer.name, uniqueCustomer.name) < 3 &&
                getEditDistance(customer.city, uniqueCustomer.city) < 2 &&
                getEditDistance(customer.phone, uniqueCustomer.phone) < 4 {
                isDuplicate = true;
                break;
            }
        }
        if isDuplicate {
            duplicates.push(customer);
        } else {
            uniqueCustomers.push(customer);
        }
    }
    return [uniqueCustomers, duplicates];
}
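The approximate-matching sample relies on a getEditDistance helper. One common choice for such a helper is the Levenshtein distance; the sketch below is an assumed implementation, not necessarily the one used in the full sample.

```ballerina
// Levenshtein edit distance: the minimum number of single-character insertions,
// deletions, or substitutions needed to turn a into b.
function getEditDistance(string a, string b) returns int {
    int m = a.length();
    int n = b.length();
    // previous[j] is the distance between the first i - 1 characters of a and the first j of b.
    int[] previous = [];
    foreach int j in 0 ... n {
        previous.push(j);
    }
    foreach int i in 1 ... m {
        int[] current = [i];
        foreach int j in 1 ... n {
            int cost = a[i - 1] == b[j - 1] ? 0 : 1;
            int deletion = previous[j] + 1;
            int insertion = current[j - 1] + 1;
            int substitution = previous[j - 1] + cost;
            current.push(int:min(deletion, insertion, substitution));
        }
        previous = current;
    }
    return previous[n];
}
```

Keeping only two rows of the distance table keeps memory proportional to the length of b rather than to the product of both lengths.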

Refining data excellence

Ballerina's integration with AI models helps correct errors such as spelling and grammar mistakes, improving data integrity and precision during transformation.

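import ballerina/http;
import ballerina/io;

// Only the fields of the chat completion response that this example reads.
type OpenAiChatResponse record {
    record {
        record {
            string content;
        } message;
    }[] choices;
};

configurable string openAIToken = ?;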
final http:Client openAIClient = check new ("https://api.openai.com", {auth: {token: openAIToken}});

public function main(string filePath) returns error? {
    string inputString = check io:fileReadString(filePath);

    OpenAiChatResponse response = check openAIClient->/v1/chat/completions.post(
        {
            model: "gpt-3.5-turbo",
            messages: [
                {
                    role: "user",
                    content: "Fix grammar and spelling mistakes."
                },
                {
                    role: "user",
                    content: inputString
                }
            ]
        }
    );

    io:println(string `Corrected: ${response.choices[0].message.content}`);
}

Data enrichment magic

Ballerina streamlines data enrichment: you can take data extracted from databases or APIs and add depth and context to it, such as geographic coordinates for a customer's address.

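import ballerina/http;

configurable string geocodingAPIKey = ?;

// Hypothetical input and output types; only address is used by the lookup.
type Customer record {| string name; string address; |};
type GeoTaggedCustomer record {| *Customer; float latitude; float longitude; |};

// Only the parts of the Geocoding API response that this example reads.
type GeocodeResponse record {
    string status;
    record {record {record {float lat; float lng;} location;} geometry;}[] results;
};

final http:Client geocodingClient = check new ("https://maps.googleapis.com");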

isolated service /api/v1 on new http:Listener(8080) {
    resource function post customerWithGeoCode(Customer customer) returns GeoTaggedCustomer|error {
        GeocodeResponse response = check geocodingClient->
            /maps/api/geocode/'json(address = customer.address, key = geocodingAPIKey);
        if response.status == "OK" {
            return {
                ...customer,
                latitude: response.results[0].geometry.location.lat,
                longitude: response.results[0].geometry.location.lng
            };
        }
        return error("Error while getting the location information.");
    }
}
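Enrichment does not always need an external API: when the reference data is already available, for example from a second database table, a query with a join clause can attach it. A minimal sketch with hypothetical Order, CustomerProfile, and EnrichedOrder types:

```ballerina
// Hypothetical order feed, customer reference data, and enriched output.
type Order record {|
    string orderId;
    string customerId;
    decimal amount;
|};

type CustomerProfile record {|
    string customerId;
    string segment;
|};

type EnrichedOrder record {|
    string orderId;
    decimal amount;
    string segment;
|};

// Attaches each customer's segment to their orders with an equi-join.
// Orders without a matching profile are dropped (inner-join semantics).
function enrichOrders(Order[] orders, CustomerProfile[] profiles) returns EnrichedOrder[] {
    return from var ord in orders
        join var profile in profiles on ord.customerId equals profile.customerId
        select {orderId: ord.orderId, amount: ord.amount, segment: profile.segment};
}
```

If unmatched orders must be kept, an outer join or a lookup with a default value would be needed instead.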

Reshaping data with data mapper

Comprehensive data mapping support facilitates the transformation of source data into any schema expected by target systems.
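In code, a mapping between two schemas can be written as an expression-bodied function, the style of mapping function that Ballerina's data mapper works with. A minimal sketch with hypothetical Person and Contact schemas; the reference year is passed in so the result is deterministic:

```ballerina
// Hypothetical source and target schemas.
type Person record {|
    string firstName;
    string lastName;
    int birthYear;
|};

type Contact record {|
    string fullName;
    int age;
|};

// Expression-bodied mapping: each target field is computed from source fields.
function toContact(Person person, int referenceYear) returns Contact => {
    fullName: string `${person.firstName} ${person.lastName}`,
    age: referenceYear - person.birthYear
};
```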

Precision cleaning with regular expressions

Ballerina's regex support empowers data cleansing and standardization, enhancing data quality and consistency.

function standardizeValues(string inputString) returns int {
    string:RegExp yesPattern = re `y|Y|((y|Y)(e|E)(s|S))`;
    if yesPattern.isFullMatch(inputString) {
        return 1;
    }
    string:RegExp noPattern = re `n|N|((n|N)(o|O))`;
    if noPattern.isFullMatch(inputString) {
        return -1;
    }
    return 0;
}

function isValidEmail(string inputString) returns boolean {
    string:RegExp emailPattern = re `[A-Za-z0-9\._%+-]+@[A-Za-z0-9\.-]+\.[A-Za-z]{2,}`;
    return emailPattern.isFullMatch(inputString);
}

function removeExtraWhiteSpaces(string inputString) returns string {
    string:RegExp extraSpaces = re `\s+`;
    // Trim leading and trailing whitespace, then collapse internal runs to a single space.
    return extraSpaces.replaceAll(inputString.trim(), " ");
}
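The same replaceAll approach extends to other standardization tasks. For instance, the sketch below reduces phone numbers to digits only, so values entered with different separators compare equal; normalizePhone is a hypothetical helper, and a digits-only format is an illustrative choice.

```ballerina
// Removes every character that is not a digit 0-9.
function normalizePhone(string inputString) returns string {
    string:RegExp nonDigits = re `[^0-9]`;
    return nonDigits.replaceAll(inputString, "");
}
```

For example, "+1 (555) 010-9999" becomes "15550109999".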

Data loading (L)

Real-time data flow

Ballerina's support for various data streaming sources, such as Kafka, RabbitMQ, and NATS, facilitates the creation of real-time streaming ETL pipelines.

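import ballerina/uuid;
import ballerinax/kafka;

// Client, LeadAnalytics, and LeadAnalyticsDataInsert are generated by bal persist from the data model.
final kafka:ConsumerConfiguration consumerConfiguration = {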
    groupId: "lead-uuid",
    topics: ["lead-analytics"],
    pollingInterval: 1,
    autoCommit: false
};

final Client leadsDbClient = check new;

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

isolated service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, LeadAnalytics[] leadsData) returns error? {
        LeadAnalyticsDataInsert[] insertData = from var lead in leadsData
            select {id: uuid:createType1AsString(), ...lead};
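        _ = check leadsDbClient->/leadanalyticsdata.post(insertData);
        // autoCommit is disabled, so commit the offsets only after the batch is stored.
        check caller->commit();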
    }
}

Loading data to a data warehouse

Ballerina's integration and security capabilities simplify the loading of transformed data securely into on-prem or SaaS data warehouses.

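import ballerina/io;
import ballerina/uuid;
import ballerinax/googleapis.bigquery;

configurable string bigQueryAccessToken = ?;
configurable string projectId = ?;
configurable string datasetId = ?;
configurable string tableId = ?;

final bigquery:Client bigQueryClient = check new ({auth: {token: bigQueryAccessToken}});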

public function main() returns error? {
    SocialMediaInteraction[] interactions = check io:fileReadCsv("./resources/interactions.csv");
    bigquery:TabledatainsertallrequestRows[] rows = from var interaction in interactions
        select {insertId: uuid:createType1AsString(), 'json: interaction};
    bigquery:TableDataInsertAllRequest payload = {rows};
    _ = check bigQueryClient->insertAllTableData(projectId, datasetId, tableId, payload);
}
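Large loads are often split into several smaller insert requests. A minimal sketch of a hypothetical toBatches helper that chunks rows; the batch size is an assumption to tune against the warehouse's request limits.

```ballerina
// Splits rows into consecutive batches of at most batchSize rows.
function toBatches(json[] rows, int batchSize) returns json[][]|error {
    if batchSize <= 0 {
        return error("batchSize must be positive");
    }
    json[][] batches = [];
    int index = 0;
    while index < rows.length() {
        // The last batch may be shorter than batchSize.
        int endIndex = int:min(index + batchSize, rows.length());
        batches.push(rows.slice(index, endIndex));
        index = endIndex;
    }
    return batches;
}
```

Each batch could then be wrapped in its own TableDataInsertAllRequest payload and sent with a separate insertAllTableData call.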

Manual data inspection

Ballerina can connect with Google Sheets or Excel to load data samples, intermediate data, or processed data for manual inspection.

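import ballerinax/googleapis.sheets;

configurable string sheetsAccessToken = ?;

final sheets:Client spreadsheetClient = check new ({auth: {token: sheetsAccessToken}});

function loadToGoogleSheet(string sheetName, string workSheetName,
    SalesSummary[] salesSummary) returns error? {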
    sheets:Spreadsheet spreadsheet = check spreadsheetClient->createSpreadsheet(sheetName);
    string spreadSheetId = spreadsheet.spreadsheetId;

    check spreadsheetClient->renameSheet(spreadSheetId, "Sheet1", workSheetName);

    _ = check spreadsheetClient
            ->appendValue(spreadSheetId, ["Product", "Sales"], {sheetName: workSheetName});
    foreach var {product, sales} in salesSummary {
        _ = check spreadsheetClient
            ->appendValue(spreadSheetId, [product, sales], {sheetName: workSheetName});
    }
}
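The salesSummary argument is typically the output of an aggregation step. A sketch of that step using a group by query, assuming hypothetical SaleItem records and a SalesSummary type with product and sales fields:

```ballerina
// Hypothetical line item and per-product summary.
type SaleItem record {|
    string product;
    decimal amount;
|};

type SalesSummary record {|
    string product;
    decimal sales;
|};

// Totals sales per product: amount becomes a sequence within each group, and sum adds it up.
function summarizeSales(SaleItem[] items) returns SalesSummary[] {
    return from var {product, amount} in items
        group by product
        select {product, sales: sum(amount)};
}
```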