Connecting to Salesforce using Pub/Sub API (gRPC)

Pub/Sub API is currently in pilot. Talk to your AE to have it enabled in your org. The implementation below may change once the feature is GA.

Pub/Sub API has been the talk of the town for some time, and I got to play with it recently. There are some code examples in the Pub/Sub API pilot documentation, but they are only available in Python. Being a Node.js enthusiast, I wanted to connect to the Pub/Sub API using Node.js.

npm packages
jsforce -> Used to get the authentication details.
avro-js -> Converts the binary response from the Subscribe call to JSON and vice versa.
grpc -> Core gRPC library.
@grpc/proto-loader -> Used to load the proto file.

// npm packages & initialization
const grpc = require("grpc");
const fs = require("fs");
const jsforce = require("jsforce");
const avro = require("avro-js");
const protoLoader = require("@grpc/proto-loader");
const packageDef = protoLoader.loadSync("sf.proto", {});
const grpcObj = grpc.loadPackageDefinition(packageDef);
const sfdcPackage = grpcObj.eventbus.v1;
const root_cert = fs.readFileSync("roots.cer");

Authenticating & connecting to Salesforce
As of now, the Pub/Sub API does not provide any method/RPC for authenticating the user. You will have to perform the login call using the jsforce library or directly via the REST API to get the details. The required details are:
– accesstoken
– instanceurl
– tenantid (the org ID)
The above details are set as metadata headers and are used to set up the initial connection with the gRPC server.

// Run this inside an async function; username and password are your Salesforce credentials.
const conn = new jsforce.Connection({
  loginUrl: "https://test.salesforce.com",
});
const connectionResult = await conn.login(username, password);
const orgId = connectionResult.organizationId;
// Attach the authentication details as metadata headers on every call.
const metaCallback = (_params, callback) => {
  const meta = new grpc.Metadata();
  meta.add("accesstoken", conn.accessToken);
  meta.add("instanceurl", conn.instanceUrl);
  meta.add("tenantid", orgId);
  callback(null, meta);
};
const callCreds = grpc.credentials.createFromMetadataGenerator(metaCallback);
// Combine the TLS channel credentials with the per-call metadata credentials.
const combCreds = grpc.credentials.combineChannelCredentials(
  grpc.credentials.createSsl(root_cert),
  callCreds
);
const client = new sfdcPackage.PubSub(
  "api.pilot.pubsub.salesforce.com:7443",
  combCreds
);

Topic Info & Schema Info
Salesforce transmits the data in Avro binary format. To read this format, we need the schema information to convert the binary into readable JSON. The Salesforce proto file provides two RPCs for this:
GetTopic -> Pass the topic name (platform event or CDC name) to get the schemaId. This schemaId is then passed to the GetSchema RPC to get the actual schema.
GetSchema -> Pass the schemaId retrieved in the previous GetTopic call to get the schema, which is used for the Avro conversion.

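const topicName = '/event/Your_Custom_event__e';
let schemaId = '';
let schema;
client.GetTopic({ topicName: topicName }, (err, response) => {
  if (err) {
    // handle the error (here it is only logged)
    console.log("error from GetTopic ", err);
  } else {
    // get the schema information
    schemaId = response.schemaId;
    client.GetSchema({ schemaId: schemaId }, (error, res) => {
      if (error) {
        // handle the error (here it is only logged)
        console.log("error from GetSchema ", error);
      } else {
        // Avro type used later to decode and encode event payloads
        schema = avro.parse(res.schemaJson);
      }
    });
  }
});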
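
The nested callbacks above can also be flattened with promises. The following is only a minimal sketch, assuming `client` exposes the callback-style `GetTopic` and `GetSchema` methods used above. `fetchSchema` and its `parseSchema` parameter are hypothetical names introduced for illustration; in this post you would pass `avro.parse` as `parseSchema`.

```javascript
const { promisify } = require("util");

// Hypothetical helper: resolves to { schemaId, schema } for a topic.
// `client` is assumed to expose callback-style GetTopic / GetSchema methods.
// `parseSchema` turns the schema JSON string into a usable type (e.g. avro.parse).
async function fetchSchema(client, topicName, parseSchema) {
  // bind() keeps `this` pointing at the client when the method is promisified
  const getTopic = promisify(client.GetTopic.bind(client));
  const getSchema = promisify(client.GetSchema.bind(client));

  const topic = await getTopic({ topicName });
  const res = await getSchema({ schemaId: topic.schemaId });
  return { schemaId: topic.schemaId, schema: parseSchema(res.schemaJson) };
}
```

Inside an async function, this could be called as `const { schemaId, schema } = await fetchSchema(client, topicName, avro.parse);`, and any error from either RPC surfaces as a rejected promise.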

Subscribe
Now that we have all the prerequisites, let's do the actual thing: subscribing to a platform event. One thing to note here is that both the input and the output of the Subscribe RPC are streams (bidirectional streaming).

const subscription = client.Subscribe(); // client here is the gRPC client.
// Since this is a stream, you can call the write method multiple times.
// Only the required data is being passed here: the topic name and numRequested.
// Once the number of events received equals numRequested, the stream will end.
subscription.write({
  topicName: "/event/Your_platform_event__e",
  numRequested: 10,
});
// Listen to new events.
subscription.on("data", function (data) {
  console.log("data => ", data);
  // data.events is an array of events. Below I am just parsing the first event.
  // Please add the logic to handle multiple events.
  if (data.events && data.events.length > 0) {
    const payload = data.events[0].event.payload;
    // This schema is the same one we retrieved earlier in the GetSchema RPC.
    const jsonData = schema.fromBuffer(payload);
    console.log("Event details ==> ", jsonData);
  } else {
    // If there are no events, the system keeps publishing the latestReplayId every 270 seconds.
  }
});
subscription.on("end", function () {
  console.log("ended");
});
subscription.on("error", function (err) {
  console.log("error", JSON.stringify(err)); // handle errors
});
subscription.on("status", function (status) {
  console.log("status ==> ", status);
});
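
To handle every event in a response rather than only the first one, the decoding step can be pulled into a small function. This is a sketch under assumptions, not the official approach: `decodeEvents` is a hypothetical name, and `schema` is assumed to be any object with a `fromBuffer(buffer)` method, such as the Avro type returned by `avro.parse` earlier.

```javascript
// Hypothetical helper: decode every event in one Subscribe response.
// `schema` is assumed to expose fromBuffer(buffer), like the avro-js type above.
function decodeEvents(data, schema) {
  // `events` may be missing or empty, e.g. when the response carries no events
  const events = (data && data.events) || [];
  return events.map((received) => schema.fromBuffer(received.event.payload));
}
```

Inside the `data` handler you could then write `decodeEvents(data, schema).forEach((evt) => console.log(evt));`, which does nothing when the response has no events.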

Publishing an event
For publishing an event, Salesforce’s proto offers two RPCs (a bidirectional stream and a unary call). We will use the unary RPC to publish an event.

const dataToPublish = {
  Some_Text_Field__c: { string: "Test" },
  Another_Boolean_Field__c: { boolean: false },
  CreatedDate: new Date().getTime(), // This is required.
  CreatedById: "0057X000003ilJfQAI", // Id of the current user. This is required.
};
client.Publish(
  {
    topicName: "/event/Your_Custom_Event__e",
    events: [
      {
        id: "124", // this can be any string
        schemaId: schemaId, // This is the same schemaId that we retrieved earlier.
        payload: schema.toBuffer(dataToPublish), // The schema here is the Avro-generated schema.
      },
    ],
  },
  function (err, response) {
    if (err) {
      console.log("error from publishing ", err);
    } else {
      console.log("Response ", response);
    }
  }
);
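
If you publish from several places, the steps above can be wrapped in two small helpers. Treat this as a rough sketch: `buildEventRecord` and `publishEvent` are hypothetical names, `client` is assumed to expose the callback-style `Publish` method shown above, and `schema` is assumed to expose `toBuffer(record)` like the avro-js type.

```javascript
// Hypothetical helper: add the two fields the example above marks as required.
// Custom field values (including wrappers like { string: "Test" }) are left to the caller.
function buildEventRecord(fields, createdById, now = Date.now()) {
  return { ...fields, CreatedDate: now, CreatedById: createdById };
}

// Hypothetical helper: wrap the callback-style unary Publish RPC in a Promise.
// `id` can be any string, as noted in the example above.
function publishEvent(client, topicName, schemaId, schema, record, id = "1") {
  const request = {
    topicName,
    events: [{ id, schemaId, payload: schema.toBuffer(record) }],
  };
  return new Promise((resolve, reject) => {
    client.Publish(request, (err, response) =>
      err ? reject(err) : resolve(response)
    );
  });
}
```

A call could look like `await publishEvent(client, topicName, schemaId, schema, buildEventRecord({ Some_Text_Field__c: { string: "Test" } }, userId));`, where `userId` stands for the Id of the current user.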

Hope it helps!

Salesforce File Migration (ContentVersion) via Node.js

As part of a project, I was assigned the task of moving ContentVersion files from one org (legacy) to another. I used Node.js for this. The first task was to download the file from the legacy org and then upload it to the active org without storing the file in the local file system.

One important thing to note here is that there are two different approaches to inserting files into ContentVersion:
– Use the JSON structure to insert a ContentVersion record when the content is smaller than 37 MB.
– Use the multipart method to insert the blob when the content is larger than 37 MB.
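
The size check can be sketched as a small function. This is only an illustration: `chooseUploadMethod` and `JSON_INSERT_LIMIT_BYTES` are names I made up, and the threshold of 37,000,000 bytes is the value that the code in this post compares `ContentSize` against.

```javascript
// Threshold used by the code in this post: 37,000,000 bytes (about 37 MB).
const JSON_INSERT_LIMIT_BYTES = 37000000;

// Hypothetical helper: pick the insert approach from ContentVersion.ContentSize.
function chooseUploadMethod(contentSizeBytes) {
  return contentSizeBytes > JSON_INSERT_LIMIT_BYTES ? "multipart" : "json";
}
```

Keeping the threshold in one named constant makes it easy to adjust if the limit in your org turns out to be different.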

In the code below, I fetch the latest ContentVersion file and insert it back into the same org. You can just as well connect to another org and push the file there.

const jsforce = require("jsforce");
const axios = require("axios");
const FormData = require("form-data");
const getStream = require("get-stream");
const mime = require("mime-types");
const conn = new jsforce.Connection({
  loginUrl: "https://login.salesforce.com",
});
const username = ""; // your Salesforce username
const password = ""; // your Salesforce password
const main = async () => {
  await conn.login(username, password);
  // Fetch the most recently created ContentVersion record.
  const sourceContentVersionFile =
    await conn.query(`SELECT Id, Title, ContentSize, VersionData, PathOnClient
      FROM ContentVersion
      ORDER BY CreatedDate DESC
      LIMIT 1`);
  const contentVersionRecord = sourceContentVersionFile.records[0];
  if (contentVersionRecord.ContentSize > 37000000) {
    // Size greater than 37 MB: use the multipart blob insert.
    const fileStream = await getFile(contentVersionRecord, false);
    const formData = createFormData(contentVersionRecord, fileStream);
    const URL =
      conn.instanceUrl + "/services/data/v51.0/sobjects/ContentVersion";
    await axios({
      method: "post",
      maxContentLength: Infinity,
      maxBodyLength: Infinity,
      url: URL,
      headers: {
        Authorization: "Bearer " + conn.accessToken,
        "Content-Type": 'multipart/form-data; boundary="boundary_string"',
      },
      data: formData,
    });
  } else {
    // Smaller files: insert the base64-encoded body as a regular record.
    const base64Body = await getFile(contentVersionRecord, true);
    await conn.sobject("ContentVersion").insert({
      Title: contentVersionRecord.Title,
      PathOnClient: contentVersionRecord.PathOnClient,
      VersionData: base64Body,
      FirstPublishLocationId: "0012w00000rTbXNAA0", // Id to which the content version needs to be linked
      Origin: "H",
    });
  }
};
// Download the file body; return it as a base64 string or as a stream.
const getFile = async (data, generateBase64String) => {
  const file = await axios({
    method: "get",
    url: conn.instanceUrl + data.VersionData,
    headers: {
      Authorization: "Bearer " + conn.accessToken,
    },
    responseType: "stream",
  });
  if (generateBase64String) {
    return await getStream(file.data, { encoding: "base64" });
  } else {
    return file.data; // return the stream
  }
};
// Build the multipart body: the record as JSON plus the binary file.
const createFormData = (data, file) => {
  const contentVersion = {
    FirstPublishLocationId: "0012w00000rTbXNAA0",
    Title: data.Title,
    PathOnClient: data.PathOnClient,
    Origin: "H",
  };
  const form = new FormData();
  form.setBoundary("boundary_string");
  form.append("entity_content", JSON.stringify(contentVersion), {
    contentType: "application/json",
  });
  form.append("VersionData", file, {
    filename: data.PathOnClient,
    // mime.lookup returns false for unknown extensions, so fall back to a generic type
    contentType: mime.lookup(data.PathOnClient) || "application/octet-stream",
  });
  return form;
};
main().catch((err) => console.error("migration failed", err));

A couple of things:
– The above code shows the migration of a single file; however, the same logic can be used to migrate any number of files.
– I am pulling and pushing the file in the same org. You can, however, connect to multiple Salesforce orgs and move the file between them.
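
For more than one file, a simple loop is enough. The sketch below assumes the per-file logic from `main` has been extracted into a function that takes one ContentVersion record; `migrateAll` and `migrateOne` are hypothetical names, and the records would come from the same query without `LIMIT 1`.

```javascript
// Hypothetical helper: migrate records one at a time and collect failures
// instead of stopping at the first error.
async function migrateAll(records, migrateOne) {
  const failed = [];
  for (const record of records) {
    try {
      // Sequential on purpose, so only one file is in flight at a time.
      await migrateOne(record);
    } catch (err) {
      failed.push({ id: record.Id, error: err.message });
    }
  }
  return { migrated: records.length - failed.length, failed };
}
```

Running the files one after another keeps memory use predictable for large files; the returned `failed` list tells you which records to retry.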

Hope it helps!