diff --git a/packages/server/src/graphql/pubsub.js b/packages/server/src/graphql/pubsub.js index f1437b4eacf33e3ef09410a5cdc1da2c1b7d8c9f..a79f8bf63922adf72851254770b19196fc8e2504 100644 --- a/packages/server/src/graphql/pubsub.js +++ b/packages/server/src/graphql/pubsub.js @@ -42,12 +42,18 @@ module.exports = { return pubsub }, - destroy: () => { - if (client) { + /** + * @param drain {boolean} whether to wait for the database client to emit 'drain' before closing + * @returns {Promise<void>} + */ + destroy: async ({ drain } = {}) => { + if (pubsub) { + if (drain) { + await new Promise(resolve => pubsub.client.on('drain', resolve)) + } + await pubsub.client.end() pubsub = null - return client.end() } - return Promise.resolve() }, /** diff --git a/packages/server/src/index.js b/packages/server/src/index.js index 15712801d0da393e3f21ab7e79e2b2e467ed5643..e3a799ca58283c469ee9aeb2a235f8246cda62bd 100644 --- a/packages/server/src/index.js +++ b/packages/server/src/index.js @@ -8,6 +8,7 @@ module.exports.Collection = require('./models/Collection') module.exports.helpers = require('./helpers/authorization') module.exports.db = require('./db') +module.exports.pubsubManager = require('./graphql/pubsub') module.exports.NotFoundError = require('./errors/NotFoundError') module.exports.startServer = require('./start-server') diff --git a/packages/server/test/graphql/pubsub_test.js b/packages/server/test/graphql/pubsub_test.js new file mode 100644 index 0000000000000000000000000000000000000000..509555d55fef504b104dbf3f89f372012b47ae47 --- /dev/null +++ b/packages/server/test/graphql/pubsub_test.js @@ -0,0 +1,20 @@ +const { getPubsub, destroy } = require('../../src/graphql/pubsub') + +describe('pubsub manager', () => { + afterEach(() => destroy()) + + it('can call destroy before connect', () => + expect(destroy()).resolves.toBeUndefined()) + + it('waits for client to drain before closing', async () => { + const pubsub = await getPubsub() + const cb = jest.fn() + pubsub.subscribe('test_channel', cb) + pubsub.publish('test_channel', { test: 'content' }) + const destroyPromise = destroy({ drain: true }) + + expect(cb).not.toHaveBeenCalled() + await destroyPromise + expect(cb).toHaveBeenCalledWith({ test: 'content' }) + }) +})