Skip to content
Snippets Groups Projects
Commit 2f570189 authored by Duncan Bennett's avatar Duncan Bennett Committed by Yannis Barlas
Browse files

feat(*): add deferred job for renewing refresh tokens

parent 164789e3
No related branches found
No related tags found
1 merge request!67feat: defer job function
......@@ -24,7 +24,8 @@ module.exports = {
integrations: {
dummy: {
clientId: 'ketida-editor',
redirectUri: 'http://localhost:4000/provider-connection-popup/dummy',
redirectUri:
'http://localhost:4000/provider-connection-popup/lulu?next=/',
tokenUrl:
'https://api.sandbox.lulu.com/auth/realms/glasstree/protocol/openid-connect/token',
},
......
......@@ -43,6 +43,13 @@ module.exports = {
// globalSetup: '<rootDir>/src/models/__tests__/_setup.js',
// globalTeardown: '<rootDir>/src/models/__tests__/_teardown.js',
},
{
displayName: 'app',
testEnvironment: 'node',
testRegex: 'src/__tests__/.+test.js$',
// globalSetup: '<rootDir>/src/models/__tests__/_setup.js',
// globalTeardown: '<rootDir>/src/models/__tests__/_teardown.js',
},
],
maxWorkers: 1,
}
const { boss } = require('pubsweet-server/src/jobs')
const { subscribeJobsToQueue } = require('../jobs')
const { jobs } = require('../services')
const { renewAuthTokens } = require('../utils/tokens')
const freezeTime = 1701856542000
const daySeconds = 24 * 3600
// Mock boss.<publish, subscribe>
jest.mock('pubsweet-server/src/jobs', () => {
const originalModule = jest.requireActual('pubsweet-server/src/jobs')
return {
__esModule: true,
...originalModule,
boss: {
reset() {
this.subscriptions = {}
this.log = []
this.lastJob = undefined
},
async publish(name, data, options) {
this.log.push(`publish ${name}`)
this.lastJob = dummyJob(data, options)
},
async subscribe(name, options, callback) {
this.subscriptions[name] = { options, callback }
},
},
}
})
// Mock Identity.findOne
jest.mock('../models', () => {
const originalModule = jest.requireActual('../models')
return {
__esModule: true,
...originalModule,
Identity: {
// Fake an identity which expires in 7 days time
findOne: async data => ({
oauthRefreshTokenExpiration: new Date(
freezeTime + daySeconds * 7 * 1000,
),
}),
},
}
})
// Mock renewAuthTokens - don't send any api requests etc
jest.mock('../utils/tokens', () => {
const originalModule = jest.requireActual('../utils/tokens')
return {
__esModule: true,
...originalModule,
renewAuthTokens: jest.fn(async (userId, providerLabel) => {}),
}
})
// Mock the date and time
Date.now = jest.fn(() => freezeTime)
const dummyJob = (data, options) => ({
data,
options,
isDone: false,
done() {
this.isDone = true
},
})
describe('jobs service', () => {
beforeEach(async () => {
// Reset the mock boss object
boss.reset()
subscribeJobsToQueue()
})
it('registers jobs', async () => {
expect(Object.keys(boss.subscriptions)).toEqual([
jobs.RENEW_AUTH_TOKENS_JOB,
])
expect(
typeof boss.subscriptions[jobs.RENEW_AUTH_TOKENS_JOB].callback,
).toEqual('function')
})
it('reschedules auth token renewal after successfully renewing the refresh token', async () => {
boss.log = []
// Run the job callback directly and then verify its behaviour
const renewCallback =
boss.subscriptions[jobs.RENEW_AUTH_TOKENS_JOB].callback
const job = dummyJob(
{ userId: 'fakeUserId', providerLabel: 'fakeProviderLabel' },
{},
)
await renewCallback(job)
// renewAuthTokens should have been called
expect(renewAuthTokens.mock.calls.length).toEqual(1)
expect(renewAuthTokens.mock.calls[0]).toEqual([
'fakeUserId',
'fakeProviderLabel',
])
// Job should succeed and be marked done
expect(job.isDone).toBe(true)
// Job should schedule a future job
expect(boss.log).toEqual([`publish ${jobs.RENEW_AUTH_TOKENS_JOB}`])
expect(boss.lastJob.data).toEqual(job.data)
expect(Object.keys(boss.lastJob.options).length).toEqual(1)
// Refresh token expires in 7 days and must be renewed in 6
expect(boss.lastJob.options.startAfter).toEqual(daySeconds * 6)
})
})
......@@ -22,6 +22,7 @@ const index = require('pubsweet-server/src/routes/index')
const healthcheck = require('./healthcheck')
const createCORSConfig = require('./corsConfig')
const { connectToFileStorage } = require('./services/fileStorage')
const { subscribeJobsToQueue } = require('./jobs')
const configureApp = app => {
const models = require('@pubsweet/models')
......@@ -164,6 +165,7 @@ const configureApp = app => {
if (useJobQueue) {
const { startJobQueue } = require('pubsweet-server/src/jobs')
await startJobQueue() // Manage job queue
await subscribeJobsToQueue() // Subscribe job callbacks to the queue
}
if (config.has('pubsweet-server.cron.path')) {
......
const { boss } = require('pubsweet-server/src/jobs')
const logger = require('@pubsweet/logger')
const moment = require('moment')
const { jobs } = require('./services')
const { renewAuthTokens } = require('./utils/tokens')
const { Identity } = require('./models')
/**
* Add a list of jobs to the job queue. If no jobs are specified, subscribe all
* preconfigured jobs to the queue.
*/
const subscribeJobsToQueue = async jobsList => {
logger.info('Subscribing job callbacks to the job queue')
const jobsToSubscribe = jobsList || defaultJobs
const existingSubscriptions = boss.manager?.subscriptions || {}
await Promise.all(
jobsToSubscribe.map(async ({ name, callback, subscribeOptions = {} }) => {
try {
if (!(name instanceof String || typeof name === 'string')) {
throw new Error('Invalid name')
}
if (!(callback instanceof Function)) {
throw new Error('Invalid callback')
}
if (!(subscribeOptions instanceof Object)) {
throw new Error('Invalid subscribeOptions')
}
// Don't resubscribe - it creates unexpected behaviour
if (existingSubscriptions[name] === undefined) {
await boss.subscribe(name, subscribeOptions, callback)
logger.info(`Job ${name}: subscribed`)
} else {
throw new Error('Already subscribed')
}
} catch (e) {
logger.error(`Job ${name}: subscribe error:`, e)
throw e
}
}),
)
}
// TODO - append jobs found in config
// Define default jobs
const defaultJobs = [
{
name: jobs.RENEW_AUTH_TOKENS_JOB,
callback: async job => {
const bufferTime = 24 * 3600
const { userId, providerLabel } = job.data
try {
await renewAuthTokens(userId, providerLabel)
job.done()
} catch (e) {
logger.error(`Job ${jobs.RENEW_AUTH_TOKENS_JOB}: callback error:`, e)
throw e
}
try {
// Schedule auth renewal
const { oauthRefreshTokenExpiration } = await Identity.findOne({
userId,
provider: providerLabel,
})
const expiresIn = (oauthRefreshTokenExpiration - moment().utc()) / 1000
const renewAfter = expiresIn - bufferTime
if (renewAfter < 0) {
throw new Error('"renewAfter" is less than 0')
}
await jobs.defer(
jobs.RENEW_AUTH_TOKENS_JOB,
{ seconds: renewAfter },
{ userId, providerLabel },
)
} catch (e) {
logger.error(`Job ${jobs.RENEW_AUTH_TOKENS_JOB}: defer error:`, e)
throw e
}
},
},
]
module.exports = { subscribeJobsToQueue }
......@@ -2,12 +2,33 @@ const axios = require('axios')
const config = require('config')
const { URLSearchParams: UnpackedParams } = require('url')
const { createUser, createUserAndIdentities } = require('./helpers/users')
const { createUser } = require('./helpers/users')
const { createOAuthIdentity } = require('../identity/identity.controller')
const { User, Identity } = require('../index')
const clearDb = require('./_clearDb')
const { jobs } = require('../../services')
// Mock "renewAuthTokensJob"
jest.mock('../../services', () => {
const { jobs: jobs_, ...originalModule } =
jest.requireActual('../../services')
return {
__esModule: true,
...originalModule,
jobs: {
...jobs_,
defer: jest.fn(async (name, startAfter, data) => [
name,
startAfter,
data,
]),
},
}
})
const fakeAccessToken =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwiZW1haWwiOiJkZWZhdWx0QHRlc3QuY29tIiwiZmFtaWx5X25hbWUiOiJXYWx0b24iLCJnaXZlbl9uYW1lIjoiSm9obiJ9.8Qn2H6FAJVUn6T1U7bnbjnuguIFlY5EW_XaII1IJdE4'
......@@ -127,54 +148,11 @@ describe('Identity Controller', () => {
expect(
timeLeft(newProvider.oauthRefreshTokenExpiration) >= 359995000,
).toBeTruthy()
})
it('authorises access and updates the Oauth tokens', async () => {
axios.mockImplementationOnce(fakePostResponse)
const { user, id2: providerIdentity } = await createUserAndIdentities()
const { provider } = providerIdentity
// Make sure provider auth fields are empty
expect(provider.oauthAccessToken).toBe(undefined)
expect(provider.oauthAccessExpiration).toBe(undefined)
expect(provider.oauthRefreshToken).toBe(undefined)
expect(provider.oauthRefreshExpiration).toBe(undefined)
// Validate provider auth tokens
const identity = await Identity.findOne({
userId: user.id,
provider,
})
await Identity.patchAndFetchById(identity.id, {
oauthRefreshTokenExpiration: new Date(1995, 11, 17),
})
// Mock authorisation
await createOAuthIdentity(
user.id,
provider,
'fake-session-state',
'fake-code',
)
const updatedIdentity = await Identity.findById(identity.id)
expect(updatedIdentity.oauthAccessToken).toEqual(fakeAccessToken)
// Expect time left to be 3600s (with 5s uncertainty)
expect(
timeLeft(updatedIdentity.oauthAccessTokenExpiration) <= 3600000,
).toBeTruthy()
expect(
timeLeft(updatedIdentity.oauthAccessTokenExpiration) >= 3595000,
).toBeTruthy()
expect(updatedIdentity.oauthRefreshToken).toEqual('fake.refresh.token')
// Expect time left to be 3600000 (with 5s uncertainty)
expect(
timeLeft(updatedIdentity.oauthRefreshTokenExpiration) <= 360000000,
).toBeTruthy()
expect(
timeLeft(updatedIdentity.oauthRefreshTokenExpiration) >= 359995000,
).toBeTruthy()
// Expect renewal job to have been "scheduled"
const lastCallIndex = jobs.defer.mock.calls.length - 1
const [name, renewAfter, data] = jobs.defer.mock.calls[lastCallIndex]
expect(name).toEqual('renew-auth-tokens')
expect(renewAfter).toEqual({ seconds: 273600 }) // 360000 - 86400
expect(data).toEqual({ providerLabel: 'test', userId: user.id })
})
})
......@@ -3,6 +3,7 @@ const logger = require('@pubsweet/logger')
const axios = require('axios')
const config = require('config')
const { getExpirationTime } = require('../../utils/tokens')
const { jobs } = require('../../services')
const Identity = require('./identity.model')
......@@ -29,7 +30,7 @@ const getDefaultIdentity = async userId => {
}
}
/** activateOauth
/**
* Authorise user OAuth.
* Save OAuth access and refresh tokens.
* Trigger subscription indicating the identity has changed.
......@@ -37,42 +38,44 @@ const getDefaultIdentity = async userId => {
const createOAuthIdentity = async (userId, provider, sessionState, code) => {
// Throw error if unable to acquire and then store authorisation
try {
const authData = await authorizeOAuth(provider, sessionState, code)
const identity = await Identity.findOne({ userId, provider })
if (!identity) {
const {
email,
given_name: givenNames,
family_name: surname,
sub: providerUserId,
} = JSON.parse(
Buffer.from(
authData.oauthAccessToken.split('.')[1],
'base64',
).toString(),
)
return Identity.insert({
email,
provider,
userId,
profileData: {
givenNames,
surname,
providerUserId,
},
...authData,
})
let identity = await Identity.findOne({ userId, provider })
if (identity) {
return identity
}
const refreshTokenExpired =
identity.oauthRefreshTokenExpiration &&
identity.oauthRefreshTokenExpiration.getTime() < new Date().getTime()
const { renewAfter, ...authData } = await authorizeOAuth(
provider,
sessionState,
code,
)
const {
email,
given_name: givenNames,
family_name: surname,
sub: providerUserId,
} = JSON.parse(
Buffer.from(authData.oauthAccessToken.split('.')[1], 'base64').toString(),
)
identity = Identity.insert({
email,
provider,
userId,
profileData: {
givenNames,
surname,
providerUserId,
},
...authData,
})
if (refreshTokenExpired) {
return Identity.patchAndFetchById(identity.id, authData)
}
await jobs.defer(
jobs.RENEW_AUTH_TOKENS_JOB,
{ seconds: renewAfter },
{ userId, providerLabel: provider },
)
return identity
} catch (e) {
......@@ -125,11 +128,18 @@ const authorizeOAuth = async (provider, sessionState, code) => {
throw new Error('Missing data from response!')
}
const renewAfter = refresh_expires_in - 86400
if (renewAfter < 0) {
throw new Error('"renewAfter" is less than 0')
}
return {
oauthAccessToken: access_token,
oauthRefreshToken: refresh_token,
oauthAccessTokenExpiration: getExpirationTime(expires_in),
oauthRefreshTokenExpiration: getExpirationTime(refresh_expires_in),
renewAfter,
}
/* eslint-enable camelcase */
}
......
const { boss } = require('pubsweet-server/src/jobs')
const { defer: deferJob } = require('../jobs')
// Mock boss.<publish, subscribe>
jest.mock('pubsweet-server/src/jobs', () => {
const originalModule = jest.requireActual('pubsweet-server/src/jobs')
return {
__esModule: true,
...originalModule,
boss: {
reset() {
this.subscriptions = {}
this.log = []
this.lastValue = undefined
this.lastJob = undefined
},
async publish(name, data, options) {
this.log.push(`publish ${name}`)
this.lastJob = dummyJob(data, options)
},
async subscribe(name, options, callback) {
this.subscriptions[name] = { options, callback }
},
},
}
})
const dummyJob = (data, options) => ({ data, options })
describe('jobs service', () => {
beforeEach(async () => {
// Reset the mock boss object
boss.reset()
/* eslint-disable-next-line global-require */
const { subscribeJobsToQueue } = require('../../jobs')
subscribeJobsToQueue([
{
name: 'dummy-1',
callback: async job => {
boss.lastValue = `dummy1: ${job.data.value}`
},
subscribeOptions: { fakeOptions: 'subscribe' },
},
{
name: 'dummy-2',
callback: job => {
boss.lastValue = `dummy2: ${job.data.arg1} ${job.data.arg2}`
},
},
])
})
it('registers jobs', async () => {
expect(Object.keys(boss.subscriptions)).toEqual(['dummy-1', 'dummy-2'])
expect(boss.lastValue).toBeUndefined()
expect(boss.lastJob).toBeUndefined()
expect(typeof boss.subscriptions['dummy-1'].callback).toEqual('function')
expect(typeof boss.subscriptions['dummy-2'].callback).toEqual('function')
})
it('defers jobs by seconds', async () => {
boss.log = []
await deferJob(
'dummy-1',
{ seconds: 15 },
{ value: 'some value' },
{ fakeOptions: 'publish' },
)
expect(boss.log).toEqual(['publish dummy-1'])
expect(boss.lastJob.data).toEqual({ value: 'some value' })
expect(Object.keys(boss.lastJob.options).length).toEqual(2)
expect(boss.lastJob.options.startAfter).toEqual(15)
expect(boss.lastJob.options.fakeOptions).toEqual('publish')
boss.log = []
await deferJob('dummy-2', { seconds: 5 }, { arg1: 1, arg2: 2 })
expect(boss.log).toEqual(['publish dummy-2'])
expect(boss.lastJob.data).toEqual({ arg1: 1, arg2: 2 })
expect(Object.keys(boss.lastJob.options).length).toEqual(1)
expect(boss.lastJob.options.startAfter).toEqual(5)
})
it('defers jobs by any of days, hours, minutes and/or seconds', async () => {
boss.log = []
await deferJob(
'dummy-2',
{ days: 1, hours: 1, minutes: 1, seconds: 5 },
{ arg1: 2, arg2: 1 },
)
expect(boss.log).toEqual(['publish dummy-2'])
expect(boss.lastJob.data).toEqual({ arg1: 2, arg2: 1 })
expect(boss.lastJob.options.startAfter).toEqual(90065)
boss.log = []
await deferJob('dummy-2', { days: 1 }, { arg1: 2, arg2: 1 })
expect(boss.log).toEqual(['publish dummy-2'])
expect(boss.lastJob.data).toEqual({ arg1: 2, arg2: 1 })
expect(boss.lastJob.options.startAfter).toEqual(86400)
boss.log = []
await deferJob('dummy-2', { hours: 1 }, { arg1: 2, arg2: 1 })
expect(boss.log).toEqual(['publish dummy-2'])
expect(boss.lastJob.data).toEqual({ arg1: 2, arg2: 1 })
expect(boss.lastJob.options.startAfter).toEqual(3600)
boss.log = []
await deferJob('dummy-2', { minutes: 1 }, { arg1: 2, arg2: 1 })
expect(boss.log).toEqual(['publish dummy-2'])
expect(boss.lastJob.data).toEqual({ arg1: 2, arg2: 1 })
expect(boss.lastJob.options.startAfter).toEqual(60)
boss.log = []
await deferJob('dummy-2', { seconds: 1 }, { arg1: 2, arg2: 1 })
expect(boss.log).toEqual(['publish dummy-2'])
expect(boss.lastJob.data).toEqual({ arg1: 2, arg2: 1 })
expect(boss.lastJob.options.startAfter).toEqual(1)
})
it('rejects invalid offsets', async () => {
let wrapperFn = async () => deferJob('dummy-1', { years: 1, days: 1 })
await expect(wrapperFn).rejects.toThrow(
new Error('Invalid keys: ["years"]'),
)
wrapperFn = async () => deferJob('dummy-2', { days: 1, minutes: 'three' })
await expect(wrapperFn).rejects.toThrow(
new Error('Invalid values: [1,0,"three",0]'),
)
})
})
const notify = require('./notify')
const { notificationTypes } = require('./constants')
const fileStorage = require('./fileStorage')
const jobs = require('./jobs')
module.exports = {
notify,
notificationTypes,
fileStorage,
jobs,
}
const { deferJob } = require('./jobs.publish')
const jobIdentifiers = require('./jobs.identifiers')
module.exports = {
...jobIdentifiers,
defer: deferJob,
}
module.exports = {
RENEW_AUTH_TOKENS_JOB: 'renew-auth-tokens',
}
const config = require('config')
const isEmpty = require('lodash/isEmpty')
const { boss } = require('pubsweet-server/src/jobs')
const logger = require('@pubsweet/logger')
/**
* Publish a named job and job data to the job queue.
* Throw an error if invalid arguments are provided.
* @param {string} name
* A unique name which identifies the job handler.
* @param { object } startAfter
* Defines the number of seconds after which to run the job.
* @param { Object } data
* Params to pass to the job handler callback.
* @param {Object} [options]
* Optional scheduling parameters.
*/
const deferJob = async (name, startAfter, data, options) => {
try {
// This is equivalent to the "app.js:useJobQueue" check
const jobQueueDisabled =
config.has('pubsweet-server.useJobQueue') &&
config.get('pubsweet-server.useJobQueue') === false
if (jobQueueDisabled) {
throw new Error(`Job queue is disabled`)
}
await boss.publish(name, data || null, {
...options,
startAfter: toSeconds(startAfter),
})
} catch (e) {
logger.error(`Job ${name}: publish error:`, e)
throw e
}
}
/**
* Convert an object representation of a time offset into a seconds offset.
* Throw an error if invalid arguments are provided.
* @param {number} [days=0] - Offset by this number of days.
* @param {number} [hours=0] - Offset by this number of hours.
* @param {number} [minutes=0] - Offset by this number of minutes.
* @param {number} [seconds=0] - Offset by this number of seconds.
* @returns {number} - A number of seconds
*/
const toSeconds = ({
days = 0,
hours = 0,
minutes = 0,
seconds = 0,
...invalid
}) => {
if (!isEmpty(invalid)) {
throw new Error(`Invalid keys: ${JSON.stringify(Object.keys(invalid))}`)
}
const offset = ((days * 24 + hours) * 60 + minutes) * 60 + seconds
if (Number.isNaN(offset)) {
throw new Error(
`Invalid values: ${JSON.stringify([days, hours, minutes, seconds])}`,
)
}
return offset
}
module.exports = { deferJob }
......@@ -116,7 +116,7 @@ const requestTokensFromProvider = async (
}
const renewAuthTokens = async (userId, providerLabel) =>
requestTokensFromProvider(userId, providerLabel)
requestTokensFromProvider(userId, providerLabel, { checkAccessToken: false })
const getAccessToken = async (serviceName, renew = false) => {
try {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment