Skip to content
Snippets Groups Projects
Commit 3e29b2e2 authored by Alf Eaton's avatar Alf Eaton
Browse files

Update the fragment source on the server after conversion

* Subscribe to the Pusher/Slanger "process chain execution" channel
* When the INK process completes, if a "targetFragment" query parameter is set, save the fragment source to the database.
parent 2ba015e3
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,10 @@ const Busboy = require('busboy')
const config = require('config')
const rp = require('request-promise-native')
const temp = require('temp')
const Pusher = require('pusher-js')
const STATUS = require('http-status-codes')
const sse = require('pubsweet-sse')
const AuthorizationError = require('pubsweet-server/src/errors/AuthorizationError')
// rp.debug = true
......@@ -13,6 +17,11 @@ const inkConfig = config.get('pubsweet-component-ink-backend')
// Generate the absolute URL
const inkUrl = path => inkConfig.inkEndpoint + 'api/' + path
// Connect to the INK Pusher/Slanger endpoint
const connectToPusher = ({ appKey, ...options }) => new Pusher(appKey, options)
const pusher = connectToPusher(inkConfig.pusher)
// Sign in
const authorize = () => rp({
method: 'POST',
......@@ -45,59 +54,20 @@ const upload = (recipeId, inputFile, auth) => rp({
timeout: 60 * 60 * 1000 // 3600 seconds
})
// Download the output file (keep trying if there's a 404 response, until it's ready)
const download = async (chain, auth, outputFileName) => {
const manifest = chain.input_file_manifest
if (manifest.length === 0) {
throw new Error('The INK server gave a malformed response (no input files in the process chain)')
}
const interval = inkConfig.interval || 1000 // try once per second
const maxRetries = inkConfig.maxRetries || 300 // retry for up to 5 minutes
const uri = inkUrl('process_chains/' + chain.id + '/download_output_file')
const qs = {
relative_path: outputFileName || path.basename(manifest[0].path, '.docx') + '.html'
}
const headers = {
// Download the output file
const download = async (chainId, auth, outputFileName) => rp({
uri: inkUrl('process_chains/' + chainId + '/download_output_file'),
qs: {
relative_path: outputFileName
},
headers: {
uid: inkConfig.email,
...auth
}
})
for (let i = 0; i < maxRetries; i++) {
// delay
await new Promise(resolve => setTimeout(resolve, interval))
const response = await rp({
uri,
qs,
headers,
simple: false,
resolveWithFullResponse: true
}).catch(error => {
logger.error('Error downloading from INK:', error.message)
throw error
})
// a successful request: return the data
if (response.statusCode === 200) {
return response.body
}
// not a 404 response - stop trying
if (response.statusCode !== 404) {
break
}
}
throw new Error('Unable to download the output from INK')
}
const findRecipeId = (recipeKey = 'Editoria Typescript', auth) => rp({
// Find the ID of a recipe by name
const findRecipeId = (name = 'Editoria Typescript', auth) => rp({
method: 'GET',
uri: inkUrl('recipes'),
headers: {
......@@ -106,7 +76,7 @@ const findRecipeId = (recipeKey = 'Editoria Typescript', auth) => rp({
},
json: true
}).then(data => {
const recipe = data.recipes.find(recipe => recipe.name === recipeKey)
const recipe = data.recipes.find(recipe => recipe.name === name)
return recipe ? recipe.id : null
})
......@@ -126,12 +96,65 @@ const process = async (inputFile, options) => {
throw err
})
return download(response.process_chain, auth, options.outputFileName)
const chain = response.process_chain
return new Promise((resolve, reject) => {
// subscribe to the "process chain execution" channel
// TODO: one private channel per process chain
// https://pusher.com/docs/authenticating_users
const channel = pusher.subscribe('process_chain_execution')
// wait for a "subscription succeeded" event
channel.bind('pusher:subscription_succeeded', () => {
const handler = data => {
if (data.chain_id !== chain.id) return
logger.info('Processing completed', data)
// unbind the event handler
channel.unbind('processing_completed', handler)
// unsubscribe from the channel
pusher.unsubscribe('process_chain_execution')
const manifest = chain.input_file_manifest
if (manifest.length === 0) {
reject(new Error('The INK server gave a malformed response (no input files in the process chain)'))
}
// backwards compatibility
if (!options.outputFileName) {
options.outputFileName = path.basename(manifest[0].path, '.docx') + '.html'
}
// download the output file
logger.info(`Downloading output file ${options.outputFileName} from chain ${chain.id}`)
download(chain.id, auth, options.outputFileName).then(result => {
resolve(result)
}).catch(error => {
logger.error('Error downloading from INK:', error.message)
reject(error)
})
}
// handle "processing completed" events on this channel
channel.bind('processing_completed', handler)
})
channel.bind('pusher:subscription_error', status => {
logger.error('Pusher subscription error', status)
reject(new Error('There was an error subscribing to the INK channel'))
})
})
}
const InkBackend = function (app) {
// TODO: authentication on this route
app.use('/api/ink', (req, res, next) => {
// TODO: set app.locals.passport
const authBearer = app.locals.passport.authenticate('bearer', { session: false })
app.use('/api/ink', authBearer, (req, res, next) => {
const fileStream = new Busboy({ headers: req.headers })
fileStream.on('file', (fieldname, file, filename, encoding, contentType) => {
......@@ -143,8 +166,41 @@ const InkBackend = function (app) {
options: { filename, contentType }
}
process(inputFile, req.query).then(converted => {
res.json({ converted })
logger.info(`Uploading file ${inputFile.name} to INK for processing`)
process(inputFile, req.query).then(async converted => {
if (req.query.targetFragment) {
const id = req.query.targetFragment
const fragment = await app.models.Fragment.find(id)
// TODO: set app.locals.authsome?
// const permission = await app.locals.authsome.can(req.user, 'PATCH', fragment)
const permission = true
if (!permission) {
throw new AuthorizationError(`User ${req.user} is not allowed to update the fragment source`)
}
let properties = { source: converted }
if (permission.filter) {
properties = permission.filter(properties)
}
fragment.updateProperties(properties)
await fragment.save()
sse.send({
action: 'fragment:patch',
data: {
fragment: { id },
properties
}
})
} else {
res.json({ converted })
}
// clean up temp file
fs.unlink(stream.path, () => {
......@@ -169,6 +225,12 @@ const InkBackend = function (app) {
})
req.pipe(fileStream)
// if the output should be saved asynchronously on the server,
// send an empty "accepted" response
if (req.query.targetFragment) {
res.status(STATUS.ACCEPTED).end()
}
})
}
......
......@@ -12,6 +12,9 @@
"@pubsweet/logger": "^0.0.1",
"busboy": "^0.2.13",
"config": "^1.26.1",
"http-status-codes": "^1.3.0",
"pubsweet-sse": "^0.1.3",
"pusher-js": "^4.2.1",
"request": "^2.83.0",
"request-promise-native": "^1.0.5",
"temp": "^0.8.3"
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment