From ff4ebaf5174ee8723e9f4a791674161ff780c54c Mon Sep 17 00:00:00 2001 From: Alex Lay-Calvert Date: Thu, 9 Jan 2025 10:26:53 -0500 Subject: [PATCH 1/3] based onaftercommit hook for mongodb event store --- .../src/eventStore/mongoDBEventStore.ts | 21 +++++++++++++++---- .../emmett/src/eventStore/eventStore.ts | 7 ++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts index e9cc9461..990ce1d0 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts @@ -1,7 +1,8 @@ import { - assertExpectedVersionMatchesCurrent, ExpectedVersionConflictError, + assertExpectedVersionMatchesCurrent, filterProjections, + tryPublishMessagesAfterCommit, type AggregateStreamOptions, type AggregateStreamResult, type AppendToStreamOptions, @@ -14,6 +15,7 @@ import { type ReadEventMetadataWithoutGlobalPosition, type ReadStreamOptions, type ReadStreamResult, + type DefaultEventStoreOptions, } from '@event-driven-io/emmett'; import { MongoClient, @@ -178,7 +180,8 @@ export type MongoDBEventStoreOptions = { MongoDBProjectionInlineHandlerContext >[]; storage?: MongoDBEventStoreStorageOptions; -} & MongoDBEventStoreConnectionOptions; +} & MongoDBEventStoreConnectionOptions & + DefaultEventStoreOptions; export type MongoDBEventStore = EventStore & { projections: ProjectionQueries; @@ -189,13 +192,15 @@ export type MongoDBEventStore = EventStore & { class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable { private readonly client: MongoClient; - private shouldManageClientLifetime: boolean; private readonly inlineProjections: MongoDBInlineProjectionDefinition[]; + private shouldManageClientLifetime: boolean; private isClosed: boolean = false; - public projections: ProjectionQueries; private storage: MongoDBEventStoreStorage; + private options: MongoDBEventStoreOptions; + public projections: ProjectionQueries; constructor(options: MongoDBEventStoreOptions) { + this.options = options; this.client = 'client' in options && options.client ? options.client @@ -388,6 +393,14 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable { ); } + await tryPublishMessagesAfterCommit( + eventsToAppend, + this.options.hooks, + // { + // TODO: same context as InlineProjectionHandlerContext for mongodb? + // }, + ); + return { nextExpectedStreamVersion: currentStreamVersion + BigInt(eventsToAppend.length), diff --git a/src/packages/emmett/src/eventStore/eventStore.ts b/src/packages/emmett/src/eventStore/eventStore.ts index 26e571fd..4919337f 100644 --- a/src/packages/emmett/src/eventStore/eventStore.ts +++ b/src/packages/emmett/src/eventStore/eventStore.ts @@ -7,6 +7,7 @@ import type { GlobalPositionTypeOfReadEventMetadata, ReadEvent, ReadEventMetadata, + ReadEventMetadataWithoutGlobalPosition, StreamPositionTypeOfReadEventMetadata, } from '../typing'; import type { AfterEventStoreCommitHandler } from './afterCommit'; @@ -55,7 +56,11 @@ export type EventStoreReadEventMetadata = Store extends EventStore ? ReadEventMetadataType extends ReadEventMetadata ? ReadEventMetadata & ReadEventMetadataType - : never + : ReadEventMetadataType extends ReadEventMetadataWithoutGlobalPosition< + infer SV + > + ? ReadEventMetadata + : never : never; export type GlobalPositionTypeOfEventStore = From 5d89d83a36b7ae5a42b400a2a9747cbcbb850433 Mon Sep 17 00:00:00 2001 From: Alex Lay-Calvert Date: Thu, 9 Jan 2025 10:36:35 -0500 Subject: [PATCH 2/3] reverted typing, added tests --- .../src/eventStore/mongoDBEventStore.ts | 1 + ...ngoDBEventstore.onAfterCommit.unit.spec.ts | 153 ++++++++++++++++++ .../emmett/src/eventStore/eventStore.ts | 7 +- 3 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.unit.spec.ts diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts index 990ce1d0..a3b64766 100644 --- a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts @@ -394,6 +394,7 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable { } await tryPublishMessagesAfterCommit( + // @ts-expect-error Issues with `globalPosition` not being present causing the type for metadata to expect `never` eventsToAppend, this.options.hooks, // { diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.unit.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.unit.spec.ts new file mode 100644 index 00000000..179e1332 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.unit.spec.ts @@ -0,0 +1,153 @@ +import { after, before, describe, it } from 'node:test'; +import { v7 as uuid } from 'uuid'; +import { type Event, assertEqual } from '@event-driven-io/emmett'; +import { + getMongoDBEventStore, + type MongoDBReadEvent, +} from './mongoDBEventStore'; +import { + MongoDBContainer, + StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient } from 'mongodb'; + +type TestEvent = Event<'test', { counter: number }, { some: boolean }>; + +void describe('MongoDBEventStore onAfterCommit', () => { + let mongodb: StartedMongoDBContainer; + let client: MongoClient; + + before(async () => { + mongodb = await new MongoDBContainer().start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + }); + + after(async () => { + try { + await client.close(); + await mongodb.stop(); + } catch (error) { + console.log(error); + } + }); + + void it('calls onAfterCommit hook after events append', async () => { + // Given + const appendedEvents: MongoDBReadEvent[] = []; + const eventStore = getMongoDBEventStore({ + client, + hooks: { + onAfterCommit: (events) => { + appendedEvents.push(...events); + }, + }, + }); + const streamName = `test:${uuid()}`; + let counter = 0; + const events: TestEvent[] = [ + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: true }, + }, + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: false }, + }, + ]; + + // When + await eventStore.appendToStream(streamName, events); + + // Then + assertEqual(2, appendedEvents.length); + }); + + void it('calls onAfterCommit hook exactly once for each events append', async () => { + // Given + const appendedEvents: MongoDBReadEvent[] = []; + const eventStore = getMongoDBEventStore({ + client, + hooks: { + onAfterCommit: (events) => { + appendedEvents.push(...events); + }, + }, + }); + const streamName = `test:${uuid()}`; + let counter = 0; + const events: TestEvent[] = [ + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: true }, + }, + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: false }, + }, + ]; + const nextEvents: TestEvent[] = [ + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: true }, + }, + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: false }, + }, + ]; + + // When + await eventStore.appendToStream(streamName, events); + await eventStore.appendToStream(streamName, nextEvents); + + // Then + assertEqual(4, appendedEvents.length); + }); + + void it('silently fails when onAfterCommit hook failed but still keeps events', async () => { + // Given + const appendedEvents: MongoDBReadEvent[] = []; + const eventStore = getMongoDBEventStore({ + client, + hooks: { + onAfterCommit: (events) => { + appendedEvents.push(...events); + throw new Error('onAfterCommit failed!'); + }, + }, + }); + + const streamName = `test:${uuid()}`; + let counter = 0; + const events: TestEvent[] = [ + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: true }, + }, + { + type: 'test', + data: { counter: ++counter }, + metadata: { some: false }, + }, + ]; + + // When + await eventStore.appendToStream(streamName, events); + + // Then + assertEqual(2, appendedEvents.length); + const { events: eventsInStore } = await eventStore.readStream(streamName); + assertEqual(2, eventsInStore.length); + }); +}); diff --git a/src/packages/emmett/src/eventStore/eventStore.ts b/src/packages/emmett/src/eventStore/eventStore.ts index 4919337f..26e571fd 100644 --- a/src/packages/emmett/src/eventStore/eventStore.ts +++ b/src/packages/emmett/src/eventStore/eventStore.ts @@ -7,7 +7,6 @@ import type { GlobalPositionTypeOfReadEventMetadata, ReadEvent, ReadEventMetadata, - ReadEventMetadataWithoutGlobalPosition, StreamPositionTypeOfReadEventMetadata, } from '../typing'; import type { AfterEventStoreCommitHandler } from './afterCommit'; @@ -56,11 +55,7 @@ export type EventStoreReadEventMetadata = Store extends EventStore ? ReadEventMetadataType extends ReadEventMetadata ? ReadEventMetadata & ReadEventMetadataType - : ReadEventMetadataType extends ReadEventMetadataWithoutGlobalPosition< - infer SV - > - ? ReadEventMetadata - : never + : never : never; export type GlobalPositionTypeOfEventStore = From fec025135d10ba1e6de1dd05b1791bb448f6b7e8 Mon Sep 17 00:00:00 2001 From: Alex Lay-Calvert Date: Thu, 9 Jan 2025 10:38:45 -0500 Subject: [PATCH 3/3] change named to e2e test instead of unit --- ...t.unit.spec.ts => mongoDBEventstore.onAfterCommit.e2e.spec.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/packages/emmett-mongodb/src/eventStore/{mongoDBEventstore.onAfterCommit.unit.spec.ts => mongoDBEventstore.onAfterCommit.e2e.spec.ts} (100%) diff --git a/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.unit.spec.ts b/src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts similarity index 100% rename from src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.unit.spec.ts rename to src/packages/emmett-mongodb/src/eventStore/mongoDBEventstore.onAfterCommit.e2e.spec.ts