Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial #169 - MongoDB EventStore onAfterCommit hook #170

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {
assertExpectedVersionMatchesCurrent,
ExpectedVersionConflictError,
assertExpectedVersionMatchesCurrent,
filterProjections,
tryPublishMessagesAfterCommit,
type AggregateStreamOptions,
type AggregateStreamResult,
type AppendToStreamOptions,
Expand All @@ -14,6 +15,7 @@ import {
type ReadEventMetadataWithoutGlobalPosition,
type ReadStreamOptions,
type ReadStreamResult,
type DefaultEventStoreOptions,
} from '@event-driven-io/emmett';
import {
MongoClient,
Expand Down Expand Up @@ -178,7 +180,8 @@ export type MongoDBEventStoreOptions = {
MongoDBProjectionInlineHandlerContext
>[];
storage?: MongoDBEventStoreStorageOptions;
} & MongoDBEventStoreConnectionOptions;
} & MongoDBEventStoreConnectionOptions &
DefaultEventStoreOptions<MongoDBEventStore>;

export type MongoDBEventStore = EventStore<MongoDBReadEventMetadata> & {
projections: ProjectionQueries<StreamType>;
Expand All @@ -189,13 +192,15 @@ export type MongoDBEventStore = EventStore<MongoDBReadEventMetadata> & {

class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
private readonly client: MongoClient;
private shouldManageClientLifetime: boolean;
private readonly inlineProjections: MongoDBInlineProjectionDefinition[];
private shouldManageClientLifetime: boolean;
private isClosed: boolean = false;
public projections: ProjectionQueries<StreamType>;
private storage: MongoDBEventStoreStorage;
private options: MongoDBEventStoreOptions;
public projections: ProjectionQueries<StreamType>;

constructor(options: MongoDBEventStoreOptions) {
this.options = options;
this.client =
'client' in options && options.client
? options.client
Expand Down Expand Up @@ -388,6 +393,15 @@ class MongoDBEventStoreImplementation implements MongoDBEventStore, Closeable {
);
}

await tryPublishMessagesAfterCommit<MongoDBEventStore>(
// @ts-expect-error Issues with `globalPosition` not being present causing the type for metadata to expect `never`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alex-laycalvert, that's weird; I noticed that TS got crazy on my other computer, but when I reinstalled fresh packages, then it worked fine. I'm not sure if that's the same issue, but I'll check the typing after the merge 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know for sure but I wonder if it has to do with the infer GV usage not working if global position is not on the type itself, even if we allow undefined.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because technically, it's not undefined but optional. TypeScript is treating those cases differently. I'll have a look, it might be that I messed something out.

eventsToAppend,
this.options.hooks,
// {
// TODO: same context as InlineProjectionHandlerContext for mongodb?
// },
);

return {
nextExpectedStreamVersion:
currentStreamVersion + BigInt(eventsToAppend.length),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { after, before, describe, it } from 'node:test';
import { v7 as uuid } from 'uuid';
import { type Event, assertEqual } from '@event-driven-io/emmett';
import {
getMongoDBEventStore,
type MongoDBReadEvent,
} from './mongoDBEventStore';
import {
MongoDBContainer,
StartedMongoDBContainer,
} from '@testcontainers/mongodb';
import { MongoClient } from 'mongodb';

type TestEvent = Event<'test', { counter: number }, { some: boolean }>;

void describe('MongoDBEventStore onAfterCommit', () => {
let mongodb: StartedMongoDBContainer;
let client: MongoClient;

before(async () => {
mongodb = await new MongoDBContainer().start();
client = new MongoClient(mongodb.getConnectionString(), {
directConnection: true,
});

await client.connect();
});

after(async () => {
try {
await client.close();
await mongodb.stop();
} catch (error) {
console.log(error);
}
});

void it('calls onAfterCommit hook after events append', async () => {
// Given
const appendedEvents: MongoDBReadEvent[] = [];
const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
appendedEvents.push(...events);
},
},
});
const streamName = `test:${uuid()}`;
let counter = 0;
const events: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];

// When
await eventStore.appendToStream(streamName, events);

// Then
assertEqual(2, appendedEvents.length);
});

void it('calls onAfterCommit hook exactly once for each events append', async () => {
// Given
const appendedEvents: MongoDBReadEvent[] = [];
const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
appendedEvents.push(...events);
},
},
});
const streamName = `test:${uuid()}`;
let counter = 0;
const events: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];
const nextEvents: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];

// When
await eventStore.appendToStream(streamName, events);
await eventStore.appendToStream(streamName, nextEvents);

// Then
assertEqual(4, appendedEvents.length);
});

void it('silently fails when onAfterCommit hook failed but still keeps events', async () => {
// Given
const appendedEvents: MongoDBReadEvent[] = [];
const eventStore = getMongoDBEventStore({
client,
hooks: {
onAfterCommit: (events) => {
appendedEvents.push(...events);
throw new Error('onAfterCommit failed!');
},
},
});

const streamName = `test:${uuid()}`;
let counter = 0;
const events: TestEvent[] = [
{
type: 'test',
data: { counter: ++counter },
metadata: { some: true },
},
{
type: 'test',
data: { counter: ++counter },
metadata: { some: false },
},
];

// When
await eventStore.appendToStream(streamName, events);

// Then
assertEqual(2, appendedEvents.length);
const { events: eventsInStore } = await eventStore.readStream(streamName);
assertEqual(2, eventsInStore.length);
});
});
Loading