diff --git a/dev/config/default.js b/dev/config/default.js index 0aa2bd508d26f29f6585784c3e796fce48d85f6f..cc7ecfb2ce778760de7f98663b697dc19a58c7ed 100644 --- a/dev/config/default.js +++ b/dev/config/default.js @@ -45,6 +45,7 @@ module.exports = { }, host: 'localhost', useFileStorage: true, + useGraphQLServer: true, staticFolders: [ { folderPath: './dev/static', diff --git a/src/__tests__/graphql/pubsub.test.js b/src/__tests__/graphql/pubsub.test.js index c3876a5f774f110a4fec8364fdc182553c08dee6..3fa6e7e0aba9d4728195c7212fd71e9395e3c9a8 100644 --- a/src/__tests__/graphql/pubsub.test.js +++ b/src/__tests__/graphql/pubsub.test.js @@ -1,20 +1,19 @@ -const { getPubsub, destroy } = require('../../graphql/pubsub') +const subscriptionManager = require('../../graphql/pubsub') describe('pubsub manager', () => { - afterEach(() => destroy()) + afterEach(() => subscriptionManager.client.end()) it('can call destroy before connect', () => - expect(destroy()).resolves.toBeUndefined()) + expect(subscriptionManager.client.end()).resolves.toBeUndefined()) it('waits for client to drain before closing', async () => { - const pubsub = await getPubsub() const cb = jest.fn() - pubsub.subscribe('test_channel', cb) - pubsub.publish('test_channel', { test: 'content' }) - const destroyPromise = destroy({ drain: true }) + subscriptionManager.subscribe('test_channel', cb) + subscriptionManager.publish('test_channel', { test: 'content' }) expect(cb).not.toHaveBeenCalled() - await destroyPromise + + await subscriptionManager.client.end() expect(cb).toHaveBeenCalledWith({ test: 'content' }) }) }) diff --git a/src/dbManager/connectionConfig.js b/src/dbManager/connectionConfig.js index 71f6c527742c129cb0e6a9a57c4fd717a0a622c1..11ccd8458883ff9506c17a7da671598e2f6dbc78 100644 --- a/src/dbManager/connectionConfig.js +++ b/src/dbManager/connectionConfig.js @@ -2,14 +2,18 @@ const config = require('config') const { isEnvVariableTrue } = require('../utils/env') -const connectionConfig = config.get('db') +const getDbConnectionConfig = () => { + const connectionConfig = config.get('db') -// clone to allow mutation for the case of adding ssl -const connection = { ...connectionConfig } + // clone to allow mutation for the case of adding ssl + const connection = { ...connectionConfig } -if (isEnvVariableTrue(process.env.POSTGRES_ALLOW_SELF_SIGNED_CERTIFICATES)) { - if (!connection.ssl) connection.ssl = {} - connection.ssl.rejectUnauthorized = false + if (isEnvVariableTrue(process.env.POSTGRES_ALLOW_SELF_SIGNED_CERTIFICATES)) { + if (!connection.ssl) connection.ssl = {} + connection.ssl.rejectUnauthorized = false + } + + return connection } -module.exports = connection +module.exports = getDbConnectionConfig diff --git a/src/dbManager/db.js b/src/dbManager/db.js index cc3c3839b6743c562f28e9791803fcccb5bb1016..085be3f6dc8978dc3b662cb425ec79f357d63fb8 100644 --- a/src/dbManager/db.js +++ b/src/dbManager/db.js @@ -1,7 +1,9 @@ const knex = require('knex') const config = require('config') const { knexSnakeCaseMappers } = require('objection') -const connection = require('./connectionConfig') +const getDbConnectionConfig = require('./connectionConfig') + +const connectionConfig = getDbConnectionConfig() const pool = config.has('pool') && config.get('pool') @@ -12,7 +14,7 @@ const acquireConnectionTimeout = const db = knex({ client: 'pg', - connection, + connection: connectionConfig, pool, ...knexSnakeCaseMappers(), acquireConnectionTimeout, diff --git a/src/graphql/PostgresPubSub.js b/src/graphql/PostgresPubSub.js index 72786e40d522bae3437c4a964be55316612672ad..9005fc93c00ffd4b1e7e4e2eb81869ed13f38f04 100644 --- a/src/graphql/PostgresPubSub.js +++ b/src/graphql/PostgresPubSub.js @@ -68,4 +68,4 @@ class PostgresPubSub extends PubSub { } } -module.exports = { PostgresPubSub } +module.exports = PostgresPubSub diff --git a/src/graphql/PostgresPubSubNoop.js b/src/graphql/PostgresPubSubNoop.js new file mode 100644 index 0000000000000000000000000000000000000000..6c538354fc2b5c7ceeaf2874af1df11288ed6943 --- /dev/null +++ b/src/graphql/PostgresPubSubNoop.js @@ -0,0 +1,25 @@ +class PostgresPubSubNoop { + static error() { + throw new Error( + 'Cannot use the Subscription Manager when useGraphQLServer is false in the config', + ) + } + + publish() { + this.error() + } + + subscribe(triggerName, onMessage) { + this.error() + } + + unsubscribe(subId) { + this.error() + } + + asyncIterator(triggers) { + this.error() + } +} + +module.exports = PostgresPubSubNoop diff --git a/src/graphql/pubsub.js b/src/graphql/pubsub.js index 3595b69a04831711118acc9ef4d2117293e843ac..99e33565b1d5bb0c6be0408cbe99628d87c7307b 100644 --- a/src/graphql/pubsub.js +++ b/src/graphql/pubsub.js @@ -1,68 +1,14 @@ const config = require('config') -const pg = require('pg') -const { PostgresPubSub } = require('./PostgresPubSub') -const connectionConfig = require('../dbManager/connectionConfig') +const PostgresPubSub = require('./PostgresPubSub') +const PostgresPubSubNoop = require('./PostgresPubSubNoop') +const getDbConnectionConfig = require('../dbManager/connectionConfig') -const ignoreTerminatedError = - config.has('ignoreTerminatedConnectionError') && - config.get('ignoreTerminatedConnectionError') +const connectionConfig = getDbConnectionConfig() -let pubsub -let client +const exportedClass = + config.has('useGraphQLServer') && config.get('useGraphQLServer') + ? new PostgresPubSub(connectionConfig) + : new PostgresPubSubNoop() -module.exports = { - /** - * Pubsub object used in graphql subscriptions - * to push messages back to the client. - */ - getPubsub: async () => { - if (pubsub) return pubsub - client = new pg.Client(connectionConfig) - - // ignore some errors which are thrown in integration tests - if (ignoreTerminatedError) { - client.on('error', async err => { - if ( - err.message !== - 'terminating connection due to administrator command' && - err.message !== 'Connection terminated unexpectedly' - ) { - throw err - } - }) - } - - await client.connect() - pubsub = new PostgresPubSub({ client }) - - if (ignoreTerminatedError) { - pubsub.subscribe('error', () => {}) - } - - return pubsub - }, - - /** - * @param drain {boolean} whether to wait for the database client to emit 'drain' before closing - * @returns {Promise<void>} - */ - destroy: async ({ drain } = {}) => { - if (pubsub) { - if (drain) { - /* eslint-disable-next-line no-promise-executor-return */ - await new Promise(resolve => pubsub.client.on('drain', resolve)) - } - - await pubsub.client.end() - pubsub = null - } - }, - - /** - * Iterators to listen to - */ - asyncIterators: { - ON_UPLOAD_PROGRESS: 'ON_UPLOAD_PROGRESS', - }, -} +module.exports = exportedClass diff --git a/src/index.js b/src/index.js index 3b155f207b33cf001fa3fa747eb85c858d1261f0..28aa29f0734aa9ef9ea3eb0b573e94a7daa8cb60 100644 --- a/src/index.js +++ b/src/index.js @@ -6,7 +6,7 @@ const { send: sendEmail } = require('./services/sendEmail') const logger = require('./logger') const db = require('./dbManager/db') const { migrate } = require('./dbManager/migrate') -const pubsubManager = require('./graphql/pubsub') +const subscriptionManager = require('./graphql/pubsub') const authentication = require('./authentication') const { File } = require('./models') const { createFile, deleteFiles } = require('./models/file/file.controller') @@ -39,7 +39,7 @@ const verifyJWT = authentication.token.verify module.exports = { createJWT, verifyJWT, - pubsubManager, + subscriptionManager, modelJsonSchemaTypes, fileStorage, createFile, diff --git a/src/jobs.js b/src/jobs.js index 39d212736a34bf65cbe0bb0b7d3d14937ea2008a..48fce49c1a82b1aec6a2f67fe0a4d34e2745d741 100644 --- a/src/jobs.js +++ b/src/jobs.js @@ -4,7 +4,7 @@ const moment = require('moment') const logger = require('./logger') const { logTask, logTaskItem } = require('./logger/internals') const db = require('./dbManager/db') -const pubsubManager = require('./graphql/pubsub') +const subscriptionManager = require('./graphql/pubsub') const { REFRESH_TOKEN_EXPIRED } = require('./services/jobs/jobs.identifiers') const { @@ -157,7 +157,6 @@ const defaultJobs = [ name: REFRESH_TOKEN_EXPIRED, callback: async job => { try { - const pubsub = await pubsubManager.getPubsub() const { userId, providerLabel } = job.data const updatedUser = await getUser(userId) @@ -180,7 +179,7 @@ const defaultJobs = [ oauthRefreshTokenExpiration.getTime() < UTCNowTimestamp if (refreshTokenExpired) { - pubsub.publish(USER_UPDATED, { + subscriptionManager.publish(USER_UPDATED, { userUpdated: updatedUser, }) } diff --git a/src/logger/internals.js b/src/logger/internals.js index cefbb1528d3513918ed8de6ad87462b530b0675b..50365a4d454a1dd3c0c2e32c13dc0edd6bbf40ac 100644 --- a/src/logger/internals.js +++ b/src/logger/internals.js @@ -3,8 +3,8 @@ const chalk = require('chalk') const logger = require('./index') const BULLET = '\u25cf' -// const CHECK = '\u2713' -const CHECK_BG = '\u2705' +const CHECK = '\u2713' +// const CHECK_BG = '\u2705' const CROSS = '\u2718' const HORIZONTAL_BOX = '\u2500' const PICKAXE = '\u26CF' @@ -37,7 +37,7 @@ const logSuccess = str => { const logSuccessTask = str => { logger.info( - `${chalk.cyan(BULLET)} ${chalk.green(str)} ${chalk.green(CHECK_BG)}`, + `${chalk.cyan(BULLET)} ${chalk.green(str)} ${chalk.green(CHECK)}`, ) } diff --git a/src/models/identity/identity.controller.js b/src/models/identity/identity.controller.js index 3d3813cc32a9675902c542d7ab085a51d35bcc48..e85ddb0d149f04baa51fc1fc477758e5b6d21566 100644 --- a/src/models/identity/identity.controller.js +++ b/src/models/identity/identity.controller.js @@ -3,7 +3,7 @@ const config = require('config') const moment = require('moment') const logger = require('../../logger') -const pubsubManager = require('../../graphql/pubsub') +const subscriptionManager = require('../../graphql/pubsub') const { getExpirationTime, foreverDate } = require('../../utils/time') const { jobs } = require('../../services') const { getUser } = require('../user/user.controller') @@ -194,8 +194,6 @@ const invalidateProviderAccessToken = async (userId, providerLabel) => { } const invalidateProviderTokens = async (userId, providerLabel) => { - const pubsub = await pubsubManager.getPubsub() - const updatedUser = await getUser(userId) const providerUserIdentity = await Identity.findOne({ @@ -208,7 +206,7 @@ const invalidateProviderTokens = async (userId, providerLabel) => { oauthRefreshTokenExpiration: moment().utc().toDate(), }) - pubsub.publish(USER_UPDATED, { + subscriptionManager.publish(USER_UPDATED, { userUpdated: updatedUser, }) diff --git a/src/models/identity/identity.resolvers.js b/src/models/identity/identity.resolvers.js index 23c16e2faa32a96ed3d85159a59dd61bdca8d4a4..d4e1b037afb8a63574eca2f40947871d66b18e3a 100644 --- a/src/models/identity/identity.resolvers.js +++ b/src/models/identity/identity.resolvers.js @@ -1,5 +1,5 @@ const logger = require('../../logger') -const pubsubManager = require('../../graphql/pubsub') +const subscriptionManager = require('../../graphql/pubsub') const { createOAuthIdentity, @@ -32,10 +32,9 @@ const createOAuthIdentityResolver = async ( code, ) - const pubsub = await pubsubManager.getPubsub() const user = await getUser(userId) - pubsub.publish(USER_UPDATED, { + subscriptionManager.publish(USER_UPDATED, { userUpdated: user, }) diff --git a/src/models/team/team.resolvers.js b/src/models/team/team.resolvers.js index 88ed47499cc76aab9fa918173823f2c72169791e..8055f551da3b3a4fbcdd4e94b879b6be8dfac213 100644 --- a/src/models/team/team.resolvers.js +++ b/src/models/team/team.resolvers.js @@ -1,5 +1,5 @@ const logger = require('../../logger') -const pubsubManager = require('../../graphql/pubsub') +const subscriptionManager = require('../../graphql/pubsub') const { labels: { TEAM_RESOLVER }, @@ -26,11 +26,9 @@ const TeamMember = require('../teamMember/teamMember.model') const broadcastUserUpdated = async userId => { try { - const pubsub = await pubsubManager.getPubsub() - const updatedUser = await getUser(userId) - return pubsub.publish(USER_UPDATED, { + return subscriptionManager.publish(USER_UPDATED, { userUpdated: updatedUser, }) } catch (e) { diff --git a/src/models/user/user.resolvers.js b/src/models/user/user.resolvers.js index cf79b32051de588218bb89dc80af1a84fc3b4ff4..4114cdc5b9d805b3192e413d67a12464894ccbb6 100644 --- a/src/models/user/user.resolvers.js +++ b/src/models/user/user.resolvers.js @@ -1,7 +1,7 @@ const { withFilter } = require('graphql-subscriptions') const logger = require('../../logger') -const pubsubManager = require('../../graphql/pubsub') +const subscriptionManager = require('../../graphql/pubsub') const { labels: { USER_RESOLVER }, @@ -303,11 +303,9 @@ module.exports = { Subscription: { userUpdated: { subscribe: async (...args) => { - const pubsub = await pubsubManager.getPubsub() - return withFilter( () => { - return pubsub.asyncIterator(USER_UPDATED) + return subscriptionManager.asyncIterator(USER_UPDATED) }, (payload, variables) => { const { userId } = variables diff --git a/src/startServer.js b/src/startServer.js index 22669c99903a894f3b3ba43274dbe14864fa6456..af243e3c631d068b9ecbb671f17c12a535151697 100644 --- a/src/startServer.js +++ b/src/startServer.js @@ -14,6 +14,7 @@ const { startJobQueue, subscribeJobsToQueue, stopJobQueue } = require('./jobs') const authentication = require('./authentication') const healthcheck = require('./healthcheck') const setupGraphqlServer = require('./graphql/setup') +const subscriptionManager = require('./graphql/pubsub') const seedGlobalTeams = require('./startup/seedGlobalTeams') const ensureTempFolderExists = require('./startup/ensureTempFolderExists') @@ -153,6 +154,10 @@ const shutdown = async signal => { logTaskItem('Successfully shut down job queue') } + if (useGraphQLServer) { + await subscriptionManager.client.end() + } + const endTime = performance.now() const durationInSeconds = (endTime - startTime) / 1000 // Convert to seconds logInit( diff --git a/src/utils/tokens.js b/src/utils/tokens.js index 2d04cf4250f7c64c53dc999ae3fd09d37362bcf3..74d75ee1c0f13472466f6397639ab69da1c799c0 100644 --- a/src/utils/tokens.js +++ b/src/utils/tokens.js @@ -3,7 +3,7 @@ const config = require('config') const axios = require('axios') const moment = require('moment') -const pubsubManager = require('../graphql/pubsub') +const subscriptionManager = require('../graphql/pubsub') const { getExpirationTime, foreverDate } = require('./time') @@ -34,7 +34,6 @@ const requestTokensFromProvider = async ( providerLabel, options = {}, ) => { - const pubsub = await pubsubManager.getPubsub() const { checkAccessToken, returnAccessToken } = options const providerUserIdentity = await Identity.findOne({ @@ -70,7 +69,7 @@ const requestTokensFromProvider = async ( if (refreshTokenExpired) { const updatedUser = await getUser(userId) - pubsub.publish(USER_UPDATED, { + subscriptionManager.publish(USER_UPDATED, { userUpdated: updatedUser, }) // logger.error(