Commit ef496e13 authored by Jure's avatar Jure

feat(job-xsweet): use pgboss.job table to communicate result

parent 26f2b317
Pipeline #13054 passed with stages
in 12 minutes and 1 second
......@@ -9,10 +9,13 @@
"author": "Adam Hyde",
"license": "MIT",
"dependencies": {
"@pubsweet/db-manager": "^3.0.15",
"@pubsweet/logger": "^0.2.29",
"express-fileupload": "v1.1.1-alpha.2",
"passport": "^0.4.0",
"pubsweet-server": "^13.5.0",
"tmp-promise": "^1.0.5"
"tmp-promise": "^1.0.5",
"waait": "^1.0.5"
},
"publishConfig": {
"access": "public"
......
......@@ -3,8 +3,13 @@ const {
} = require('pubsweet-server')
const { getPubsub } = require('pubsweet-server/src/graphql/pubsub')
const { db } = require('@pubsweet/db-manager')
const CONVERT_DOCX_TO_HTML = 'CONVERT_DOCX_TO_HTML'
const logger = require('@pubsweet/logger')
const DOCX_TO_HTML = 'DOCX_TO_HTML'
const crypto = require('crypto')
const waait = require('waait')
const resolvers = {
Mutation: {
......@@ -14,10 +19,35 @@ const resolvers = {
const { createReadStream, filename } = await file
const stream = await createReadStream()
const pubsubChannel = `${CONVERT_DOCX_TO_HTML}.${context.user}`
const jobId = crypto.randomBytes(3).toString('hex')
const pubsubChannel = `${DOCX_TO_HTML}.${context.user}.${jobId}`
// A reference to actual pgboss job row
let queueJobId
pubsub.subscribe(pubsubChannel, async ({ docxToHTMLJob: { status } }) => {
logger.info(pubsubChannel, status)
if (status === 'Conversion complete') {
await waait(1000)
const job = await db('pgboss.job').whereRaw(
"data->'request'->>'id' = ?",
[queueJobId],
)
pubsub.publish(pubsubChannel, {
docxToHTMLJob: {
status: 'Result',
html: job[0].data.response.html,
},
})
}
})
pubsub.publish(pubsubChannel, {
docxToHTMLJob: { status: `Uploading file ${filename}` },
docxToHTMLJob: {
status: `Uploading file ${filename}`,
id: jobId,
},
})
const chunks = []
......@@ -26,33 +56,44 @@ const resolvers = {
chunks.push(chunk)
})
return new Promise((resolve, reject) => {
stream.on('end', () => {
pubsub.publish(pubsubChannel, {
docxToHTMLJob: { status: 'File uploaded' },
})
stream.on('end', () => {
pubsub.publish(pubsubChannel, {
docxToHTMLJob: {
status: 'File uploaded and conversion job created',
id: jobId,
},
})
const result = Buffer.concat(chunks)
const result = Buffer.concat(chunks)
jobQueue.publish(`xsweetGraphQL`, {
jobQueue
.publish(`xsweetGraphQL`, {
docx: {
name: filename,
data: result.toString('base64'),
},
pubsubChannel,
})
.then(id => (queueJobId = id))
})
resolve({ status: 'Conversion job submitted' })
stream.on('error', e => {
pubsub.publish(pubsubChannel, {
status: e,
})
stream.on('error', reject)
})
return {
status: 'Uploading file',
id: jobId,
}
},
},
Subscription: {
docxToHTMLJob: {
subscribe: async (_, vars, context) => {
subscribe: async (_, { jobId }, context) => {
const pubsub = await getPubsub()
return pubsub.asyncIterator(`${CONVERT_DOCX_TO_HTML}.${context.user}`)
return pubsub.asyncIterator(`${DOCX_TO_HTML}.${context.user}.${jobId}`)
},
},
},
......@@ -65,10 +106,11 @@ const typeDefs = `
}
extend type Subscription {
docxToHTMLJob: DocxToHTMLJob!
docxToHTMLJob(jobId: String!): DocxToHTMLJob!
}
type DocxToHTMLJob {
id: String
status: String!
html: String
}
......
......@@ -2,7 +2,7 @@ const tmp = require('tmp-promise')
const fs = require('fs')
const path = require('path')
const { execSync, execFileSync } = require('child_process')
const logger = require('@pubsweet/logger')
const { pubsubManager } = require('pubsweet-server')
// encode file to base64
......@@ -30,9 +30,12 @@ const xsweetHandler = enablePubsub => async job => {
try {
let pubsub
if (enablePubsub) {
logger.info(job.data.pubsubChannel, 'has started.')
pubsub = await pubsubManager.getPubsub()
pubsub.publish(job.data.pubsubChannel, {
docxToHTMLJob: { status: 'DOCX to HTML conversion started' },
docxToHTMLJob: {
status: 'DOCX to HTML conversion started',
},
})
}
......@@ -48,7 +51,9 @@ const xsweetHandler = enablePubsub => async job => {
if (enablePubsub) {
pubsub.publish(job.data.pubsubChannel, {
docxToHTMLJob: { status: 'Unzipping DOCX document' },
docxToHTMLJob: {
status: 'Unzipping DOCX document',
},
})
}
......@@ -82,15 +87,14 @@ const xsweetHandler = enablePubsub => async job => {
await cleanup()
if (enablePubsub) {
logger.info(job.data.pubsubChannel, 'has completed.')
pubsub.publish(job.data.pubsubChannel, {
docxToHTMLJob: { status: 'Conversion complete', html: processedHtml },
docxToHTMLJob: { status: 'Conversion complete' },
})
}
return { html: processedHtml }
} catch (e) {
// eslint-disable-next-line
console.log(e, e.stack)
if (enablePubsub) {
const pubsub = await pubsubManager.getPubsub()
pubsub.publish(job.data.pubsubChannel, {
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment