Connecting to Salesforce using Pub/Sub API (gRPC)

The Pub/Sub API is currently in pilot. Talk to your AE to have it enabled in your org. The implementation below may change once the feature is GA.

The Pub/Sub API has been the talk of the town for some time, and I got to play with it recently. There are some code examples in the Pub/Sub API pilot documentation, but they are only available in Python. Being a Node.js enthusiast, I wanted to establish the Pub/Sub API connection using Node.js.

npm packages
jsforce -> To get the authentication details.
avro-js -> To convert the binary response from the Subscribe call to JSON & vice versa.
grpc -> Core gRPC library.
@grpc/proto-loader -> Loads the proto file.

// npm packages & initialization
const grpc = require("grpc");
const fs = require("fs");
const jsforce = require("jsforce");
const avro = require("avro-js");
const protoLoader = require("@grpc/proto-loader");
const packageDef = protoLoader.loadSync("sf.proto", {});
const grpcObj = grpc.loadPackageDefinition(packageDef);
const sfdcPackage = grpcObj.eventbus.v1;
const root_cert = fs.readFileSync("roots.cer");

Authenticating & connecting to Salesforce
As of now the Pub/Sub API does not provide any method/RPC for authenticating the user. You will have to perform the login call using the jsforce library or directly via the REST API to get the details. The details that are required are
– accesstoken
– instanceurl
– orgid (passed as the tenantid header)
These details are set as metadata headers and are used to set up the initial connection with the gRPC server.

const conn = new jsforce.Connection({
  loginUrl: "https://test.salesforce.com",
});
const connectionResult = await conn.login(username, password);
const orgId = connectionResult.organizationId;
const metaCallback = (_params, callback) => {
  const meta = new grpc.Metadata();
  meta.add("accesstoken", conn.accessToken);
  meta.add("instanceurl", conn.instanceUrl);
  meta.add("tenantid", orgId);
  callback(null, meta);
};
const callCreds = grpc.credentials.createFromMetadataGenerator(metaCallback);
const combCreds = grpc.credentials.combineChannelCredentials(
  grpc.credentials.createSsl(root_cert),
  callCreds
);
const client = new sfdcPackage.PubSub(
  "api.pilot.pubsub.salesforce.com:7443",
  combCreds
);

Topic Info & Schema Info
Salesforce transmits the data in Avro binary format. To read this format we need the schema information to convert the binary into readable JSON. The Salesforce proto file provides two RPCs for this.
GetTopic -> Pass the topic name (platform event or CDC name) to get the schemaId. This schemaId is then passed to the GetSchema RPC to get the actual schema.
GetSchema -> Pass the schemaId retrieved in the previous GetTopic call to get the schema, which is used for the Avro conversion.

const topicName = '/event/Your_Custom_event__e';
let schemaId = '';
let schema;
client.GetTopic({ topicName: topicName }, (err, response) => {
  if (err) {
    //throw error
  } else {
    //get the schema information
    schemaId = response.schemaId;
    client.GetSchema({ schemaId: schemaId }, (error, res) => {
      if (error) {
        //handle error
      } else {
        schema = avro.parse(res.schemaJson);
      }
    });
  }
});
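
Both calls are asynchronous, so the schema is only available once the GetSchema callback fires. Here is a minimal sketch of wrapping the two RPCs in a Promise so the subscribe/publish logic can simply await it (the helper name getSchemaForTopic is my own, not part of the API):

//A minimal sketch: wrap GetTopic + GetSchema in a Promise so that the
//schema is guaranteed to be resolved before we subscribe or publish.
//"client" and "avro" are the same objects created earlier.
const getSchemaForTopic = (topicName) =>
  new Promise((resolve, reject) => {
    client.GetTopic({ topicName }, (err, topic) => {
      if (err) return reject(err);
      client.GetSchema({ schemaId: topic.schemaId }, (error, res) => {
        if (error) return reject(error);
        resolve({
          schemaId: topic.schemaId,
          schema: avro.parse(res.schemaJson),
        });
      });
    });
  });

//Usage:
//const { schemaId, schema } = await getSchemaForTopic("/event/Your_Custom_event__e");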

Subscribe
Now that we have all the prerequisites, let's do the actual thing: subscribing to a platform event. One thing to note here is that both the input & output of the Subscribe RPC are streams (a bidirectional stream).

const subscription = client.Subscribe(); //client here is the grpc client.
//Since this is a stream, you can call the write method multiple times.
//Only the required data is being passed here: the topic name & numRequested.
//Once the system has received events equal to numRequested, the stream will end.
subscription.write({
  topicName: "/event/Your_platform_event__e",
  numRequested: 10,
});
//listen to new events.
subscription.on("data", function (data) {
  console.log("data => ", data);
  //data.events is an array of events. Below I am just parsing the first event.
  //Please add the logic to handle multiple events.
  if (data.events) {
    const payload = data.events[0].event.payload;
    let jsonData = schema.fromBuffer(payload); //this schema is the same one we retrieved earlier via the GetSchema rpc.
    console.log("Event details ==> ", jsonData);
  } else {
    //if there are no events, the system publishes the latestReplayId roughly every 270 seconds.
  }
});
subscription.on("end", function () {
  console.log("ended");
});
subscription.on("error", function (err) {
  console.log("error", JSON.stringify(err)); //handle errors
});
subscription.on("status", function (status) {
  console.log("status ==> ", status);
});
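
If you want to keep the subscription flowing beyond the first batch, one option (a sketch of my own, relying on the bidirectional nature of the Subscribe RPC rather than anything in the pilot docs) is to write another fetch request on the same stream as events arrive:

//Sketch: request another batch once the previous one has been delivered, so
//the subscription keeps flowing instead of stopping after the first
//numRequested events. This reuses the "subscription" stream created above;
//the batch size of 10 is arbitrary.
let receivedInBatch = 0;
subscription.on("data", function (data) {
  if (data.events) {
    receivedInBatch += data.events.length;
  }
  if (receivedInBatch >= 10) {
    receivedInBatch = 0;
    subscription.write({
      topicName: "/event/Your_platform_event__e",
      numRequested: 10,
    });
  }
});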

Publishing an event
For publishing an event, Salesforce's proto offers two RPCs (a bidirectional stream & a unary call). We will use the unary RPC to publish an event. Note how the custom field values in the payload below are wrapped in their Avro type (for example { string: "Test" }); those fields are nullable unions in the event schema, so avro-js expects the values wrapped this way.

const dataToPublish = {
  Some_Text_Field__c: { string: "Test" },
  Another_Boolean_Field__c: { boolean: false },
  CreatedDate: new Date().getTime(), //This is required.
  CreatedById: "0057X000003ilJfQAI", //Id of the current user. This is required.
};
client.Publish(
  {
    topicName: "/event/Your_Custom_Event__e",
    events: [
      {
        id: "124", //this can be any string
        schemaId: schemaId, //This is the same schemaId that we retrieved earlier.
        payload: schema.toBuffer(dataToPublish), //The schema here is the avro generated schema
      },
    ],
  },
  function (err, response) {
    if (err) {
      console.log("error from publishing ", err);
    } else {
      console.log("Response ", response);
    }
  }
);

Hope it helps!

Salesforce File Migration (ContentVersion) via Node.js

As part of a project I was assigned the task of moving ContentVersion files from one org (legacy) to another. For this purpose I used Node.js as my go-to language. The first task was to download the file from the legacy org and then upload it to the active org without storing the file on the local file system.

One important thing to note here is that there are two different approaches to inserting files into ContentVersion:
– Use the JSON structure to insert a ContentVersion record when the content is smaller than 37 MB.
– Use the multipart method to insert the blob when the content is larger than 37 MB.

In the code below you will see that I am fetching the latest ContentVersion file and inserting it back into the same org. But you can of course connect to another org and push the file into that org.

const jsforce = require("jsforce");
const axios = require("axios");
var FormData = require("form-data");
const getStream = require("get-stream");
const mime = require("mime-types");
const conn = new jsforce.Connection({
  loginUrl: "https://login.salesforce.com",
});
const username = "";
const password = "";
const main = async () => {
  await conn.login(username, password);
  const sourceContentVersionFile =
    await conn.query(`SELECT Id, Title, ContentSize, VersionData, PathOnClient
                      FROM ContentVersion
                      ORDER BY CreatedDate DESC
                      LIMIT 1`);
  const contentVersionRecord = sourceContentVersionFile.records[0];
  if (contentVersionRecord.ContentSize > 37000000) {
    // size greater than 37Mb, use multipart blob insert.
    const fileStream = await getFile(contentVersionRecord, false);
    const formData = createFormData(contentVersionRecord, fileStream);
    const URL =
      conn.instanceUrl + "/services/data/v51.0/sobjects/ContentVersion";
    await axios({
      method: "post",
      maxContentLength: Infinity,
      maxBodyLength: Infinity,
      url: URL,
      headers: {
        Authorization: "Bearer " + conn.accessToken,
        "Content-Type": `multipart/form-data; boundary=\"boundary_string\"`,
      },
      data: formData,
    });
  } else {
    const base64Body = await getFile(contentVersionRecord, true);
    await conn.sobject("ContentVersion").insert({
      Title: contentVersionRecord.Title,
      PathOnClient: contentVersionRecord.PathOnClient,
      VersionData: base64Body,
      FirstPublishLocationId: "0012w00000rTbXNAA0", //Id to which the content version needs to be linked
      Origin: "H",
    });
  }
};
const getFile = async (data, generateBase64String) => {
  const file = await axios({
    method: "get",
    url: conn.instanceUrl + data.VersionData,
    headers: {
      Authorization: "Bearer " + conn.accessToken,
    },
    responseType: "stream",
  });
  if (generateBase64String) {
    return await getStream(file.data, { encoding: "base64" });
  } else {
    return file.data; // return the stream
  }
};
const createFormData = (data, file) => {
  const contentVersion = {
    FirstPublishLocationId: "0012w00000rTbXNAA0",
    Title: data.Title,
    PathOnClient: data.PathOnClient,
    Origin: "H",
  };
  const form = new FormData();
  form.setBoundary("boundary_string");
  form.append("entity_content", JSON.stringify(contentVersion), {
    contentType: "application/json",
  });
  form.append("VersionData", file, {
    filename: data.PathOnClient,
    contentType: mime.lookup(data.PathOnClient),
  });
  return form;
};
main();

A couple of things:
– The above code shows the migration of a single file; however, the same logic can be used to migrate any number of files (see the sketch below).
– I am pulling and pushing the file in the same org. You can, however, connect to multiple Salesforce orgs and move the files between them.
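
As a rough sketch of both points (assuming two jsforce connections, sourceConn and targetConn, and a getFile helper adjusted to accept the connection it downloads from; createFormData and axios are reused from above):

//Rough sketch: migrate every ContentVersion returned by the query from a
//source org to a target org. sourceConn/targetConn are two jsforce
//connections (my own names). getFile is assumed to be adjusted to accept
//the connection it should download from; createFormData is reused as-is.
const migrateFiles = async (sourceConn, targetConn) => {
  const result = await sourceConn.query(
    `SELECT Id, Title, ContentSize, VersionData, PathOnClient FROM ContentVersion`
  );
  for (const record of result.records) {
    if (record.ContentSize > 37000000) {
      //large file: stream it straight into a multipart request against the target org
      const fileStream = await getFile(sourceConn, record, false);
      const formData = createFormData(record, fileStream);
      await axios({
        method: "post",
        url: targetConn.instanceUrl + "/services/data/v51.0/sobjects/ContentVersion",
        maxContentLength: Infinity,
        maxBodyLength: Infinity,
        headers: {
          Authorization: "Bearer " + targetConn.accessToken,
          "Content-Type": `multipart/form-data; boundary="boundary_string"`,
        },
        data: formData,
      });
    } else {
      //small file: plain JSON insert with a base64 body
      const base64Body = await getFile(sourceConn, record, true);
      await targetConn.sobject("ContentVersion").insert({
        Title: record.Title,
        PathOnClient: record.PathOnClient,
        VersionData: base64Body,
      });
    }
  }
};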

Hope it helps!

node.js + ALM + REST API = Awesome

Every day you see people (or yourself) doing some mundane task and wonder how you can help them automate that process. This article highlights something I did a few months back to help a friend who used to spend a considerable amount of time fetching data from ALM (earlier known as QC), formatting it, and putting it into some other database.

I always wanted to use Node.js for something useful, so I decided to set up a server to get data from the ALM server and push it to the other database. Until this point I had no idea whether ALM exposed any APIs. Luckily they had opened up access to ALM 11.00 and higher versions via their REST API. I wanted everything on the server side, with no UI involved.

So I was able to come up with a small script which authenticates the user into ALM and then gets all the defects related to a release. I have provided comments wherever necessary in the code below. I am still a noob in Node.js, so please excuse me if I have made any mistakes.


var https = require('https'),
	fs = require('fs'),
	config = JSON.parse(fs.readFileSync('config.json'));//this refers to a file where I have all my config like host, userName, password Etc

//this is added to avoid the TLS error. Uncomment if you get a TLS error while authenticating.
//process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0';

//set the correct options for the call.
var options = {
	host : config.host, 
	path : "/qcbin/authentication-point/authenticate",
	method: "GET",
	headers : {'Authorization': 'Basic ' + Buffer.from(config.alm_userName + ':' + config.alm_password).toString('base64')}
	};
	//authenticating the user into ALM
	ALMConnect(options, 'header','', function(status, data){
		if(status){
			//get the LWSSO_Cookie from the header. This is the session cookie which will be used in all callouts to ALM.
			if(data.headers["set-cookie"] != undefined ) {
				extractDefects(data.headers["set-cookie"]);
			}else{
				console.log('Dagnabbit!! ERROR:  Unable to login, check your username/password/serverURL.');
			}
		}else{
			console.log('Dagnabbit!! ERROR:  ' + JSON.stringify(data));
		}
	});

//Function to extract the defects for analysis.
function extractDefects(LWSSO_Cookie){
	var queryParam = "{";
	//add Release
	queryParam += "detected-in-rel["+config.release+"];";
	//add all your request parameters here. It's a little complicated initially, but you will get a hang of it.
	// Make sure to use encodeURIComponent() for all the values in the query parameters.
	queryParam+="}";
	//get all the fields that you want to query. The fewer the fields, the smaller the returned XML and the faster the call.
	var fields = config.defectFieldMapping.fieldArray.join(',');
	var opt = {
		host: config.host,
		path: "/qcbin/rest/domains/"+config.domain+"/projects/"+config.project+"/defects?query="+queryParam+"&fields="+fields+"&page-size=max",
		method:"GET",
		headers: {"Cookie":LWSSO_Cookie}
	};

	ALMConnect(opt, 'data','',function(status,data){
		if(status){
                        //write the defects to an XML file in local drive.
			fs.writeFileSync('newDefect.xml',data);
			//once you get the defect XML you can parse it into JSON and push it to other databases like SFDC etc. (a small sketch follows this script)
		}else{
			console.log('Dagnabbit!! ERROR:  ' + JSON.stringify(data));
		}
	});
}

function ALMConnect(opt, responseType,requestBody, callback){

	var request = https.request(opt, function(res){
		res.setEncoding('utf8');
		var XMLoutput='';
		res.on('data',function(chunk){
			XMLoutput+=chunk;
		});
		res.on('end',function(){
			if(responseType=='data'){
				callback(true,XMLoutput);
			}else {
				callback(true, res);
			}
		});
	});
	request.on('error',function(e){
		callback(false,e);
	});
	if(opt.method=='POST' || opt.method == 'PUT'){
		request.write(requestBody);
	}
	request.end();
}
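
Picking up on the comment in extractDefects about converting the defect XML into JSON, here is a minimal sketch, assuming you add the xml2js package (an extra dependency, not used elsewhere in this script):

//Minimal sketch: parse the saved defect XML into JSON using the xml2js package.
var xml2js = require('xml2js'),
	fs = require('fs');//fs is already required at the top of the script above.

fs.readFile('newDefect.xml', 'utf8', function(readErr, xml){
	if(readErr){
		return console.log('Unable to read the defect XML: ' + readErr);
	}
	xml2js.parseString(xml, function(parseErr, result){
		if(parseErr){
			return console.log('Unable to parse the defect XML: ' + parseErr);
		}
		//result is now a plain JavaScript object; push it to SFDC or any other database.
		console.log(JSON.stringify(result, null, 2));
	});
});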