Merge pull request #342 from Imanghvs/master
increase test coverage for reconnection
w1am authored Oct 20, 2023
2 parents 77eb8f5 + acf4899 commit 3027e8e
Showing 2 changed files with 139 additions and 32 deletions.
157 changes: 130 additions & 27 deletions src/__test__/connection/reconnect/all-nodes-down.test.ts
@@ -1,20 +1,25 @@
-import { createTestCluster, delay } from "@test-utils";
+import { collect, createTestCluster, delay } from "@test-utils";
 import {
   jsonEvent,
   EventStoreDBClient,
   UnavailableError,
+  persistentSubscriptionToStreamSettingsFromDefaults,
+  StreamNotFoundError,
 } from "@eventstore/db-client";

 // This test can take time.
 jest.setTimeout(120_000);

+const STREAM_NAME = "my_stream";
+
 describe("reconnect", () => {
-  test("All nodes down", async () => {
-    const cluster = createTestCluster();
+  const cluster = createTestCluster();
+  let client!: EventStoreDBClient;

+  beforeAll(async () => {
     await cluster.up();

-    const client = new EventStoreDBClient(
+    client = new EventStoreDBClient(
       {
         endpoints: cluster.endpoints,
         // The timing of this test can be a bit variable,
@@ -24,48 +29,126 @@ describe("reconnect", () => {
       { rootCertificate: cluster.rootCertificate },
       { username: "admin", password: "changeit" }
     );
+  });
+
+  afterAll(async () => {
+    await cluster.down();
+  });
+
+  test("All nodes down", async () => {
     // make successful append to connect to node
     const firstAppend = await client.appendToStream(
-      "my_stream",
+      STREAM_NAME,
       jsonEvent({ type: "first-append", data: { message: "test" } }),
       // batch append triggers reconnect as soon as stream drops, so we need to force regular append
       { credentials: { username: "admin", password: "changeit" } }
     );
     expect(firstAppend).toBeDefined();

+    // read the stream successfully
+    const firstReadStream = await collect(
+      client.readStream(STREAM_NAME, { maxCount: 10 })
+    );
+    expect(firstReadStream.length).toBe(1);
+    const firstEvent = firstReadStream[0].event;
+    expect(firstEvent?.data).toStrictEqual({ message: "test" });
+    expect(firstEvent?.type).toBe("first-append");
+
+    // make successful subscription to stream
+    const firstCreateSubscription =
+      await client.createPersistentSubscriptionToStream(
+        STREAM_NAME,
+        "first-test-group",
+        persistentSubscriptionToStreamSettingsFromDefaults()
+      );
+    expect(firstCreateSubscription).toBeUndefined();
+
+    // delete the stream successfully
+    const firstDeleteStream = await client.deleteStream(STREAM_NAME);
+    expect(firstDeleteStream).toBeDefined();
+    await expect(
+      collect(client.readStream(STREAM_NAME, { maxCount: 10 }))
+    ).rejects.toThrowError(StreamNotFoundError);
+
     // Kill all nodes
     for (const endpoint of cluster.endpoints) {
       await cluster.killNode(endpoint);
     }

-    // next append should fail
-    try {
-      const secondAppend = await client.appendToStream(
-        "my_stream",
+    // next client operations should fail
+
+    // append to stream
+    await expect(
+      client.appendToStream(
+        STREAM_NAME,
         jsonEvent({ type: "failed-append", data: { message: "test" } }),
         // batch append triggers reconnect as soon as stream drops, so we need to force regular append
         { credentials: { username: "admin", password: "changeit" } }
-      );
-      expect(secondAppend).toBe("Unreachable");
-    } catch (error) {
-      expect(error).toBeInstanceOf(UnavailableError);
-    }
+      )
+    ).rejects.toThrowError(UnavailableError);
+    // read the stream
+    await expect(async () => {
+      let count = 0;
+      for await (const e of client.readStream(STREAM_NAME, { maxCount: 10 })) {
+        count++;
+      }
+    }).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );
+    // create subscription
+    await expect(
+      client.createPersistentSubscriptionToStream(
+        STREAM_NAME,
+        "second-test-group",
+        persistentSubscriptionToStreamSettingsFromDefaults()
+      )
+    ).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );
+    // delete stream
+    await expect(
+      client.deleteStream(STREAM_NAME)
+    ).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );

-    // next append should also fail, as there is nothing to reconnect to reconnection fail
-    try {
-      const secondAppend = await client.appendToStream(
-        "my_stream",
+    // next operations should also fail, as there is nothing to reconnect to, so reconnection fails
+    // append to stream
+    await expect(
+      client.appendToStream(
+        STREAM_NAME,
         jsonEvent({ type: "failed-append", data: { message: "test" } }),
         // batch append triggers reconnect as soon as stream drops, so we need to force regular append
         { credentials: { username: "admin", password: "changeit" } }
-      );
-      expect(secondAppend).toBe("Unreachable");
-    } catch (error) {
-      expect(error).toMatchInlineSnapshot(
-        `[Error: Failed to discover after 10 attempts.]`
-      );
-    }
+      )
+    ).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );
+    // read the stream
+    await expect(async () => {
+      let count = 0;
+      for await (const e of client.readStream(STREAM_NAME, { maxCount: 10 })) {
+        count++;
+      }
+    }).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );
+    // create subscription
+    await expect(
+      client.createPersistentSubscriptionToStream(
+        STREAM_NAME,
+        "third-test-group",
+        persistentSubscriptionToStreamSettingsFromDefaults()
+      )
+    ).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );
+    // delete stream
+    await expect(
+      client.deleteStream(STREAM_NAME)
+    ).rejects.toThrowErrorMatchingInlineSnapshot(
+      '"Failed to discover after 10 attempts."'
+    );

     // resurrect all nodes
     await cluster.resurrect();
@@ -74,13 +157,33 @@
     await delay(5000);

     const reconnectedAppend = await client.appendToStream(
-      "my_stream",
+      STREAM_NAME,
       jsonEvent({ type: "reconnect-append", data: { message: "test" } }),
       // batch append triggers reconnect as soon as stream drops, so we need to force regular append
       { credentials: { username: "admin", password: "changeit" } }
     );
     expect(reconnectedAppend).toBeDefined();

-    await cluster.down();
+    const reconnectReadStream = await collect(
+      client.readStream(STREAM_NAME, { maxCount: 10 })
+    );
+    expect(reconnectReadStream.length).toBe(1);
+    const reconnectEvent = reconnectReadStream[0].event;
+    expect(reconnectEvent?.data).toStrictEqual({ message: "test" });
+    expect(reconnectEvent?.type).toBe("reconnect-append");
+
+    const reconnectedCreateSubscription =
+      await client.createPersistentSubscriptionToStream(
+        STREAM_NAME,
+        "fourth-test-group",
+        persistentSubscriptionToStreamSettingsFromDefaults()
+      );
+    expect(reconnectedCreateSubscription).toBeUndefined();
+
+    const reconnectedDeleteStream = await client.deleteStream(STREAM_NAME);
+    expect(reconnectedDeleteStream).toBeDefined();
+    await expect(
+      collect(client.readStream(STREAM_NAME, { maxCount: 10 }))
+    ).rejects.toThrowError(StreamNotFoundError);
   });
 });
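Note: the tests above rely on a collect helper imported from @test-utils, which is not part of this diff. Judging from how it is called, it presumably drains an async iterable (such as client.readStream(...)) into an array; a minimal sketch under that assumption, not the repository's actual implementation:

// Sketch only: the real @test-utils implementation may differ.
export const collect = async <T>(iterable: AsyncIterable<T>): Promise<T[]> => {
  const items: T[] = [];
  for await (const item of iterable) {
    items.push(item);
  }
  return items;
};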
14 changes: 9 additions & 5 deletions src/__test__/connection/reconnect/no-reconnection.test.ts
@@ -4,6 +4,7 @@ import {
   EventStoreDBClient,
   StreamNotFoundError,
   TimeoutError,
+  WrongExpectedVersionError,
 } from "@eventstore/db-client";

 // This test can take time.
@@ -38,14 +39,17 @@ describe("reconnect", () => {
     // eslint-disable-next-line @typescript-eslint/no-explicit-any
     const priorChannel = await (client as any).getChannel();

-    try {
-      // attempt to read a stream that doesnt exist
+    // attempt to read a stream that doesn't exist should fail
+    await expect(async () => {
       for await (const event of client.readStream("doesn't-exist")) {
         expect(event).toBe("unreachable");
       }
-    } catch (error) {
-      expect(error).toBeInstanceOf(StreamNotFoundError);
-    }
+    }).rejects.toThrowError(StreamNotFoundError);
+
+    // attempt to delete a stream that doesn't exist should fail
+    await expect(client.deleteStream("doesn't-exist")).rejects.toThrowError(
+      WrongExpectedVersionError
+    );

     // eslint-disable-next-line @typescript-eslint/no-explicit-any
     const afterChannel = await (client as any).getChannel();
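Both test files swap manual try/catch assertions for Jest's rejects matchers. A self-contained illustration of the two styles, using placeholder names rather than anything from this repository:

// Hypothetical example; PlaceholderUnavailableError and doSomething are stand-ins.
class PlaceholderUnavailableError extends Error {}

const doSomething = async (): Promise<string> => {
  throw new PlaceholderUnavailableError("node is down");
};

test("old style: wrap the call in try/catch", async () => {
  try {
    const result = await doSomething();
    expect(result).toBe("Unreachable"); // should never be reached
  } catch (error) {
    expect(error).toBeInstanceOf(PlaceholderUnavailableError);
  }
});

test("new style: assert the rejection directly", async () => {
  await expect(doSomething()).rejects.toThrowError(PlaceholderUnavailableError);
});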
