Connecting to Salesforce using Pub / Sub API (gRPC)

Pub/Sub API is currently in pilot. Talk to your AE to have it enabled in your org. The implementation below may change once the feature is GA.

Pub/Sub API has been the talk of the town for some time, and I got to play with it recently. There are some code examples in the Pub/Sub API pilot documentation, but they are only available in Python. Being a Node.js enthusiast, I wanted to connect to the Pub/Sub API using Node.js.

npm packages
jsforce -> Used to get the authentication details.
avro-js -> Converts the binary response from the Subscribe call to JSON and vice versa.
grpc -> Core gRPC library.
@grpc/proto-loader -> Loads the proto file.

// npm packages & initialization
const grpc = require("grpc");
const fs = require("fs");
const jsforce = require("jsforce");
const avro = require("avro-js");
const protoLoader = require("@grpc/proto-loader");
const packageDef = protoLoader.loadSync("sf.proto", {});
const grpcObj = grpc.loadPackageDefinition(packageDef);
const sfdcPackage = grpcObj.eventbus.v1;
const root_cert = fs.readFileSync("roots.cer");

Authenticating & connecting to Salesforce
As of now, the Pub/Sub API does not provide any method / RPC for authenticating the user. You will have to perform the login call using the jsforce library or directly via the REST API to get the details. The details that are required are
– accesstoken
– instanceurl
– orgid (sent as the tenantid header)
The above details are set as metadata headers and are used to set up the initial connection with the gRPC server.

// Note: await must run inside an async function (or an ES module).
// username and password are your Salesforce credentials.
const conn = new jsforce.Connection({
  loginUrl: "https://test.salesforce.com", // sandbox login URL
});
const connectionResult = await conn.login(username, password);
const orgId = connectionResult.organizationId;
// Attach the auth details as metadata headers on every call.
const metaCallback = (_params, callback) => {
  const meta = new grpc.Metadata();
  meta.add("accesstoken", conn.accessToken);
  meta.add("instanceurl", conn.instanceUrl);
  meta.add("tenantid", orgId);
  callback(null, meta);
};
const callCreds = grpc.credentials.createFromMetadataGenerator(metaCallback);
// Combine the TLS channel credentials with the per-call credentials.
const combCreds = grpc.credentials.combineChannelCredentials(
  grpc.credentials.createSsl(root_cert),
  callCreds
);
const client = new sfdcPackage.PubSub(
  "api.pilot.pubsub.salesforce.com:7443",
  combCreds
);
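
The login step above uses await, so it has to run inside an async function. Here is a minimal sketch of wrapping it, assuming a jsforce-style connection object. The name getAuthHeaders is hypothetical and is not part of jsforce or the Pub/Sub API.

```javascript
// Hypothetical helper: logs in and returns the three header values that
// the gRPC metadata callback needs. `conn` is assumed to behave like a
// jsforce Connection: login() resolves to the user info, and the
// connection exposes accessToken and instanceUrl afterwards.
async function getAuthHeaders(conn, username, password) {
  const userInfo = await conn.login(username, password);
  if (!conn.accessToken || !conn.instanceUrl) {
    throw new Error("Login did not return an access token or instance URL");
  }
  return {
    accesstoken: conn.accessToken,
    instanceurl: conn.instanceUrl,
    tenantid: userInfo.organizationId,
  };
}
```

The returned object can then be copied, key by key, into a grpc.Metadata instance inside the metadata callback.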

Topic Info & Schema Info
Salesforce transmits the data in Avro binary format. To read this format, we need the schema information to convert the binary into readable JSON. The Salesforce proto file provides two RPCs for this.
GetTopic -> Pass the topic name (platform event or CDC name) to get the schemaId. This schemaId is then passed to the GetSchema RPC to get the actual schema.
GetSchema -> Pass the schemaId retrieved in the previous GetTopic call to get the schema, which will be used for the Avro conversion.

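const topicName = "/event/Your_Custom_Event__e";
let schemaId = "";
let schema;
client.GetTopic({ topicName: topicName }, (err, response) => {
  if (err) {
    console.error("GetTopic failed", err); // handle the error
  } else {
    // Use the schemaId from the topic to fetch the schema itself.
    schemaId = response.schemaId;
    client.GetSchema({ schemaId: schemaId }, (error, res) => {
      if (error) {
        console.error("GetSchema failed", error); // handle the error
      } else {
        // Parse the JSON schema into an Avro type used for encoding and decoding.
        schema = avro.parse(res.schemaJson);
      }
    });
  }
});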
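
The two nested callbacks above can also be written with promises, which makes the "GetTopic, then GetSchema" order easier to follow. This is only a sketch: loadSchema is a hypothetical helper, client is assumed to expose the callback-style GetTopic and GetSchema methods used above, and parseSchema stands in for avro.parse.

```javascript
const { promisify } = require("util");

// Hypothetical helper: resolves a topic name to its schemaId and parsed schema.
// `client` is assumed to expose callback-style GetTopic and GetSchema methods;
// `parseSchema` is the schema parser (for example avro.parse from avro-js).
async function loadSchema(client, topicName, parseSchema) {
  const getTopic = promisify(client.GetTopic.bind(client));
  const getSchema = promisify(client.GetSchema.bind(client));

  // Step 1: the topic tells us which schema its events use.
  const topic = await getTopic({ topicName });
  // Step 2: fetch that schema and parse its JSON definition.
  const res = await getSchema({ schemaId: topic.schemaId });
  return { schemaId: topic.schemaId, schema: parseSchema(res.schemaJson) };
}
```

Inside an async function this could be called as `const { schemaId, schema } = await loadSchema(client, topicName, avro.parse);`.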

Subscribe
Now that we have all the prerequisites, let's do the actual thing: subscribing to a platform event. One thing to note here is that both the input and the output of the Subscribe RPC are streams (a bidirectional stream).

const subscription = client.Subscribe(); // client here is the gRPC client.
// Since this is a stream, you can call the write method multiple times.
// Only the required data is being passed here: the topic name and numRequested.
// Once the number of events received equals numRequested, the stream will end.
subscription.write({
  topicName: "/event/Your_platform_event__e",
  numRequested: 10,
});
// Listen for new events.
subscription.on("data", function (data) {
  console.log("data => ", data);
  // data.events is an array of events. Below I am just parsing the first event.
  // Please add the logic to handle multiple events.
  if (data.events && data.events.length > 0) {
    const payload = data.events[0].event.payload;
    // This schema is the same one we retrieved earlier in the GetSchema RPC.
    const jsonData = schema.fromBuffer(payload);
    console.log("Event details ==> ", jsonData);
  } else {
    // If there are no events, the system keeps publishing the latestReplayId every 270 seconds.
  }
});
subscription.on("end", function () {
  console.log("ended");
});
subscription.on("error", function (err) {
  console.log("error", JSON.stringify(err)); // handle errors
});
subscription.on("status", function (status) {
  console.log("status ==> ", status);
});
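
The handler above only decodes the first event of each response. Below is a small sketch of decoding the whole batch instead. decodeEvents is a hypothetical helper, and schema is assumed to expose fromBuffer() like the type returned by avro.parse.

```javascript
// Hypothetical helper: decodes every event in one Subscribe response.
// Returns an empty array for keepalive responses that carry no events.
function decodeEvents(data, schema) {
  const events = (data && data.events) || [];
  return events.map((item) => schema.fromBuffer(item.event.payload));
}
```

Inside the "data" handler this could be used as `decodeEvents(data, schema).forEach((evt) => console.log(evt));`.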

Publishing an event
For publishing an event, Salesforce's proto offers two RPCs (a bidirectional stream and a unary call). We will use the unary RPC to publish an event.

const dataToPublish = {
  Some_Text_Field__c: { string: "Test" },
  Another_Boolean_Field__c: { boolean: false },
  CreatedDate: new Date().getTime(), // This is required.
  CreatedById: "0057X000003ilJfQAI", // Id of the current user. This is required.
};
client.Publish(
  {
    topicName: "/event/Your_Custom_Event__e",
    events: [
      {
        id: "124", // This can be any string.
        schemaId: schemaId, // This is the same schemaId that we retrieved earlier.
        payload: schema.toBuffer(dataToPublish), // The schema here is the Avro type parsed earlier.
      },
    ],
  },
  function (err, response) {
    if (err) {
      console.log("error from publishing ", err);
    } else {
      console.log("Response ", response);
    }
  }
);
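
The { string: "Test" } wrappers in dataToPublish are how avro-js expects a value for a union field such as ["null", "string"]: the value is wrapped in an object keyed by its branch type, and an empty value is passed as plain null. Here is a small sketch of that convention. wrapUnion is a hypothetical helper, and it assumes the event's custom fields are declared as nullable unions in the schema returned by GetSchema.

```javascript
// Hypothetical helper: wraps a value the way avro-js represents a union branch.
// null/undefined map to null (the "null" branch); anything else is wrapped
// in an object keyed by the branch type, e.g. { string: "Test" }.
function wrapUnion(branchType, value) {
  if (value === null || value === undefined) {
    return null;
  }
  return { [branchType]: value };
}

// Field names are the placeholder fields from the example above.
const customFields = {
  Some_Text_Field__c: wrapUnion("string", "Test"),
  Another_Boolean_Field__c: wrapUnion("boolean", false),
};
```

Note that false is a real value and still gets wrapped; only null and undefined become null.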

Hope it helps!

5 thoughts on “Connecting to Salesforce using Pub / Sub API (gRPC)”

  1. Connecting to Salesforce using Pub / Sub API (gRPC): Do you have a repo for the Pub/Sub API?

    What should the sf.proto file look like?

      1. Thanks Sameer, I was able to get past the error message for the proto file. But I'm still getting “no such file roots.cer file”. Can you please post an example of the roots.cer file? Thanks for helping the community!

      2. Are there instructions on how to run this application? That would be a great help for sure.
