SSE proposal with draft implementation
Requirements:
- Any critical operations regarding fragments should be populated in the frontend in 'near real time'. As critical operations are considered the create, update and delete actions.
- These notifications to the frontend should be pushed to the appropriate clients based on specific rules e.g. two different clients who work on the same book (collection) should receive live updates based on their interaction on this book's fragments but should not receive notifications for changes originated from other users who work on a different book. To this end, the notifications of the backend should contain specifiers in order to facilitate the frontend application needs to perform a re-fetch operation. An example of a specifier could be the id of the collection which contains the changed fragment. The latter specifier, or any other specifiers, could assist the frontend application to act accordingly based on client's interests (e.g. all the active clients of a specific collection should perform a re-fetch).
Proposal:
For the live updates functionality there is a need for an implementation both in frontend and in backend in order to establish an extra channel of communication between the clients and the server. In the domain of web development there are already many different solutions which tackle the specific functionality used by the community such as websockets (polling, long-polling), Server Sent Events, etc. Our suggestion is in favour of Server Sent Events.
SSE:
Pros:
- Fairly simple implementation required both in backend as well as in frontend
- Uses HTTP as the medium of communication between the server and the client
- EventSource is included in the Web API
- If the client loses the connection to the server the reconnection is handled automatically
- JSON is supported
- Lost events recovery is supported based on last event's id
- CORS support
Const:
- Not a full-duplex communication (the client only listens for server's notifications)
- IE and Edge browsers are not supported (easy workarround is provided via the use of polyfills https://github.com/remy/polyfills/blob/master/EventSource.js, https://github.com/Yaffle/EventSource)
Our proposed architecture:
A module should be created which will provide the following functionality:
- registration of the clients and establishment of ongoing communication with the server
- terminate the ongoing communication and deletion of a specific client when the latter closes hers/his browser
- serialization / deserialization mechanism for the incoming/outgoing event messages (SSE msg follow specific conventions e.g. \n\n)
- middleware which should be configurable and be used at the top level of the Node js application in order to leave intact the existing implementation
- middleware which should handle which of the existing resource endpoints should be tracked in order for a notification event to be triggered
- mechanism of tracking the status of the open connection between the clients and the server
You can find the entire implementation on https://gitlab.coko.foundation/alexgeo/sse_live_updates/tree/master
Code snippets
In the existing api.js (pubsweet-backend)
const { ServerSentEvents } = require('../../live_update_module')
const sse = ServerSentEvents()
const sseOptions = {
allowedVerbs: ['POST', 'PUT', 'DELETE'],
includeResources: ['collections'],
excludeResources: ['teams']
}
// Declaration of endpoint for the establisment of the communication between clients/server
api.get('/subscribe', (req, res) => {
sse.addClient(req, res)
console.log('active connections', sse.activeConnectionsCounter())
})
// Middleware for the tracking of resource endpoints
api.use(sse.eventPusher(sseOptions))
Draft SSE implementation
function addClient(req, res) {
initializeConnection(req, res)
activeConnections.push(res)
}
function initializeConnection(req, res) {
console.log('init')
req.socket.setTimeout(0);
req.socket.setNoDelay(true);
req.socket.setKeepAlive(true);
res.set('Content-Type', 'text/event-stream')
res.set('Cache-Control', 'no-cache')
res.set('Connection', 'keep-alive')
res.write('\n')
}
// Actual draft implementation of the middleware
function eventPusher(options) {
return function (req, res, next) {
let allowedVerbs = false
let includeResources = false
let excludeResources = false
options.allowedVerbs.forEach(function(verb) {
if (req.method === verb) {
allowedVerbs = true
}
}, this);
options.includeResources.forEach(function(incResource) {
if (req.url.includes(incResource)) {
includeResources = true
}
}, this);
options.excludeResources.forEach(function(excResource) {
if (!req.url.includes(excResource)) {
excludeResources = true
}
}, this);
if (allowedVerbs && includeResources && excludeResources) {
res.on('finish', function () {
if (res.statusCode == 201 || res.statusCode == 200) {
myEmitter.emit('trigger')
}
});
}
return next()
}
}