diff --git a/src/event-bus/index.ts b/src/event-bus/index.ts index 7a12df0cddf9e102ebe800206e35e15ea3b545ba..96fbe600e55729c36f4bf28d0436b6dc420881c7 100644 --- a/src/event-bus/index.ts +++ b/src/event-bus/index.ts @@ -1,15 +1,19 @@ // Abstract Message Queue - types and interfaces +/** + * TODO: Once agreed, add these to DefinitelyTyped so they can be shared. + */ + +export type EventType = string; + export interface Event<T extends object> { - eventType: string; - // Should this be readonly? How would we set this? Constructor, builder? Currently the publisher sets this externally, we should probably set this internally as a uuid - id: string; // Generated when the event is emitted - created: Date; - payload: T; // The actual data - //Should version & context be removed from our own implimentation as we're not using them? - version?: number; // Version of the payload - context?: unknown; // context about the event itself, including the actor - // that triggered the transmission of the event; + readonly eventType: EventType; + readonly id: string; // Generated when the event is emitted + readonly created: Date; + readonly payload: T; // The actual data the event is carrying. + // version: has been removed - so we can remain weakly typed + // context: has also been removed - if you need information about the origin + // source of the event then put it in the payload. } export interface EventPublisher { @@ -22,13 +26,17 @@ export interface EventSubscriber { subscribe<T extends object>(eventType: string, handler: (event: Event<T>) => Promise<boolean>): void; } -// Interface needed to be fulfilled in order to be used as an EventBus -export interface EventBus extends EventPublisher, EventSubscriber { - // This needs to be documented better / commented better. What are eventDefinitions? how is a serviceName going to be used? - init(eventDefinitions: string[], serviceName: string): Promise<this>; - - destroy(): Promise<void>; +export abstract class EventBus { + // register the following: + // - eventsToHandle - a list of events you will publish/subscribe to + // - serviceName - used when subscribing to generate a unique queue for holding + // incoming messages of the form: `consumer__${eventType}__${serviceName}` + constructor(readonly eventsToHandle: EventType[], readonly serviceName: string) {} + destroy(): Promise<void> { + return Promise.resolve(); + } } + // This isn't generic enough export interface EventConfig { url: string; diff --git a/src/mock-event-bus/index.test.ts b/src/mock-event-bus/index.test.ts index f74cc1dab43aba6048264af8f9fcb43df2a2c6f0..5e879ecec2fa6ce23475081a0aeb483611a09ebb 100644 --- a/src/mock-event-bus/index.test.ts +++ b/src/mock-event-bus/index.test.ts @@ -20,7 +20,7 @@ describe('mock message queue', () => { const eventType = 'libero:mock:test'; const mockHandler = jest.fn(async () => true); - const mockEventBus = await new MockEventBus().init([eventType], 'message-bus-test'); + const mockEventBus = await new MockEventBus([eventType], 'message-bus-test'); mockEventBus.subscribe<MockEvent1>(eventType, mockHandler); @@ -73,7 +73,7 @@ describe('mock message queue', () => { return Promise.resolve(true); }; - const mockEventBus = await new MockEventBus().init([eventType1, eventType2], 'message-bus-test'); + const mockEventBus = new MockEventBus([eventType1, eventType2], 'message-bus-test'); mockEventBus.subscribe<TestEventPayload1>(eventType1, mockHandler1); mockEventBus.subscribe<TestEventPayload2>(eventType2, mockHandler2); diff --git a/src/mock-event-bus/index.ts b/src/mock-event-bus/index.ts index 2cdaae4426f6a783e8451c6c4a9b687fccd7b6fa..968beecb62f06161cd79e4bc115bb354134e1a91 100644 --- a/src/mock-event-bus/index.ts +++ b/src/mock-event-bus/index.ts @@ -1,67 +1,32 @@ -import { EventBus, Event } from '../event-bus'; -import { Option, None, Some } from 'funfix'; - +import { EventBus, Event, EventPublisher, EventSubscriber } from '../event-bus'; export type AnyEvent = Event<object>; export type AnyHandler = (ev: AnyEvent) => Promise<boolean>; -// TODO: Re-look at all of this file. Not sure where this came from, looks commented differently. -// Can this be moved into testing - /** - * Mocks out the EventBus for use in tests. - * - * @export - * @class MockEventBus - * @implements {EventBus} + * MockEventBus is defined here for use during testing across services that use + * the event-bus. Examples are: audit and continuum-auth. */ -// Should be InProcessEventBus -export class MockEventBus implements EventBus { - private queues: Option<Map<string, AnyHandler>> = None; - - // This lint suppression wouldn't be needed if these params were passed into constructor - // eslint-disable-next-line @typescript-eslint/no-unused-vars - public async init(_defs: string[], _serviceName: string): Promise<this> { - this.queues = Some(new Map()); - return this; - } +export class MockEventBus extends EventBus implements EventPublisher, EventSubscriber { + private queues: Map<string, AnyHandler> = new Map(); - /** - * Allows the MockEventBus to publish any event of type Event<T> - * - * @template T - The payload for the event - * @param {T} event - Of type Event<T>, where T is the payload - * @returns {Promise<boolean>} - * @memberof MockEventBus - */ public async publish<T extends object>(event: Event<T>): Promise<boolean> { - return ( - this.queues - // Why are we flatMapping this.queues? - .flatMap(queues => Option.of(queues.get(`${event.eventType}`))) - .map(fn => { - return fn(event); - }) - .getOrElse(false) - ); + const fn = this.queues.get(`${event.eventType}`); + if (fn) { + if (this.eventsToHandle.includes(event.eventType)) { + return fn(event); + } + } + return Promise.resolve(false); } - /** - * Allows the MockEventBus to subscribe any event of type Event<T> - * - * @template T - The payload for the event - * @param {string} eventType - * @param {(event: T) => Promise<boolean>} handler - Function of type (event: Event<T>) => Promise<boolean> where T is the payload - * @memberof MockEventBus - */ public async subscribe<T extends object>( eventType: string, handler: (event: Event<T>) => Promise<boolean>, ): Promise<void> { - this.queues.get().set(`${eventType}`, handler); - } - - public async destroy(): Promise<void> { - return Promise.resolve(); + if (!this.serviceName) { + Promise.reject(`Service name not set!`); + } + this.queues.set(`${eventType}`, handler); } } diff --git a/src/rabbit-event-bus/amqp-connector.test.ts b/src/rabbit-event-bus/amqp-connector.test.ts index 1a250b456ba9a5be58e25f005fee25d4dc797fa1..ecff95b78873e328b1453785ed2558820bb05b1c 100644 --- a/src/rabbit-event-bus/amqp-connector.test.ts +++ b/src/rabbit-event-bus/amqp-connector.test.ts @@ -5,6 +5,7 @@ import { StateChange } from './types'; import { channel } from 'rs-channel-node'; import { Event } from 'event-bus'; import { InfraLogger as logger } from '../logger'; +import { EventUtils } from './event-utils'; jest.mock('amqplib'); jest.mock('../logger'); @@ -80,7 +81,7 @@ describe('AMQP connector', () => { const eventType = 'test:event'; (connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection); - new AMQPConnector(url, channel(), [], [{ eventType, handler: jest.fn() }], 'service'); + const conn = new AMQPConnector(url, channel(), [], [{ eventType, handler: jest.fn() }], 'service'); await flushPromises(); expect(mockChannel.assertQueue).toHaveBeenCalledTimes(1); @@ -93,6 +94,8 @@ describe('AMQP connector', () => { ); expect(mockChannel.consume).toHaveBeenCalledTimes(1); expect(mockChannel.consume.mock.calls[0][0]).toBe('consumer__test:event__service'); + expect(conn.subscribedEvents).toHaveLength(1); + expect(conn.subscribedEvents[0]).toBe('test:event'); }); }); @@ -200,12 +203,23 @@ describe('AMQP connector', () => { expect(mockChannel.ack.mock.calls[0][0].content.toString()).toBe('{ "event": "foo" }'); }); - it('it should call the subscription handler and unacknowledge if not ok', async () => { + it('it should call the subscription handler and unacknowledged if not ok', async () => { + const testEvent = { + id: '12345', + eventType: 'test:event', + payload: { event: 'foo' }, + created: new Date('2019-12-30T12:30:00'), + }; + + const messageString = JSON.stringify(EventUtils.eventToMessage<object>(testEvent)); + const mockChannel = makeChannel({ assertQueue: jest.fn().mockImplementation(() => Promise.resolve()), consume: (___, callback) => { callback({ - content: { toString: (): string => '{ "event": "foo" }' }, + content: { + toString: (): string => messageString, + }, }); }, }); @@ -221,9 +235,20 @@ describe('AMQP connector', () => { await flushPromises(); expect(handler).toHaveBeenCalledTimes(1); expect(logger.warn).toHaveBeenCalledTimes(1); - expect(logger.warn).toHaveBeenCalledWith('eventHandlerFailure'); + expect(logger.warn).toHaveBeenCalledWith( + 'eventHandlerFailure', + { attempts: 0, failures: 0, retries: 10 }, + { + created: '2019-12-30T12:30:00.000Z', + eventType: 'test:event', + id: '12345', + payload: { event: 'foo' }, + }, + ); expect(mockChannel.nack).toHaveBeenCalledTimes(1); - expect(mockChannel.nack.mock.calls[0][0].content.toString()).toBe('{ "event": "foo" }'); + const unacknowledgedMessage = mockChannel.nack.mock.calls[0][0].content.toString(); + expect(unacknowledgedMessage).toBe(messageString); + expect(mockChannel.nack.mock.calls[0][1]).toBe(false); expect(mockChannel.nack.mock.calls[0][2]).toBe(true); }); @@ -250,7 +275,11 @@ describe('AMQP connector', () => { await flushPromises(); expect(handler).toHaveBeenCalledTimes(0); expect(logger.warn).toHaveBeenCalledTimes(1); - expect(logger.warn).toHaveBeenCalledWith("Can't parse JSON"); + expect(logger.warn).toHaveBeenCalledWith( + 'Unable to parse message content to JSON', + 'not json', + 'SyntaxError: Unexpected token o in JSON at position 1', + ); expect(mockChannel.nack).toHaveBeenCalledTimes(1); expect(mockChannel.nack.mock.calls[0][0].content.toString()).toBe('not json'); expect(mockChannel.nack.mock.calls[0][1]).toBe(false); diff --git a/src/rabbit-event-bus/amqp-connector.ts b/src/rabbit-event-bus/amqp-connector.ts index 9fd90748f1b5dd874e062b228d389a4430a4cdff..84f648d3377e4e7859aecd2581e281e5431144ac 100644 --- a/src/rabbit-event-bus/amqp-connector.ts +++ b/src/rabbit-event-bus/amqp-connector.ts @@ -1,9 +1,9 @@ import { Sender, Channel } from 'rs-channel-node'; -import { Option } from 'funfix'; +import { Option, None } from 'funfix'; import { Connection, Message } from 'amqplib'; import * as amqplib from 'amqplib'; import { InfraLogger as logger } from '../logger'; -import { Event } from '../event-bus'; +import { Event, EventType } from '../event-bus'; import { Subscription, StateChange, Message as EventBusMessage } from './types'; import { EventUtils } from './event-utils'; @@ -12,9 +12,9 @@ export default class AMQPConnector { send: Sender<StateChange>; }; private serviceName = 'unknown-service'; - private subscriptions: Array<Subscription<object>>; private connection: Connection; private destroyed = false; + private subscriptions: EventType[] = []; public constructor( url: string, @@ -24,31 +24,13 @@ export default class AMQPConnector { serviceName: string, ) { this.externalConnector = { send: sender }; - this.subscriptions = subscriptions; this.serviceName = serviceName; // Set up the connections to the AMQP server this.connect(url) .then(async connection => { this.connection = connection; - // Setup the exchanges - - const rabbitChannel = await this.connection.createChannel(); - await Promise.all( - eventDefs.map(async (eventType: string) => - rabbitChannel.assertExchange(EventUtils.eventTypeToExchange(eventType), 'fanout'), - ), - ) - .catch(() => logger.fatal("can't create exchanges")) - .then(() => { - this.connected(); - }); - - // Create subscribers here - this.subscriptions.forEach(async subscription => { - // subscribe - await this.subscribe(subscription.eventType, subscription.handler); - }); + this.setupExchanges(eventDefs, subscriptions); }) .catch(() => { // notify the manager object that the connection has failed @@ -57,19 +39,6 @@ export default class AMQPConnector { }); } - private async connect(rabbitUrl: string): Promise<Connection> { - try { - const connection = await amqplib.connect(rabbitUrl); - connection.on('error', () => this.disconnected()); - connection.on('end', () => this.disconnected()); - connection.on('close', () => this.disconnected()); - - return connection; - } catch (e) { - throw new Error('Connection failed'); - } - } - public async destroy(): Promise<void> { this.destroyed = true; @@ -79,65 +48,45 @@ export default class AMQPConnector { } public async subscribe<P extends object>( - eventType: string, + eventType: EventType, handler: (ev: Event<P>) => Promise<boolean>, ): Promise<void> { // For the event identifier: // - Declare a subscriber queue // - bind that queue to event exchange // Runs the handler function on any event that matches that type - return Option.of(this.connection) - .map(async (conn: Connection) => { - const rabbitChannel = await conn.createChannel(); - rabbitChannel.on('error', () => this.disconnected()); - - return await rabbitChannel - .assertQueue(EventUtils.eventTypeToQueue(eventType, this.serviceName)) - .then(async () => { - await rabbitChannel.bindQueue( - EventUtils.eventTypeToQueue(eventType, this.serviceName), - EventUtils.eventTypeToExchange(eventType), - '', - ); - logger.trace('subscribe'); - - await rabbitChannel.consume( - EventUtils.eventTypeToQueue(eventType, this.serviceName), - async (msg: Message) => { - try { - const message: EventBusMessage<Event<P>> = JSON.parse(msg.content.toString()); - - handler(message.event).then(isOk => { - if (isOk) { - // Ack - rabbitChannel.ack(msg); - } else { - // Nack - logger.warn('eventHandlerFailure'); - rabbitChannel.nack(msg, false, true); - } - }); - } catch (e) { - rabbitChannel.nack(msg, false, false); - logger.warn("Can't parse JSON"); - } - }, - ); - }) - .catch(() => { - logger.fatal("can't create subscriber queues"); + const channelOption = await this.createChannel(); + + if (!channelOption.isEmpty()) { + const rabbitChannel = channelOption.get(); + rabbitChannel.on('error', () => this.disconnected()); + return rabbitChannel + .assertQueue(EventUtils.makeConsumerQueueName(eventType, this.serviceName)) + .then(async () => { + const qName = EventUtils.makeConsumerQueueName(eventType, this.serviceName); + const exName = EventUtils.makeEventExchangeName(eventType); + + await rabbitChannel.bindQueue(qName, exName, ''); + logger.trace('subscribe'); + this.subscriptions.push(eventType); + + await rabbitChannel.consume(qName, async (msg: Message) => { + return this.decoratedHandler<P>(rabbitChannel, handler, msg); }); - }) - .getOrElseL(() => { - // Do we want to handle reconnects &/or retries here? - setTimeout(() => this.subscribe(eventType, handler), 1000); - logger.warn("No connection, can't subscribe, trying again soon!"); - }); + }) + .catch(() => { + logger.fatal(`Can't create subscriber queues for: ${this.serviceName} using event: ${eventType}`); + }); + } else { + // Do we want to handle reconnects &/or retries here? + setTimeout(() => this.subscribe(eventType, handler), 1000); + logger.warn("No connection, can't subscribe, trying again soon!"); + } } public async publish<P extends object>(event: Event<P>): Promise<boolean> { // publish the message - const whereTo = EventUtils.eventTypeToExchange(event.eventType); + const whereTo = EventUtils.makeEventExchangeName(event.eventType); return Option.of(this.connection) .map(async connection => { try { @@ -161,6 +110,81 @@ export default class AMQPConnector { .getOrElse(false); } + public get subscribedEvents(): EventType[] { + return this.subscriptions; + } + + private async createChannel(): Promise<Option<amqplib.Channel>> { + const conn = Option.of(this.connection); + if (conn.isEmpty()) { + return None; + } else { + return Option.of(await conn.get().createChannel()); + } + } + + private decoratedHandler<P extends object>( + rabbitChannel: amqplib.Channel, + handler: (ev: Event<P>) => Promise<boolean>, + msg: Message, + ): void { + try { + const message: EventBusMessage<Event<P>> = JSON.parse(msg.content.toString()); + + handler(message.event).then(isOk => { + if (isOk) { + // Ack + rabbitChannel.ack(msg); + } else { + // Nack + logger.warn('eventHandlerFailure', message.meta, message.event); + rabbitChannel.nack(msg, false, true); + } + }); + } catch (e) { + rabbitChannel.nack(msg, false, false); + logger.warn('Unable to parse message content to JSON', msg.content.toString(), e.toString()); + } + } + + private async setupExchanges( + eventDefs: string[], + subscriptions: Array<Subscription<unknown & object>>, + ): Promise<void> { + // Setup the exchanges + + const rabbitChannel = await this.connection.createChannel(); + this.subscriptions = []; + + await Promise.all( + eventDefs.map(async (eventType: string) => + rabbitChannel.assertExchange(EventUtils.makeEventExchangeName(eventType), 'fanout'), + ), + ) + .catch(() => logger.fatal("can't create exchanges")) + .then(() => { + this.connected(); + }); + + // Create subscribers here + subscriptions.forEach(async subscription => { + // subscribe + await this.subscribe(subscription.eventType, subscription.handler); + }); + } + + private async connect(rabbitUrl: string): Promise<Connection> { + try { + const connection = await amqplib.connect(rabbitUrl); + connection.on('error', () => this.disconnected()); + connection.on('end', () => this.disconnected()); + connection.on('close', () => this.disconnected()); + + return connection; + } catch (e) { + throw new Error('Connection failed'); + } + } private disconnected(): void { if (!this.destroyed) { this.externalConnector.send({ newState: 'NOT_CONNECTED' }); diff --git a/src/rabbit-event-bus/event-utils.test.ts b/src/rabbit-event-bus/event-utils.test.ts index 6b1c6ac40a280f38c487e330333c23ba4cb510c7..295f0ab32997f3d11282c62e3f6d70f813f4c78a 100644 --- a/src/rabbit-event-bus/event-utils.test.ts +++ b/src/rabbit-event-bus/event-utils.test.ts @@ -6,14 +6,16 @@ describe('EventUtils', () => { describe('eventTypeToExchange', (): void => { it('correctly names an event exchange', () => { const exampleDefinition = 'SampleEvent'; - expect(EventUtils.eventTypeToExchange(exampleDefinition)).toEqual('event__SampleEvent'); + expect(EventUtils.makeEventExchangeName(exampleDefinition)).toEqual('event__SampleEvent'); }); }); describe('eventTypeToQueue', () => { it('correctly names an event queue', () => { const eventType = 'SampleEventType'; const service = 'SampleService'; - expect(EventUtils.eventTypeToQueue(eventType, service)).toEqual('consumer__SampleEventType__SampleService'); + expect(EventUtils.makeConsumerQueueName(eventType, service)).toEqual( + 'consumer__SampleEventType__SampleService', + ); }); }); describe('eventToMessage', () => { diff --git a/src/rabbit-event-bus/event-utils.ts b/src/rabbit-event-bus/event-utils.ts index 658b5e5e57c6703c681a82f7139481cbf17983ed..fedbf0d817a5dd5731a5ffece1959d8f710f3e22 100644 --- a/src/rabbit-event-bus/event-utils.ts +++ b/src/rabbit-event-bus/event-utils.ts @@ -3,11 +3,11 @@ import { Message } from './types'; export class EventUtils { // Maybe merge this class with AMQPConnector - public static eventTypeToExchange(eventType: string): string { + public static makeEventExchangeName(eventType: string): string { return `event__${eventType}`; } - public static eventTypeToQueue(eventType: string, serviceName: string): string { + public static makeConsumerQueueName(eventType: string, serviceName: string): string { return `consumer__${eventType}__${serviceName}`; } diff --git a/src/rabbit-event-bus/index.test.ts b/src/rabbit-event-bus/index.test.ts index cbb72178b16cee0b9eab5b66dc53dc97391fd90a..cf8124b5c2dc1a042ec930e4149b6ad68af1c34a 100644 --- a/src/rabbit-event-bus/index.test.ts +++ b/src/rabbit-event-bus/index.test.ts @@ -8,12 +8,11 @@ jest.mock('../logger'); jest.mock('./amqp-connector'); describe('AMQP Connection Manager', () => { - describe('destory', () => { + describe('destroy', () => { it('calls connector destroy method', async () => { const destroyMock = jest.fn(); - // eslint-disable-next-line @typescript-eslint/no-unused-vars (AMQPConnector as jest.Mock).mockImplementation(() => ({ destroy: destroyMock })); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); await manager.destroy(); @@ -24,8 +23,7 @@ describe('AMQP Connection Manager', () => { describe('behaviour in a good connection state', () => { it('forwards messages to a connector', async () => { const publishMock = jest.fn(async () => true); - // eslint-disable-next-line @typescript-eslint/no-unused-vars - (AMQPConnector as jest.Mock).mockImplementation((__, [send, _]: Channel<StateChange>) => { + (AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>) => { send({ newState: 'CONNECTED', }); @@ -34,7 +32,7 @@ describe('AMQP Connection Manager', () => { subscribe: jest.fn(), }; }); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); await manager.publish({ eventType: 'test', id: 'something', @@ -48,8 +46,7 @@ describe('AMQP Connection Manager', () => { it("passes on subscribes to the connector immediately, while it's ready", async () => { const subscribeMock = jest.fn(); const [readyNotify, readyWait] = channel<{}>(); - // eslint-disable-next-line @typescript-eslint/no-unused-vars - (AMQPConnector as jest.Mock).mockImplementation((__, [send, _]: Channel<StateChange>) => { + (AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>) => { send({ newState: 'CONNECTED', }); @@ -61,7 +58,7 @@ describe('AMQP Connection Manager', () => { }; }); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); await manager.subscribe('test', jest.fn()); @@ -75,26 +72,23 @@ describe('AMQP Connection Manager', () => { // This channel is used to simulate startup delay in the connector const [readyNotify, readyWait] = channel<{}>(); - (AMQPConnector as jest.Mock).mockImplementation( - // eslint-disable-next-line @typescript-eslint/no-unused-vars - (___, [send, _]: Channel<StateChange>, __, subscriptions) => { + (AMQPConnector as jest.Mock).mockImplementation((___, [send]: Channel<StateChange>, __, subscriptions) => { + send({ + newState: 'CONNECTED', + }); + readyWait().then(() => { send({ - newState: 'CONNECTED', - }); - readyWait().then(() => { - send({ - newState: 'NOT_CONNECTED', - }); + newState: 'NOT_CONNECTED', }); - return { - subscriptions, - publish: publishMock, - subscribe: jest.fn(), - }; - }, - ); + }); + return { + subscriptions, + publish: publishMock, + subscribe: jest.fn(), + }; + }); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); Promise.all([ manager.publish({ @@ -135,42 +129,38 @@ describe('AMQP Connection Manager', () => { // This channel is used to simulate startup delay in the connector const [readyNotify, readyWait] = channel<{}>(); - (AMQPConnector as jest.Mock).mockImplementation( - // eslint-disable-next-line @typescript-eslint/no-unused-vars - (_0, [send, _1]: Channel<StateChange>, _2, subscriptions) => { + (AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>, _2, subscriptions) => { + send({ + newState: 'CONNECTED', + }); + readyWait().then(() => { send({ - newState: 'CONNECTED', - }); - readyWait().then(() => { - send({ - newState: 'NOT_CONNECTED', - }); + newState: 'NOT_CONNECTED', }); - return { - subscriptions, - connect: connectMock, - publish: jest.fn(), - subscribe: subscribeMock, - }; - }, - ); + }); + return { + subscriptions, + connect: connectMock, + publish: jest.fn(), + subscribe: subscribeMock, + }; + }); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); await manager.subscribe('test', jest.fn()); await manager.subscribe('test', jest.fn()); await manager.subscribe('test', jest.fn()); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - expect((manager as any).subscriptions.length).toEqual(3); + expect(manager.subscriptions.length).toEqual(3); expect(subscribeMock).toBeCalledTimes(3); // simulate some startup delay in the connector setTimeout(() => { readyNotify({}); // Expect the connector to be created with subscriptions - // eslint-disable-next-line @typescript-eslint/no-explicit-any - expect((manager as any).connector.get().subscriptions.length).toEqual(3); + expect(manager.subscriptions).toHaveLength(3); + expect(manager.connector.isEmpty()).toBeFalsy(); done(); }, 50); }); @@ -197,7 +187,7 @@ describe('AMQP Connection Manager', () => { }, ); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); const then = jest.fn(); manager @@ -236,22 +226,19 @@ describe('AMQP Connection Manager', () => { it('publish promises are resolved after a successful connection', async done => { const subscribeMock = jest.fn(); - (AMQPConnector as jest.Mock).mockImplementation( - // eslint-disable-next-line @typescript-eslint/no-unused-vars - (_0, [send, _1]: Channel<StateChange>, _2, subscriptions) => { - send({ - newState: 'CONNECTED', - }); + (AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>, _2, subscriptions) => { + send({ + newState: 'CONNECTED', + }); - return { - subscriptions, - publish: jest.fn(() => true), - subscribe: subscribeMock, - }; - }, - ); + return { + subscriptions, + publish: jest.fn(() => true), + subscribe: subscribeMock, + }; + }); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); const then = jest.fn(); manager @@ -293,23 +280,20 @@ describe('AMQP Connection Manager', () => { let returnState: ConnectedState = 'NOT_CONNECTED'; let returnPublish = false; - (AMQPConnector as jest.Mock).mockImplementation( - // eslint-disable-next-line @typescript-eslint/no-unused-vars - (_0, [send, _1]: Channel<StateChange>, _2, subscriptions) => { - send({ - newState: returnState, - }); + (AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>, _2, subscriptions) => { + send({ + newState: returnState, + }); - return { - subscriptions, - connect: connectMock, - publish: jest.fn(() => returnPublish), - subscribe: subscribeMock, - }; - }, - ); + return { + subscriptions, + connect: connectMock, + publish: jest.fn(() => returnPublish), + subscribe: subscribeMock, + }; + }); - const manager = await new RabbitEventBus({ url: '' }).init([], ''); + const manager = await new RabbitEventBus({ url: '' }, [], ''); const then = jest.fn(); manager diff --git a/src/rabbit-event-bus/index.ts b/src/rabbit-event-bus/index.ts index 8619cb9e9ae577a8c474f0a5405d622c209a77bb..d3757808b81b495113ee388b5c5557c43c43340a 100644 --- a/src/rabbit-event-bus/index.ts +++ b/src/rabbit-event-bus/index.ts @@ -1,5 +1,5 @@ import { Option, None, Some } from 'funfix'; -import { Event, EventBus } from '../event-bus'; +import { Event, EventBus, EventType, EventPublisher, EventSubscriber } from '../event-bus'; import { Subscription } from './types'; import AMQPConnector from './amqp-connector'; import { InternalMessageQueue, QueuedEvent } from './internal-queue'; @@ -19,27 +19,31 @@ export interface RabbitEventBusConnectionOptions { * @class RabbitEventBus * @implements {EventBus} */ -export default class RabbitEventBus implements EventBus, ConnectionOwner { - private connector: Option<AMQPConnector> = None; +export default class RabbitEventBus extends EventBus implements EventPublisher, EventSubscriber, ConnectionOwner { + private _connector: Option<AMQPConnector> = None; private connection: ConnectionObserver; - private eventDefinitions: string[]; - private serviceName = 'unknown-service'; private url = ''; private queue: InternalMessageQueue; - private subscriptions: Array<Subscription<unknown & object>> = []; + private _subscriptions: Array<Subscription<unknown & object>> = []; - public constructor(connectionOpts: RabbitEventBusConnectionOptions) { + public constructor( + connectionOpts: RabbitEventBusConnectionOptions, + eventToHandle: EventType[], + serviceName: string, + ) { + super(eventToHandle, serviceName); this.url = connectionOpts.url; - } - - // Can the params here be moved to the constructor? - public async init(eventDefinitions: string[], serviceName: string): Promise<this> { - this.eventDefinitions = eventDefinitions; - this.serviceName = serviceName; this.queue = new InternalMessageQueue(this); this.connection = new ConnectionObserver(this); this.connect(); - return this; + } + + public get connector(): Option<AMQPConnector> { + return this._connector; + } + + public get subscriptions(): Array<Subscription<unknown & object>> { + return this._subscriptions; } public async destroy(): Promise<void> { @@ -53,7 +57,7 @@ export default class RabbitEventBus implements EventBus, ConnectionOwner { } public onDisconnect(): void { - this.connector = None; + this._connector = None; } public onStartReconnect(): void { @@ -62,11 +66,11 @@ export default class RabbitEventBus implements EventBus, ConnectionOwner { } private connect(): void { - this.connector = Some( + this._connector = Some( new AMQPConnector( this.url, this.connection.channel, - this.eventDefinitions, + this.eventsToHandle, this.subscriptions, this.serviceName, ), diff --git a/yarn.lock b/yarn.lock index 9f4be20abd8df76f02e7ef52d977246af5d33de1..b7e0f1fc7c945a7a2f8fcb7c3c5f0e179e08019a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -394,10 +394,10 @@ resolved "https://registry.yarnpkg.com/@types/pino-std-serializers/-/pino-std-serializers-2.4.0.tgz#8cad99175cb79c2448f7455a2d32fb3fde29579c" integrity sha512-eAdu+NW1IkCdmp85SnhyKha+OOREQMT9lXaoICQxa7bhSauRiLzu3WSNt9Mf2piuJvWeXF/G0hGWHr63xNpIRA== -"@types/pino@^5.8.10": - version "5.14.0" - resolved "https://registry.yarnpkg.com/@types/pino/-/pino-5.14.0.tgz#e5226bf6d5da52e2d6b641309abee5f30b2d7a56" - integrity sha512-i4jTU0G9HL/wy7WbVMqcM/5Z6cFzbcLN6/T46rqQauIoiDqcEySR7SN4FfJvtv+RoBtJkQjDneq0Ytrj95cQHg== +"@types/pino@^5.14.0": + version "5.15.1" + resolved "https://registry.yarnpkg.com/@types/pino/-/pino-5.15.1.tgz#1d7bfdecbdcb64ec63a672be9bb9524b323f96e4" + integrity sha512-skJZ2VBHUva/dF4b2/3zOYVOZLyeaYT7wtPyf8+aoh0VuqfW8JO7PK9oJY8yRZVkN4K7RDdA5UFuH0QbbENZaw== dependencies: "@types/node" "*" "@types/pino-std-serializers" "*"