Ballerina for ETL
Extracting, transforming, and loading data often involves dealing with intricate, time-consuming processes, leading to high costs and operational inefficiencies.
Ballerina simplifies ETL with its programmable approach. It excels in complex data operations, integrations, and deployment flexibility, ensuring a scalable and adaptable ETL experience.
![AI](/images/etl-sm-banner.png)
Data extraction (E)
Drive data interactions with models
Ballerina's persist feature seamlessly connects to databases, retrieves data, and kickstarts your ETL process.
function getOrderData() returns error? {
    stream<OrderData, persist:Error?> orders = dbClient->/orderdata();
    check from var orderData in orders
        do {
            io:println(orderData);
        };
}

function getCustomerData() returns error? {
    stream<CustomerData, persist:Error?> customers = dbClient->/customers();
    check from var customerData in customers
        do {
            io:println(customerData);
        };
}

function getOrderWithCustomer(string orderId) returns error? {
    OrderWithCustomer orderwithCustomer = check dbClient->/orderdata/[orderId];
    io:println(orderwithCustomer);
}
Make data extraction smarter
Ballerina's AI integrations can extract structured information from unstructured data such as emails, documents, and comments, making that content available to the extraction process.
// The API key is supplied through configuration (for example, Config.toml).
configurable string openAIKey = ?;

final chat:Client openAiChat = check new ({auth: {token: openAIKey}});

isolated service /api/reviews on new http:Listener(8080) {
    resource function post summary(SummaryRequest summaryRequest) returns error? {
        chat:CreateChatCompletionRequest request = {
            model: "gpt-3.5-turbo",
            messages: [
                {
                    role: "user",
                    content: string `
                        Extract the following details in JSON from the reviews given.
                        {
                            good_points: string,
                            bad_points: string,
                            improvement_points: string
                        }
                        The fields should contain points extracted from all reviews
                        Here are the reviews:
                        ${string:'join(",", ...summaryRequest.reviews)}
                    `
                }
            ]
        };
        chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
        if summary.choices.length() > 0 {
            string content = check summary.choices[0].message?.content.ensureType();
            io:println(content);
        }
    }
}
Unleash data from APIs
Ballerina offers a wide range of built-in connectors, including HTTP and GraphQL clients, making it easy to interact with APIs for efficient data extraction in your ETL pipeline.
configurable string githubAccessToken = ?;
configurable string repoOwner = "ballerina-platform";
configurable string repoName = "ballerina-lang";

final github:Client githubClient = check new ({
    auth: {
        token: githubAccessToken
    }
});

public function main() returns error? {
    github:IssueFilters filters = {
        labels: ["Type/NewFeature", "Priority/High"],
        states: [github:ISSUE_OPEN]
    };
    stream<github:Issue, github:Error?> openFeatures
        = check githubClient->getIssues(repoOwner, repoName, filters);
    check from var feature in openFeatures
        do {
            io:println(feature.title);
        };
}
Data transformation (T)
Eliminating redundancy for cleaner data
Ballerina simplifies duplicate removal with query expressions and other intuitive constructs.
type SalesOrder record {|
    string itemId;
    string customerId;
    string itemName;
    int quantity;
    int date;
|};

function removeDuplicates(SalesOrder[] orders) returns SalesOrder[] {
    return from var {itemId, customerId, itemName, quantity, date} in orders
        group by itemId, customerId, itemName
        select {
            itemId,
            customerId,
            itemName,
            // After grouping, quantity and date are sequences; keep the first value of each group.
            quantity: [quantity][0],
            date: [date][0]
        };
}
public type Customer record {|
    string name;
    string city;
    string phone;
|};

function findDuplicates(Customer[] customers) returns [Customer[], Customer[]] {
    Customer[] duplicates = [];
    Customer[] uniqueCustomers = [];
    foreach Customer customer in customers {
        // A customer is a duplicate if it is within the edit-distance
        // thresholds of any customer already accepted as unique.
        boolean isDuplicate = false;
        foreach Customer uniqueCustomer in uniqueCustomers {
            if getEditDistance(customer.name, uniqueCustomer.name) < 3 &&
                getEditDistance(customer.city, uniqueCustomer.city) < 2 &&
                getEditDistance(customer.phone, uniqueCustomer.phone) < 4 {
                isDuplicate = true;
                break;
            }
        }
        if isDuplicate {
            duplicates.push(customer);
        } else {
            uniqueCustomers.push(customer);
        }
    }
    return [uniqueCustomers, duplicates];
}
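The findDuplicates function relies on a getEditDistance helper that is not defined in the snippet above. The following is a minimal sketch of one possible implementation, assuming the helper is meant to compute the Levenshtein distance (the number of single-character insertions, deletions, and substitutions needed to turn one string into the other). The body is an illustration, not necessarily the implementation the original example used.

```ballerina
import ballerina/io;

// Hypothetical helper: Levenshtein distance computed with two rolling rows.
function getEditDistance(string s1, string s2) returns int {
    int[] a = s1.toCodePointInts();
    int[] b = s2.toCodePointInts();
    // previous[j] is the distance between the first i - 1 characters of s1
    // and the first j characters of s2. Row 0 is simply 0, 1, ..., |s2|.
    int[] previous = from int j in 0 ... b.length() select j;
    foreach int i in 1 ... a.length() {
        // Distance from the first i characters of s1 to the empty string.
        int[] current = [i];
        foreach int j in 1 ... b.length() {
            int substitution = previous[j - 1] + (a[i - 1] == b[j - 1] ? 0 : 1);
            int deletion = previous[j] + 1;
            int insertion = current[j - 1] + 1;
            current.push(int:min(substitution, deletion, insertion));
        }
        previous = current;
    }
    return previous[b.length()];
}

public function main() {
    io:println(getEditDistance("kitten", "sitting")); // 3
}
```

With this definition, the thresholds in findDuplicates read as "at most two edits in the name, one in the city, and three in the phone number".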
Refining data excellence
Ballerina can call AI models during transformation to correct errors such as spelling and grammar mistakes in free-text fields, improving data integrity and precision.
configurable string openAIToken = ?;

final http:Client openAIClient = check new ("https://api.openai.com", {auth: {token: openAIToken}});

public function main(string filePath) returns error? {
    string inputString = check io:fileReadString(filePath);
    OpenAiChatResponse response = check openAIClient->/v1/chat/completions.post(
        {
            model: "gpt-3.5-turbo",
            messages: [
                {
                    role: "user",
                    content: "Fix grammar and spelling mistakes."
                },
                {
                    role: "user",
                    content: inputString
                }
            ]
        }
    );
    io:println(string `Corrected: ${response.choices[0].message.content}`);
}
Data enrichment magic
Ballerina simplifies data enrichment: records extracted from databases or APIs can be combined with data from other services to add depth and context.
// The API key is supplied through configuration (for example, Config.toml).
configurable string geocodingAPIKey = ?;

final http:Client geocodingClient = check new ("https://maps.googleapis.com");

isolated service /api/v1 on new http:Listener(8080) {
    resource function post customerWithGeoCode(Customer customer) returns GeoTaggedCustomer|error {
        GeocodeResponse response = check geocodingClient->
            /maps/api/geocode/'json(address = customer.address, key = geocodingAPIKey);
        if response.status == "OK" {
            return {
                ...customer,
                latitude: response.results[0].geometry.location.lat,
                longitude: response.results[0].geometry.location.lng
            };
        }
        return error("Error while getting the location information.");
    }
}
Reshaping data with data mapper
Comprehensive data mapping support facilitates the transformation of source data into any schema expected by target systems.
![Reshaping data with data mapper](/images/usecases/etl-data-mapping.gif)
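A mapping of this kind can be written as an ordinary expression-bodied function in Ballerina source. The sketch below illustrates the idea; the SourceOrder and TargetOrder record types and the transform function name are hypothetical and are not taken from the animation above.

```ballerina
import ballerina/io;

// Hypothetical schema produced by the extraction step.
type SourceOrder record {|
    string orderId;
    string firstName;
    string lastName;
    decimal unitPrice;
    int quantity;
|};

// Hypothetical schema expected by the target system.
type TargetOrder record {|
    string id;
    string customerName;
    decimal total;
|};

// Each target field is defined by an expression over the source fields.
function transform(SourceOrder sourceOrder) returns TargetOrder => {
    id: sourceOrder.orderId,
    customerName: sourceOrder.firstName + " " + sourceOrder.lastName,
    total: sourceOrder.unitPrice * <decimal>sourceOrder.quantity
};

public function main() {
    SourceOrder sourceOrder = {
        orderId: "O-1",
        firstName: "Ann",
        lastName: "Lee",
        unitPrice: 2.5,
        quantity: 4
    };
    io:println(transform(sourceOrder));
}
```

Because the mapping is plain code, it can be reviewed, versioned, and unit tested like any other function.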
Precision cleaning with regular expressions
Ballerina's regex support empowers data cleansing and standardization, enhancing data quality and consistency.
function standardizeValues(string inputString) returns int {
    string:RegExp yesPattern = re `y|Y|((y|Y)(e|E)(s|S))`;
    if yesPattern.isFullMatch(inputString) {
        return 1;
    }
    string:RegExp noPattern = re `n|N|((n|N)(o|O))`;
    if noPattern.isFullMatch(inputString) {
        return -1;
    }
    return 0;
}

function isValidEmail(string inputString) returns boolean {
    string:RegExp emailPattern = re `[A-Za-z0-9\._%+-]+@[A-Za-z0-9\.-]+\.[A-Za-z]{2,}`;
    return emailPattern.isFullMatch(inputString);
}

function removeExtraWhiteSpaces(string inputString) returns string {
    string:RegExp extraSpaces = re `\s+`;
    return extraSpaces.replaceAll(inputString, " ");
}
Data loading (L)
Real-time data flow
Ballerina's support for various data streaming sources, such as Kafka, RabbitMQ, and NATS, facilitates the creation of real-time streaming ETL pipelines.
final kafka:ConsumerConfiguration consumerConfiguration = {
    groupId: "lead-uuid",
    topics: ["lead-analytics"],
    pollingInterval: 1,
    autoCommit: false
};

final Client leadsDbClient = check new;

listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);

isolated service on kafkaListener {
    remote function onConsumerRecord(kafka:Caller caller, LeadAnalytics[] leadsData) returns error? {
        LeadAnalyticsDataInsert[] insertData = from var lead in leadsData
            select {id: uuid:createType1AsString(), ...lead};
        _ = check leadsDbClient->/leadanalyticsdata.post(insertData);
        // autoCommit is disabled, so commit the offsets only after the insert succeeds.
        check caller->'commit();
    }
}
Loading data to a data warehouse
Ballerina's integration and security capabilities simplify the loading of transformed data securely into on-prem or SaaS data warehouses.
final bigquery:Client bigQueryClient = check new ({auth: {token: bigQueryAccessToken}});

public function main() returns error? {
    SocialMediaInteraction[] interactions = check io:fileReadCsv("./resources/interactions.csv");
    bigquery:TabledatainsertallrequestRows[] rows = from var interaction in interactions
        select {insertId: uuid:createType1AsString(), 'json: interaction};
    bigquery:TableDataInsertAllRequest payload = {rows};
    _ = check bigQueryClient->insertAllTableData(projectId, datasetId, tableId, payload);
}
Manual data inspection
Ballerina can connect with Google Sheets or Excel to load data samples, intermediate data, or processed data for manual inspection.
function loadToGoogleSheet(string sheetName, string workSheetName,
        SalesSummary[] salesSummary) returns error? {
    sheets:Spreadsheet spreadsheet = check spreadsheetClient->createSpreadsheet(sheetName);
    string spreadSheetId = spreadsheet.spreadsheetId;
    check spreadsheetClient->renameSheet(spreadSheetId, "Sheet1", workSheetName);
    _ = check spreadsheetClient
        ->appendValue(spreadSheetId, ["Product", "Sales"], {sheetName: workSheetName});
    foreach var {product, sales} in salesSummary {
        _ = check spreadsheetClient
            ->appendValue(spreadSheetId, [product, sales], {sheetName: workSheetName});
    }
}