From f3c8b934017f05e33b2a2283c3b2c514dbe640bc Mon Sep 17 00:00:00 2001 From: Jure Triglav <juretriglav@gmail.com> Date: Thu, 25 Oct 2018 17:18:46 +0200 Subject: [PATCH] feat: support for a job runner in a different process --- packages/server/src/jobs/index.js | 25 +++++++--- packages/server/test/jobs/job_processor.js | 17 +++++++ .../test/jobs/jobs_different_process_test.js | 47 +++++++++++++++++++ packages/server/test/jobs/jobs_test.js | 2 +- 4 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 packages/server/test/jobs/job_processor.js create mode 100644 packages/server/test/jobs/jobs_different_process_test.js diff --git a/packages/server/src/jobs/index.js b/packages/server/src/jobs/index.js index 985730f77..61ebba6fd 100644 --- a/packages/server/src/jobs/index.js +++ b/packages/server/src/jobs/index.js @@ -28,13 +28,26 @@ const boss = new PgBoss({ db: dbAdapter }) boss.on('error', error => logger.error(error)) let started = false +let connected = false -module.exports = async () => { - if (started) { +module.exports = { + start: async () => { + if (started) { + return boss + } + + await boss.start() + started = true + connected = true return boss - } + }, + connect: async () => { + if (connected) { + return boss + } - await boss.start() - started = true - return boss + await boss.connect() + connected = true + return boss + }, } diff --git a/packages/server/test/jobs/job_processor.js b/packages/server/test/jobs/job_processor.js new file mode 100644 index 000000000..96bb30076 --- /dev/null +++ b/packages/server/test/jobs/job_processor.js @@ -0,0 +1,17 @@ +const someHandler = async job => Promise.resolve({ thing: 'someOtherThing' }) + +const handleJobs = async () => { + global.__testDbName = process.env.__TESTDBNAME + const { connect: jobs } = require('../../src/jobs') + + const jobQueue = await jobs() + + const queueName = 'aJobQueue2' + + // Subscribe to the job queue with an async handler + await jobQueue.subscribe(queueName, someHandler) +} + +// setInterval(() => {}, 1 << 30) + +handleJobs() diff --git a/packages/server/test/jobs/jobs_different_process_test.js b/packages/server/test/jobs/jobs_different_process_test.js new file mode 100644 index 000000000..fc38fa5f6 --- /dev/null +++ b/packages/server/test/jobs/jobs_different_process_test.js @@ -0,0 +1,47 @@ +const path = require('path') +const { start: jobs } = require('../../src/jobs') +const { spawn } = require('child_process') + +describe('job runner in a different process', () => { + let jobQueue + + beforeAll(async () => { + jobQueue = await jobs() + }) + + it('submits a job and gets notified on completion', async done => { + const queueName = 'aJobQueue2' + + // Add 2 jobs to the queue + await jobQueue.publish(queueName, { param: 'aThing' }) + await jobQueue.publish(queueName, { param: 'anotherThing' }) + + // console.log('global', global) + const jobProcessorPath = path.resolve(__dirname, 'job_processor.js') + + let jobCount = 0 + + // Be notified on job completion with job result + await jobQueue.onComplete(queueName, job => { + try { + expect(job.data.response).toEqual({ thing: 'someOtherThing' }) + jobCount += 1 + if (jobCount === 2) { + jobProcessor.kill() + done() + } + } catch (e) { + done.fail(e) + } + }) + + const jobProcessor = spawn('node', [jobProcessorPath], { + stdio: 'inherit', + env: Object.assign({}, process.env, { + __TESTDBNAME: global.__testDbName, + }), + }) + }) + + afterAll(async () => jobQueue.stop()) +}) diff --git a/packages/server/test/jobs/jobs_test.js b/packages/server/test/jobs/jobs_test.js index 0b4eb2599..fb28d51d7 100644 --- a/packages/server/test/jobs/jobs_test.js +++ b/packages/server/test/jobs/jobs_test.js @@ -1,4 +1,4 @@ -const jobs = require('../../src/jobs') +const { start: jobs } = require('../../src/jobs') const someHandler = async job => { expect(job.data.param).toEqual('aThing') -- GitLab