diff --git a/src/event-bus/event-bus.test.ts b/src/event-bus/event-bus.test.ts index 4f4a8d1efc07e083f85a873500679deac624746b..a8582b97a46721b51021be048d5b8cbbb256a18c 100644 --- a/src/event-bus/event-bus.test.ts +++ b/src/event-bus/event-bus.test.ts @@ -1,16 +1,26 @@ import { EventBus } from './event-bus'; describe('Event Bus class', () => { - it('EventBus is abstract', () => { - const create = (T): EventBus => new T(['test'], 'test'); + const create = (T): EventBus => new T(['test'], 'test'); + it('is abstract', () => { + const x = create(EventBus); + expect(() => x.destroy()).toThrow('x.destroy is not a function'); + }); + it('initialises members', () => { const x = create(EventBus); expect(x.eventsToHandle).toStrictEqual(['test']); expect(x.serviceName).toStrictEqual('test'); - expect(() => x.destroy()).toThrow('x.destroy is not a function'); }); - it('EventBus can be extended', () => { + it('has correct members', () => { + const keys = Object.keys(create(EventBus)); + expect(keys).toHaveLength(2); + expect(keys).toContain('eventsToHandle'); + expect(keys).toContain('serviceName'); + }); + + it('can be extended', () => { class EB extends EventBus { called = false; destroy(): Promise<void> { diff --git a/src/event-bus/event-factory.test.ts b/src/event-bus/event-factory.test.ts deleted file mode 100644 index 27e12197b6ff11d5185fb10bbfc585d247087979..0000000000000000000000000000000000000000 --- a/src/event-bus/event-factory.test.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { eventFactory, eventAbstractFactory } from './event-factory'; -import { Event, Payload } from './types'; - -describe('Event Creation', () => { - describe('eventFactory', () => { - it('creates an Event', () => { - const payload = new Payload(); - const event = eventFactory('type', payload); - expect(event.id).toHaveLength(36); - expect(event.eventType).toBe('type'); - expect(event.created instanceof Date).toBe(true); - expect(event.payload).toBe(payload); - }); - - it('fails to create an Event without a payload of type Payload', () => { - expect(() => eventFactory('type', {})).toThrow('argument payload is not of type Payload!'); - }); - }); - - describe('eventAbstractFactory', () => { - class Party extends Payload { - beer: boolean; - address: string; - } - class EventParty extends Event {} - - class GardenParty extends Payload { - wine: boolean; - } - - it('creates an Event of correct type', () => { - const payload = new Party(); - payload.beer = true; - payload.address = 'my place'; - - const event: EventParty = eventAbstractFactory('party', Party, payload); - - expect(event.id).toHaveLength(36); - expect(event.eventType).toBe('party'); - expect(event.created instanceof Date).toBe(true); - expect(event.payload).toHaveProperty('beer'); - expect(event.payload['beer']).toBe(true); - expect(event.payload).toHaveProperty('address'); - expect(event.payload['address']).toBe('my place'); - }); - - it('rejects arbitrary payload type', () => { - const payload = { address: 'my place' }; - const create = (p: object): EventParty => eventAbstractFactory('party', Party, p); - - expect(() => create(payload)).toThrow('expected payload of type Party got Object'); - }); - - it('rejects incorrect payload type', () => { - const payload = new GardenParty(); - payload.wine = true; - const create = (p: GardenParty): EventParty => eventAbstractFactory('party', Party, p); - - expect(() => create(payload)).toThrow('expected payload of type Party got GardenParty'); - }); - }); -}); diff --git a/src/event-bus/event-factory.ts b/src/event-bus/event-factory.ts deleted file mode 100644 index efe410c49b709b8b08532390d70c6f9567d45720..0000000000000000000000000000000000000000 --- a/src/event-bus/event-factory.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Event, Payload } from './types'; -import uuid = require('uuid'); - -export const eventFactory = (eventType: string, payload: Payload): Event => { - if (!(payload instanceof Payload)) { - throw new TypeError('argument payload is not of type Payload!'); - } - return { - eventType, - id: uuid.v4(), - created: new Date(), - payload, - }; -}; - -export const eventAbstractFactory = (typeName: string, payloadClass, p: object): Event => { - if (payloadClass.name !== p.constructor.name) { - throw new TypeError(`expected payload of type ${payloadClass.name} got ${p.constructor.name}`); - } - const event = eventFactory(typeName, p); - return event; -}; diff --git a/src/event-bus/index.ts b/src/event-bus/index.ts index 999b47ef342e29d44a7577459c88ee32f8847952..041c6574ad4809a6209905641ed0131b8cbc9f65 100644 --- a/src/event-bus/index.ts +++ b/src/event-bus/index.ts @@ -1,3 +1,3 @@ export * from './types'; +export * from './event'; export * from './event-bus'; -export * from './event-factory'; diff --git a/src/event-bus/types.test.ts b/src/event-bus/types.test.ts index e507e82adc91fb08ec70f32841c196171965a3c8..d906159e79128657ea14797fe91c88c8e1291421 100644 --- a/src/event-bus/types.test.ts +++ b/src/event-bus/types.test.ts @@ -1,4 +1,5 @@ -import { EventType, Event, EventPublisher, EventSubscriber, EventConfig } from './types'; +import { EventType, EventPublisher, EventSubscriber, EventConfig } from './types'; +import { Event } from './event'; function createValidationFunction<T>(properties: Record<keyof T, boolean>): Function { return function<TActual extends T>(value: TActual): T { @@ -26,42 +27,23 @@ function createValidationFunction<T>(properties: Record<keyof T, boolean>): Func }; } -describe('Event Bus Types', () => { - it('type EventType is string', () => { +describe('EventType', () => { + it('is string', () => { const et: EventType = 'this is a string'; expect(typeof et).toBe('string'); }); +}); - it('interface Event contains expected members', () => { - const expectedEventProperties = { - eventType: true, - id: true, - created: true, - payload: true, - version: false, - context: false, - }; - - class ExpectedToBeAnEvent { - id = '234'; - created: Date = new Date(); - payload: {} = {}; - eventType = 'type'; - } - const isValidEvent = createValidationFunction<Event<{}>>(expectedEventProperties); - const ee = new ExpectedToBeAnEvent(); - expect(() => isValidEvent(ee)).not.toThrow(); - expect(() => ee as Event<{}>).not.toThrow(); - }); - - it('interface EventPublisher contains expected members', () => { +describe('EventPublisher', () => { + it('contains contains only publish', () => { const expectedEventPublisherProperties = { publish: true, }; class ExpectedToBeAnEventPublisher { - publish<T extends object>(event: Event<T>): Promise<boolean> { - return Promise.resolve(event !== null); + publish(event: Event): Promise<void> { + expect(event).toBeTruthy(); + return Promise.resolve(); } } const isValidEventPublisher = createValidationFunction<EventPublisher>(expectedEventPublisherProperties); @@ -69,14 +51,16 @@ describe('Event Bus Types', () => { expect(() => isValidEventPublisher(ep)).not.toThrow(); expect(() => ep as EventPublisher).not.toThrow(); }); +}); +describe('EventSubscription', () => { it('interface EventSubscriber contains expected members', () => { const expectedEventSubscriberProperties = { subscribe: true, }; class ExpectedToBeAnEventSubscriber { - subscribe<T extends object>(eventType: string, handler: (event: Event<T>) => Promise<boolean>): void { + subscribe(eventType: string, handler: (event: Event) => Promise<boolean>): void { Promise.resolve(eventType !== '' && handler !== null); } } @@ -85,8 +69,10 @@ describe('Event Bus Types', () => { expect(() => isValidEventSubscriber(es)).not.toThrow(); expect(() => es as EventSubscriber).not.toThrow(); }); +}); - it('interface EventConfig contains expected members', () => { +describe('EventConfig', () => { + it('contains only url', () => { const expectedEventConfigProperties = { url: true, }; diff --git a/src/event-bus/types.ts b/src/event-bus/types.ts index 10ace28347163e3847de0959326f99830fa1ff38..a07782b38fcda65153c0b49620ad04c4499a2ab6 100644 --- a/src/event-bus/types.ts +++ b/src/event-bus/types.ts @@ -1,27 +1,14 @@ // Abstract Message Queue - types and interfaces +import { Event } from './event'; export type EventType = string; -export class Payload {} - -export class Event { - readonly eventType: EventType; - readonly id: string; // Generated when the event is emitted - readonly created: Date; - // version: has been removed - so we can remain weakly typed - // context: has also been removed - if you need information about the origin - // source of the event then put it in the payload. - - constructor(readonly payload: Payload) {} -} - export interface EventPublisher { - // Promise<boolean> should this become void | exception? we only need to know if something went wrong - publish(event: Event): Promise<boolean>; + publish(event: Event): Promise<void>; } export interface EventSubscriber { - // handler: returns whether or not we should ack the message + // handler: returns whether or not the EventBus implementation should ack the message subscribe(eventType: string, handler: (event: Event) => Promise<boolean>): void; } diff --git a/src/mock-event-bus/index.test.ts b/src/mock-event-bus/index.test.ts index edded42223ca7b782225d3a3e1479082ec9bc7d0..1efe5000302904314e2c7cc36ef915e148f51ac1 100644 --- a/src/mock-event-bus/index.test.ts +++ b/src/mock-event-bus/index.test.ts @@ -1,88 +1,127 @@ import { MockEventBus } from './index'; -import { Event, Payload } from '../event-bus/types'; -import { eventFactory } from '../event-bus/event-factory'; - -describe('mock message queue', () => { - // describe('object lifetime', () => { - // it('can do the full flow', async () => { - // const x = 2; - // }); - // }); - - describe('you can publish and subscribe', () => { - it('can do the full flow', async () => { - const eventType = 'libero:mock:test'; - - const mockHandler = jest.fn(async () => true); - const mockEventBus = new MockEventBus([eventType], 'message-bus-test'); - - mockEventBus.subscribe(eventType, mockHandler); - const payload = new Payload(); - - const event = eventFactory(eventType, payload); - - mockEventBus.publish(event); - - expect(mockHandler).toBeCalled(); - expect(mockHandler.mock.calls).toEqual([[event]]); - }); - - it('can discriminate based on event type', async () => { - const eventType1 = 'libero:mock:test1'; - const eventType2 = 'libero:mock:test2'; - - const event1: Event = { - eventType: eventType1, - id: 'some-testing-event1-id', - created: new Date(), - payload: { x: 10, y: 20 }, - }; - - const event2: Event = { - eventType: eventType2, - id: 'some-testing-event2-id', +import { Event, EventType } from '../event-bus'; + +class TestEvent extends Event { + static eventName: EventType = 'libero:mock:test'; + constructor(payload: object) { + super(TestEvent.eventName, payload); + } +} +const mockHandler = jest.fn( + (): Promise<boolean> => { + return Promise.resolve(true); + }, +); + +describe('MockEventBus', () => { + it('can publish to many event types without handlers', () => { + const mockEventBus = new MockEventBus(['some-event'], 'message-bus-test'); + [...Array(100).keys()].map(key => { + const e: Event = { + eventType: 'event' + key.toString(), + id: '0', created: new Date(), - payload: { a: 10, b: 20 }, + payload: {}, }; + mockEventBus.publish(e); + }); + }); - let handler1 = 0; - let handler2 = 0; - let receivedEvent1: object = {}; - let receivedEvent2: object = {}; - - const mockHandler1 = async (event: Event): Promise<boolean> => { - handler1 += 1; - receivedEvent1 = event; - - return Promise.resolve(true); - }; + it('publish throws if no handler', async () => { + const mockEventBus = new MockEventBus([TestEvent.eventName], 'message-bus-test'); + const testEvent = new TestEvent({ x: 'test' }); + expect(mockEventBus.publish(testEvent)).rejects.toStrictEqual( + new Error('handler for libero:mock:test is undefined or not a function'), + ); + }); - const mockHandler2 = async (event: Event): Promise<boolean> => { - handler2 += 1; - receivedEvent2 = event; - return Promise.resolve(true); - }; + it('subscribe throws if no service name', async () => { + const mockEventBus = new MockEventBus([TestEvent.eventName], ''); + const testEvent = new TestEvent({ x: 'test' }); + expect(mockEventBus.subscribe(testEvent.eventType, mockHandler)).rejects.toStrictEqual( + new Error('Service name not set!'), + ); + }); - const mockEventBus = new MockEventBus([eventType1, eventType2], 'message-bus-test'); + it('subscribe throws if no event name', async () => { + const mockEventBus = new MockEventBus([TestEvent.eventName], 'service'); + expect(mockEventBus.subscribe('', mockHandler)).rejects.toStrictEqual(new Error('EventType name not set!')); + }); - mockEventBus.subscribe(eventType1, mockHandler1); - mockEventBus.subscribe(eventType2, mockHandler2); + it('subscribe throws if handler set twice', async () => { + const mockEventBus = new MockEventBus([TestEvent.eventName], 'service'); + expect(mockEventBus.subscribe('test', mockHandler)).resolves.not.toThrow(); + expect(mockEventBus.subscribe('test', mockHandler)).rejects.toStrictEqual( + new Error(`Handler already set for 'test' set!`), + ); + }); - mockEventBus.publish(event2); + it('can publish and subscribe to the same EventType', async () => { + const mockEventBus = new MockEventBus([TestEvent.eventName], 'message-bus-test'); - expect(handler1).toBe(0); - expect(handler2).toBe(1); - expect(receivedEvent1).toEqual({}); - expect(receivedEvent2).toEqual(event2); + await mockEventBus.subscribe(TestEvent.eventName, mockHandler); + const testEvent = new TestEvent({ x: 'test' }); + await mockEventBus.publish(testEvent); - receivedEvent1 = {}; - receivedEvent2 = {}; - mockEventBus.publish(event1); + expect(mockHandler).toBeCalled(); + expect(mockHandler.mock.calls).toEqual([[testEvent]]); + }); - expect(handler1).toBe(1); - expect(handler2).toBe(1); - expect(receivedEvent1).toEqual(event1); - expect(receivedEvent2).toEqual({}); - }); + it('can discriminate based on event type', async () => { + const eventType1 = 'libero:mock:test1'; + const eventType2 = 'libero:mock:test2'; + + const event1: Event = { + eventType: eventType1, + id: 'some-testing-event1-id', + created: new Date(), + payload: { x: 10, y: 20 }, + }; + + const event2: Event = { + eventType: eventType2, + id: 'some-testing-event2-id', + created: new Date(), + payload: { a: 10, b: 20 }, + }; + + let handler1 = 0; + let handler2 = 0; + let receivedEvent1: object = {}; + let receivedEvent2: object = {}; + + const mockHandler1 = async (event: Event): Promise<boolean> => { + handler1 += 1; + receivedEvent1 = event; + + return Promise.resolve(true); + }; + + const mockHandler2 = async (event: Event): Promise<boolean> => { + handler2 += 1; + receivedEvent2 = event; + return Promise.resolve(true); + }; + + const mockEventBus = new MockEventBus([eventType1, eventType2], 'message-bus-test'); + + mockEventBus.subscribe(eventType1, mockHandler1); + mockEventBus.subscribe(eventType2, mockHandler2); + + mockEventBus.publish(event2); + + expect(handler1).toBe(0); + expect(handler2).toBe(1); + expect(receivedEvent1).toEqual({}); + expect(receivedEvent2).toEqual(event2); + + receivedEvent1 = {}; + receivedEvent2 = {}; + mockEventBus.publish(event1); + + expect(handler1).toBe(1); + expect(handler2).toBe(1); + expect(receivedEvent1).toEqual(event1); + expect(receivedEvent2).toEqual({}); }); }); diff --git a/src/mock-event-bus/index.ts b/src/mock-event-bus/index.ts index 86d6b48b2db994eeda9c77dae158bf65fe22eac3..8bb15d441df3583c889246a7737ac2f04ed5dc78 100644 --- a/src/mock-event-bus/index.ts +++ b/src/mock-event-bus/index.ts @@ -10,21 +10,31 @@ export type AnyHandler = (ev: AnyEvent) => Promise<boolean>; export class MockEventBus extends EventBus implements EventPublisher, EventSubscriber { private queues: Map<string, AnyHandler> = new Map(); - public async publish(event: Event): Promise<boolean> { + public async publish(event: Event): Promise<void> { + if (!this.eventsToHandle.includes(event.eventType)) { + return; + } const fn = this.queues.get(`${event.eventType}`); - if (fn) { - if (this.eventsToHandle.includes(event.eventType)) { - return fn(event); - } + + if (fn !== undefined && typeof fn === 'function') { + fn(event); + } else { + throw new Error(`handler for ${event.eventType} is undefined or not a function`); } - return Promise.resolve(false); } public async subscribe(eventType: string, handler: (event: Event) => Promise<boolean>): Promise<void> { if (!this.serviceName) { - Promise.reject(`Service name not set!`); + throw new Error(`Service name not set!`); + } + if (!eventType) { + throw new Error(`EventType name not set!`); + } + const key = `${eventType}`; + if (this.queues.has(key)) { + throw new Error(`Handler already set for '${eventType}' set!`); } - this.queues.set(`${eventType}`, handler); + this.queues.set(key, handler); } destroy(): Promise<void> { diff --git a/src/rabbit-event-bus/amqp-connector.ts b/src/rabbit-event-bus/amqp-connector.ts index 84f648d3377e4e7859aecd2581e281e5431144ac..53d0f4aad75e1cdbc85e290f4c755ba982763781 100644 --- a/src/rabbit-event-bus/amqp-connector.ts +++ b/src/rabbit-event-bus/amqp-connector.ts @@ -20,7 +20,7 @@ export default class AMQPConnector { url: string, [sender]: Channel<StateChange>, eventDefs: string[], - subscriptions: Array<Subscription<unknown & object>>, + subscriptions: Array<Subscription>, serviceName: string, ) { this.externalConnector = { send: sender }; @@ -47,10 +47,7 @@ export default class AMQPConnector { } } - public async subscribe<P extends object>( - eventType: EventType, - handler: (ev: Event<P>) => Promise<boolean>, - ): Promise<void> { + public async subscribe(eventType: EventType, handler: (ev: Event) => Promise<boolean>): Promise<void> { // For the event identifier: // - Declare a subscriber queue // - bind that queue to event exchange @@ -71,7 +68,7 @@ export default class AMQPConnector { this.subscriptions.push(eventType); await rabbitChannel.consume(qName, async (msg: Message) => { - return this.decoratedHandler<P>(rabbitChannel, handler, msg); + return this.decoratedHandler(rabbitChannel, handler, msg); }); }) .catch(() => { @@ -84,7 +81,7 @@ export default class AMQPConnector { } } - public async publish<P extends object>(event: Event<P>): Promise<boolean> { + public async publish(event: Event): Promise<boolean> { // publish the message const whereTo = EventUtils.makeEventExchangeName(event.eventType); return Option.of(this.connection) @@ -123,13 +120,13 @@ export default class AMQPConnector { } } - private decoratedHandler<P extends object>( + private decoratedHandler( rabbitChannel: amqplib.Channel, - handler: (ev: Event<P>) => Promise<boolean>, + handler: (ev: Event) => Promise<boolean>, msg: Message, ): void { try { - const message: EventBusMessage<Event<P>> = JSON.parse(msg.content.toString()); + const message: EventBusMessage<Event> = JSON.parse(msg.content.toString()); handler(message.event).then(isOk => { if (isOk) { @@ -147,12 +144,7 @@ export default class AMQPConnector { } } - private async setupExchanges( - eventDefs: string[], - subscriptions: Array<Subscription<unknown & object>>, - ): Promise<void> { - // Setup the exchanges - + private async setupExchanges(eventDefs: string[], subscriptions: Array<Subscription>): Promise<void> { const rabbitChannel = await this.connection.createChannel(); this.subscriptions = []; diff --git a/src/rabbit-event-bus/index.ts b/src/rabbit-event-bus/index.ts index d3757808b81b495113ee388b5c5557c43c43340a..e224e321527657f31a6fbb1458e33555dbdb4ab2 100644 --- a/src/rabbit-event-bus/index.ts +++ b/src/rabbit-event-bus/index.ts @@ -24,7 +24,7 @@ export default class RabbitEventBus extends EventBus implements EventPublisher, private connection: ConnectionObserver; private url = ''; private queue: InternalMessageQueue; - private _subscriptions: Array<Subscription<unknown & object>> = []; + private _subscriptions: Array<Subscription> = []; public constructor( connectionOpts: RabbitEventBusConnectionOptions, @@ -42,7 +42,7 @@ export default class RabbitEventBus extends EventBus implements EventPublisher, return this._connector; } - public get subscriptions(): Array<Subscription<unknown & object>> { + public get subscriptions(): Array<Subscription> { return this._subscriptions; } @@ -79,17 +79,16 @@ export default class RabbitEventBus extends EventBus implements EventPublisher, // This method will not resolve until the event has been successfully published so that // the user never has to know about the internal queue - public publish<P extends object>(msg: Event<P>): Promise<boolean> { + public publish(msg: Event): Promise<void> { return new Promise(async (resolve, reject) => { if (this.connection.isConnected) { - // Should we queue messages that fail? const published: boolean = await this.connector.get().publish(msg); if (!published) { const qEvent: QueuedEvent = { event: msg, resolve, reject }; this.queue.push(qEvent); } else { - resolve(published); + resolve(); } } else { const qEvent: QueuedEvent = { event: msg, resolve, reject }; @@ -98,10 +97,7 @@ export default class RabbitEventBus extends EventBus implements EventPublisher, }); } - public async subscribe<P extends object>( - eventType: string, - handler: (event: Event<P>) => Promise<boolean>, - ): Promise<number> { + public async subscribe(eventType: string, handler: (event: Event) => Promise<boolean>): Promise<number> { this.connector.map(connector => { connector.subscribe(eventType, handler); }); diff --git a/src/rabbit-event-bus/internal-queue.ts b/src/rabbit-event-bus/internal-queue.ts index 8c4f521de307286f07fc0657e6ae061bfa098633..3be0ef1e7bcb69ec4ae374ef2047681bbe765a03 100644 --- a/src/rabbit-event-bus/internal-queue.ts +++ b/src/rabbit-event-bus/internal-queue.ts @@ -2,8 +2,8 @@ import { Event, EventPublisher } from '../event-bus'; import { Option } from 'funfix'; export interface QueuedEvent { - event: Event<unknown & object>; - resolve: (arg0: boolean) => void; + event: Event; + resolve: (arg0: void) => void; reject: (arg0: boolean) => void; } diff --git a/src/rabbit-event-bus/types.ts b/src/rabbit-event-bus/types.ts index ddffe7d3b5a7c482573567a187ba31eaee6b3fa4..6cb9210e3997a5f9a5003c95227e10dfe0534f5b 100644 --- a/src/rabbit-event-bus/types.ts +++ b/src/rabbit-event-bus/types.ts @@ -16,7 +16,7 @@ export interface Message<T> { }; } -export interface Subscription<P extends object> { +export interface Subscription { eventType: string; - handler: (ev: Event<P>) => Promise<boolean>; + handler: (ev: Event) => Promise<boolean>; }