Ballerina for ETL
Extracting, transforming, and loading data often involves dealing with intricate, time-consuming processes, leading to high costs and operational inefficiencies.
Ballerina simplifies ETL by letting you express pipelines as code. It handles complex data operations and integrations and offers flexible deployment, which keeps ETL pipelines scalable and adaptable.
Data extraction (E)
Drive data interactions with models
Ballerina's persist feature seamlessly connects to databases, retrieves data, and kickstarts your ETL process.
import ballerina/io;
import ballerina/persist;

// dbClient and the OrderData, CustomerData, and OrderWithCustomer types are
// assumed to be generated by persist from the data model.
function getOrderData() returns error? {
    stream<OrderData, persist:Error?> orders = dbClient->/orderdata();
    check from var orderData in orders
        do {
            io:println(orderData);
        };
}

function getCustomerData() returns error? {
    stream<CustomerData, persist:Error?> customers = dbClient->/customers();
    check from var customerData in customers
        do {
            io:println(customerData);
        };
}

// Fetches a single order by its key, together with its customer.
function getOrderWithCustomer(string orderId) returns error? {
    OrderWithCustomer orderwithCustomer = check dbClient->/orderdata/[orderId];
    io:println(orderwithCustomer);
}
Make data extraction smarter
Ballerina's AI integrations can pull structured information out of unstructured data during extraction, unlocking what is captured in emails, documents, comments, and similar sources.
import ballerina/http;
import ballerina/io;
import ballerinax/openai.chat;

// openAIKey and SummaryRequest (which carries the reviews) are defined elsewhere.
final chat:Client openAiChat = check new ({auth: {token: openAIKey}});

isolated service /api/reviews on new http:Listener(8080) {
    resource function post summary(SummaryRequest summaryRequest) returns error? {
        chat:CreateChatCompletionRequest request = {
            model: "gpt-3.5-turbo",
            messages: [
                {
                    role: "user",
                    content: string `
                        Extract the following details in JSON from the reviews given.
                        {
                            good_points: string,
                            bad_points: string,
                            improvement_points: string
                        }
                        The fields should contain points extracted from all reviews
                        Here are the reviews:
                        ${string:'join(",", ...summaryRequest.reviews)}
                    `
                }
            ]
        };
        // Send the prompt and print the model's reply, if there is one.
        chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
        if summary.choices.length() > 0 {
            string content = check summary.choices[0].message?.content.ensureType();
            io:println(content);
        }
    }
}
Unleash data from APIs
Ballerina offers a wide range of built-in connectors, including HTTP and GraphQL clients, making it easy to interact with APIs for efficient data extraction in your ETL pipeline.
import ballerina/io;
import ballerinax/github;

configurable string githubAccessToken = ?;
configurable string repoOwner = "ballerina-platform";
configurable string repoName = "ballerina-lang";

final github:Client githubClient = check new ({
    auth: {
        token: githubAccessToken
    }
});

public function main() returns error? {
    // Only open issues that carry both labels.
    github:IssueFilters filters = {
        labels: ["Type/NewFeature", "Priority/High"],
        states: [github:ISSUE_OPEN]
    };
    stream<github:Issue, github:Error?> openFeatures
        = check githubClient->getIssues(repoOwner, repoName, filters);
    check from var feature in openFeatures
        do {
            io:println(feature.title);
        };
}
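The same kind of extraction can also be done with the plain HTTP client when no dedicated connector exists. The following is a minimal sketch, not a definitive implementation: the base URL, the /products path, the limit query parameter, and the Product record are all hypothetical and stand in for whatever API you are reading from.

```ballerina
import ballerina/http;
import ballerina/io;

// Hypothetical payload shape. The record is open, so extra JSON fields
// in the response are tolerated.
type Product record {
    string id;
    string name;
    decimal price;
};

// Hypothetical base URL; override it in Config.toml.
configurable string catalogUrl = "https://api.example.com";

public function main() returns error? {
    http:Client catalogClient = check new (catalogUrl);
    // GET /products?limit=10, with the JSON response bound to Product[].
    Product[] products = check catalogClient->/products(limit = 10);
    foreach Product product in products {
        io:println(product.name);
    }
}
```

Binding the response directly to a record array means a payload that does not match the expected shape surfaces as an error at the extraction step rather than later in the pipeline.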
Data transformation (T)
Eliminating redundancy for cleaner data
Ballerina query expressions and other language constructs make duplicate removal straightforward.
type SalesOrder record {|
    string itemId;
    string customerId;
    string itemName;
    int quantity;
    int date;
|};

function removeDuplicates(SalesOrder[] orders) returns SalesOrder[] {
    return from var {itemId, customerId, itemName, quantity, date} in orders
        group by itemId, customerId, itemName
        select {
            itemId,
            customerId,
            itemName,
            quantity: [quantity][0],
            date: [date][0]
        };
}
public type Customer record {|
    string name;
    string city;
    string phone;
|};

// Splits customers into unique entries and near-duplicates.
// getEditDistance is a helper defined elsewhere.
function findDuplicates(Customer[] customers) returns [Customer[], Customer[]] {
    Customer[] duplicates = [];
    Customer[] uniqueCustomers = [];
    foreach Customer customer in customers {
        // A customer is a duplicate if it closely matches any unique customer seen so far.
        boolean isDuplicate = false;
        foreach Customer uniqueCustomer in uniqueCustomers {
            if getEditDistance(customer.name, uniqueCustomer.name) < 3 &&
                getEditDistance(customer.city, uniqueCustomer.city) < 2 &&
                getEditDistance(customer.phone, uniqueCustomer.phone) < 4 {
                isDuplicate = true;
                break;
            }
        }
        // Classify each customer exactly once, after checking all unique entries.
        if isDuplicate {
            duplicates.push(customer);
        } else {
            uniqueCustomers.push(customer);
        }
    }
    return [uniqueCustomers, duplicates];
}
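The fuzzy matching above relies on a getEditDistance helper that is not shown. One possible implementation, offered here only as a sketch under the assumption that the helper computes the Levenshtein distance (the original does not say which metric it uses), could look like this:

```ballerina
import ballerina/io;

// Levenshtein distance: the minimum number of single-character insertions,
// deletions, and substitutions needed to turn `a` into `b`.
// Assumption: this is the metric intended by getEditDistance.
function getEditDistance(string a, string b) returns int {
    int[] s = a.toCodePointInts();
    int[] t = b.toCodePointInts();
    // previous[j] is the distance between the first i - 1 code points of `a`
    // and the first j code points of `b`. It starts as the row for i = 0.
    int[] previous = [];
    foreach int j in 0 ... t.length() {
        previous.push(j);
    }
    foreach int i in 1 ... s.length() {
        int[] current = [i];
        foreach int j in 1 ... t.length() {
            int cost = s[i - 1] == t[j - 1] ? 0 : 1;
            current.push(int:min(
                previous[j] + 1, // deletion
                current[j - 1] + 1, // insertion
                previous[j - 1] + cost // substitution or match
            ));
        }
        previous = current;
    }
    return previous[t.length()];
}

public function main() {
    io:println(getEditDistance("kitten", "sitting")); // prints 3
}
```

Keeping only two rows at a time holds memory proportional to the length of the second string, which matters when the comparison runs over every pair of records.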
Refining data excellence
Ballerina's integration with AI models can help correct errors such as spelling and grammar mistakes, improving data quality during transformation.
import ballerina/http;
import ballerina/io;

configurable string openAIToken = ?;

final http:Client openAIClient = check new ("https://api.openai.com", {auth: {token: openAIToken}});

// OpenAiChatResponse is a record type defined elsewhere to match the API response.
public function main(string filePath) returns error? {
    string inputString = check io:fileReadString(filePath);
    OpenAiChatResponse response = check openAIClient->/v1/chat/completions.post(
        {
            model: "gpt-3.5-turbo",
            messages: [
                {
                    role: "user",
                    content: "Fix grammar and spelling mistakes."
                },
                {
                    role: "user",
                    content: inputString
                }
            ]
        }
    );
    io:println(string `Corrected: ${response.choices[0].message.content}`);
}
Data enrichment magic
Ballerina streamlines data enrichment: extract records from databases or APIs, then add depth and context to them with data from other services.
import ballerina/http;

// geocodingAPIKey and the Customer, GeoTaggedCustomer, and GeocodeResponse
// types are defined elsewhere.
final http:Client geocodingClient = check new ("https://maps.googleapis.com");

isolated service /api/v1 on new http:Listener(8080) {
    resource function post customerWithGeoCode(Customer customer) returns GeoTaggedCustomer|error {
        GeocodeResponse response = check geocodingClient->
            /maps/api/geocode/'json(address = customer.address, key = geocodingAPIKey);
        if response.status == "OK" {
            // Copy the customer fields and add the coordinates of the first match.
            return {
                ...customer,
                latitude: response.results[0].geometry.location.lat,
                longitude: response.results[0].geometry.location.lng
            };
        }
        return error("Error while getting the location information.");
    }
}
Reshaping data with data mapper
Comprehensive data mapping support facilitates the transformation of source data into any schema expected by target systems.
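A mapping of this kind can be written as an expression-bodied function, where each target field is an expression over the source fields. The sketch below is illustrative only: the SourceOrder and TargetOrder records and the toTargetOrder function are hypothetical names, not part of any schema mentioned here.

```ballerina
import ballerina/io;

// Hypothetical source schema, as it might come out of the extraction step.
type SourceOrder record {|
    string orderId;
    string firstName;
    string lastName;
    decimal unitPrice;
    int quantity;
|};

// Hypothetical target schema expected by a downstream system.
type TargetOrder record {|
    string id;
    string customerName;
    decimal total;
|};

// Each target field is derived from one or more source fields.
function toTargetOrder(SourceOrder src) returns TargetOrder => {
    id: src.orderId,
    customerName: src.firstName + " " + src.lastName,
    total: src.unitPrice * <decimal>src.quantity
};

public function main() {
    SourceOrder src = {orderId: "O-1", firstName: "Ann", lastName: "Lee", unitPrice: 2.5, quantity: 4};
    io:println(toTargetOrder(src));
}
```

Because both records are closed, the compiler rejects a mapping that misses a required target field or assigns a value of the wrong type.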
Precision cleaning with regular expressions
Ballerina's regex support empowers data cleansing and standardization, enhancing data quality and consistency.
function standardizeValues(string inputString) returns int {
    string:RegExp yesPattern = re `y|Y|((y|Y)(e|E)(s|S))`;
    if yesPattern.isFullMatch(inputString) {
        return 1;
    }
    string:RegExp noPattern = re `n|N|((n|N)(o|O))`;
    if noPattern.isFullMatch(inputString) {
        return -1;
    }
    return 0;
}

function isValidEmail(string inputString) returns boolean {
    string:RegExp emailPattern = re `[A-Za-z0-9\._%+-]+@[A-Za-z0-9\.-]+\.[A-Za-z]{2,}`;
    return emailPattern.isFullMatch(inputString);
}

function removeExtraWhiteSpaces(string inputString) returns string {
    string:RegExp extraSpaces = re `\s+`;
    return extraSpaces.replaceAll(inputString, " ");
}
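The same replaceAll approach extends to other standardization tasks. As a small sketch, assuming phone numbers should be stored as digits only (the normalizePhone name and that storage rule are assumptions made for illustration):

```ballerina
import ballerina/io;

// Hypothetical helper: strips every character that is not a digit.
function normalizePhone(string phone) returns string {
    string:RegExp nonDigits = re `[^0-9]`;
    return nonDigits.replaceAll(phone, "");
}

public function main() {
    io:println(normalizePhone("(555) 010-2030")); // prints 5550102030
}
```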
Data loading (L)
Real-time data flow
Ballerina's support for various data streaming sources, such as Kafka, RabbitMQ, and NATS, facilitates the creation of real-time streaming ETL pipelines.
import ballerina/uuid;
import ballerinax/kafka;

final kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "lead-uuid",
    topics: ["lead-analytics"],
    pollingInterval: 1,
    autoCommit: false
};

// Client, LeadAnalytics, and LeadAnalyticsDataInsert are defined elsewhere.
final Client leadsDbClient = check new;

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

isolated service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, LeadAnalytics[] leadsData) returns error? {
        // Give each incoming record a generated ID, then insert the batch.
        LeadAnalyticsDataInsert[] insertData = from var lead in leadsData
            select {id: uuid:createType1AsString(), ...lead};
        _ = check leadsDbClient->/leadanalyticsdata.post(insertData);
    }
}
Loading data to a data warehouse
Ballerina's integration and security capabilities simplify the loading of transformed data securely into on-prem or SaaS data warehouses.
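import ballerina/io;
import ballerina/uuid;
import ballerinax/googleapis.bigquery;

// bigQueryAccessToken, projectId, datasetId, tableId, and the
// SocialMediaInteraction type are defined elsewhere.
final bigquery:Client bigQueryClient = check new ({auth: {token: bigQueryAccessToken}});

public function main() returns error? {
    SocialMediaInteraction[] interactions = check io:fileReadCsv("./resources/interactions.csv");
    // Wrap each CSV row in an insert request row with a generated insert ID.
    bigquery:TabledatainsertallrequestRows[] rows = from var interaction in interactions
        select {insertId: uuid:createType1AsString(), 'json: interaction};
    bigquery:TableDataInsertAllRequest payload = {rows};
    _ = check bigQueryClient->insertAllTableData(projectId, datasetId, tableId, payload);
}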
Manual data inspection
Ballerina can connect with Google Sheets or Excel to load data samples, intermediate data, or processed data for manual inspection.
import ballerinax/googleapis.sheets;

// spreadsheetClient and the SalesSummary type are defined elsewhere.
function loadToGoogleSheet(string sheetName, string workSheetName,
        SalesSummary[] salesSummary) returns error? {
    sheets:Spreadsheet spreadsheet = check spreadsheetClient->createSpreadsheet(sheetName);
    string spreadSheetId = spreadsheet.spreadsheetId;
    check spreadsheetClient->renameSheet(spreadSheetId, "Sheet1", workSheetName);
    // Write the header row, then one row per summary entry.
    _ = check spreadsheetClient
        ->appendValue(spreadSheetId, ["Product", "Sales"], {sheetName: workSheetName});
    foreach var {product, sales} in salesSummary {
        _ = check spreadsheetClient
            ->appendValue(spreadSheetId, [product, sales], {sheetName: workSheetName});
    }
}