In enterprises, business data such as inventory levels, order statuses, and product prices changes constantly, and multiple parties need access to these updates. Messaging systems like Kafka let multiple consumers receive this information in real time. Keeping Salesforce updated with frequently changing information, such as current pricing, is vital for sales staff. Ballerina's connectors and streaming capabilities support this integration by connecting Salesforce to streaming data sources, so that transformed and filtered data is continuously pushed to Salesforce and stays current and accessible.

The example below demonstrates how to update Salesforce price books in real time based on the prices published to a Kafka topic.

import ballerinax/kafka;
import ballerinax/salesforce as sf;

configurable string salesforceAccessToken = ?;
configurable string salesforceBaseUrl = ?;
configurable string salesforcePriceBookId = ?;

public type ProductPrice readonly & record {|
    string name;
    float unitPrice;
|};

public type ProductPriceUpdate readonly & record {|
    float UnitPrice;
|};

listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {
    groupId: "order-group-id",
    topics: "product_price_updates"
});

final sf:Client sfdcClient = check new ({
    baseUrl: salesforceBaseUrl,
    auth: {
        token: salesforceAccessToken
    }
});

service on orderListener {
    isolated remote function onConsumerRecord(ProductPrice[] prices) returns error? {
        foreach ProductPrice {name, unitPrice} in prices {
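            // Look up the price book entry for this product in the configured price book.
            // The name is interpolated into the query as-is, so it must not contain single quotes.
            stream<record {}, error?> retrievedStream = check sfdcClient->query(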
                string `SELECT Id FROM PricebookEntry 
                    WHERE Pricebook2Id = '${salesforcePriceBookId}' AND 
                    Name = '${name}'`);
            record {}[] retrieved = check from record {} entry in retrievedStream
                select entry;
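            // Skip products with no matching entry; indexing an empty array would panic.
            if retrieved.length() == 0 {
                continue;
            }
            anydata pricebookEntryId = retrieved[0]["Id"];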
            if pricebookEntryId is string {
                ProductPriceUpdate updatedPrice = {UnitPrice: unitPrice};
                check sfdcClient->update("PricebookEntry", pricebookEntryId, updatedPrice);
            }
        }
    }
}
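
To try the service above, something has to publish prices to the `product_price_updates` topic. The following is a minimal sketch of such a publisher, meant to run as a separate program. It assumes a Kafka broker at `kafka:DEFAULT_URL`, as the listener does, and the product name and price are hypothetical placeholders rather than values from the original example.

```ballerina
import ballerinax/kafka;

// Mirrors the ProductPrice record consumed by the service above.
type ProductPrice readonly & record {|
    string name;
    float unitPrice;
|};

public function main() returns error? {
    // Assumption: the broker is reachable at kafka:DEFAULT_URL, as in the listener above.
    kafka:Producer priceProducer = check new (kafka:DEFAULT_URL);

    // Hypothetical product; the name must match a PricebookEntry name in Salesforce.
    ProductPrice price = {name: "Sample Product", unitPrice: 129.99};

    // Publish the price update to the topic the service listens on.
    check priceProducer->send({
        topic: "product_price_updates",
        value: price
    });

    // Close the producer so buffered messages are sent before the program exits.
    check priceProducer->close();
}
```

Before running the service itself, the three `configurable` values (`salesforceAccessToken`, `salesforceBaseUrl`, and `salesforcePriceBookId`) need to be supplied, for example through a `Config.toml` file in the project directory.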