Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testing util to poll for async event testing #758

Merged
merged 14 commits into from
Jun 18, 2024
Merged
35 changes: 18 additions & 17 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

256 changes: 117 additions & 139 deletions tests/event-log/event-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import type { EventStream } from '../../src/index.js';
import type { KeyValues } from '../../src/types/query-types.js';
import type { MessageEvent } from '../../src/types/subscriptions.js';

import { Poller } from '../utils/poller.js';
import { TestEventStream } from '../test-event-stream.js';
import { TestTimingUtils } from '../utils/test-timing-utils.js';
import { Message, TestDataGenerator, Time } from '../../src/index.js';
import { Message, TestDataGenerator } from '../../src/index.js';

import sinon from 'sinon';

Expand All @@ -20,151 +20,129 @@ chai.use(chaiAsPromised);

// It is also important to note that in some cases where we are testing a negative case (the message not arriving at the subscriber)
// we add an alternate subscription to await results within to give the EventStream ample time to process the message.
// Additionally inn some of these cases the order in which messages are sent to be processed or checked may matter, and they are noted as such.
// Additionally in some of these cases the order in which messages are sent to be processed or checked may matter, and they are noted as such.

describe('EventStream', () => {
// saving the original `console.error` function to re-assign after tests complete
const originalConsoleErrorFunction = console.error;
let eventStream: EventStream;
export function testEventStream(): void {
describe('EventStream', () => {
// saving the original `console.error` function to re-assign after tests complete
const originalConsoleErrorFunction = console.error;
let eventStream: EventStream;

before(async () => {
eventStream = TestEventStream.get();
await eventStream.open();
before(async () => {
eventStream = TestEventStream.get();
await eventStream.open();

// do not print the console error statements from the emitter error
console.error = (_):void => { };
});

beforeEach(() => {
sinon.restore();
});

after(async () => {
sinon.restore();
console.error = originalConsoleErrorFunction;
// Clean up after each test by closing and clearing the event stream
await eventStream.close();
});

it('emits all messages to each subscriptions', async () => {
// Scenario: We create 2 separate subscriptions that listen to all messages
// and we emit 3 messages. We expect both subscriptions to receive all 3 messages.

const messageCids1: string[] = [];
const handler1 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
messageCids1.push(messageCid);
};

const messageCids2: string[] = [];
const handler2 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
messageCids2.push(messageCid);
};

const subscription1 = await eventStream.subscribe('did:alice', 'sub-1', handler1);
const subscription2 = await eventStream.subscribe('did:alice', 'sub-2', handler2);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
eventStream.emit('did:alice', { message: message1.message }, {});
const message2 = await TestDataGenerator.generateRecordsWrite({});
const message2Cid = await Message.getCid(message2.message);
eventStream.emit('did:alice', { message: message2.message }, {});
const message3 = await TestDataGenerator.generateRecordsWrite({});
const message3Cid = await Message.getCid(message3.message);
eventStream.emit('did:alice', { message: message3.message }, {});

// Use the TimingUtils to poll until the expected results are met
await TestTimingUtils.pollUntilSuccessOrTimeout(async () => {
expect(messageCids1).to.have.members([ message1Cid, message2Cid, message3Cid ]);
expect(messageCids2).to.have.members([ message1Cid, message2Cid, message3Cid ]);
// do not print the console error statements from the emitter error
console.error = (_):void => { };
});

await subscription1.close();
await subscription2.close();
});

it('does not receive messages if subscription is closed', async () => {
// Scenario: We create two subscriptions that listen to all messages.
// The reason we create two is in order to allow for a negative test case.
// We send a message, validate that both handlers processed the message
// We then close one of the subscriptions, and send another message.
// Now we validate that only the handler of the subscription that is still open received the message.

const sub1MessageCids: string[] = [];
const handler1 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
sub1MessageCids.push(messageCid);
};

const sub2MessageCids: string[] = [];
const handler2 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
sub2MessageCids.push(messageCid);
};

const subscription1 = await eventStream.subscribe('did:alice', 'sub-1', handler1);
const subscription2 = await eventStream.subscribe('did:alice', 'sub-2', handler2);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
eventStream.emit('did:alice', { message: message1.message }, {});

// Use the TimingUtils to poll until the expected results are met
await TestTimingUtils.pollUntilSuccessOrTimeout(async () => {
expect(sub1MessageCids).to.have.length(1);
expect(sub1MessageCids).to.have.members([ message1Cid ]);

expect(sub2MessageCids).to.have.length(1);
expect(sub2MessageCids).to.have.members([ message1Cid ]);
beforeEach(() => {
sinon.restore();
});

await subscription1.close(); // close subscription 1

const message2 = await TestDataGenerator.generateRecordsWrite({});
const message2Cid = await Message.getCid(message2.message);
eventStream.emit('did:alice', { message: message2.message }, {});

// Use the TimingUtils to poll until the expected results are met
await TestTimingUtils.pollUntilSuccessOrTimeout(async() => {
// subscription 2 should have received the message
expect(sub2MessageCids.length).to.equal(2);
expect(sub2MessageCids).to.have.members([ message1Cid, message2Cid]);
after(async () => {
sinon.restore();
console.error = originalConsoleErrorFunction;
// Clean up after each test by closing and clearing the event stream
await eventStream.close();
});

// subscription 1 should not have received the message
expect(sub1MessageCids).to.have.length(1);
expect(sub1MessageCids).to.have.members([ message1Cid ]);
it('emits all messages to each subscriptions', async () => {
// Scenario: We create 2 separate subscriptions that listen to all messages
// and we emit 3 messages. We expect both subscriptions to receive all 3 messages.

const messageCids1: string[] = [];
const handler1 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
messageCids1.push(messageCid);
};

const messageCids2: string[] = [];
const handler2 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
messageCids2.push(messageCid);
};

const subscription1 = await eventStream.subscribe('did:alice', 'sub-1', handler1);
const subscription2 = await eventStream.subscribe('did:alice', 'sub-2', handler2);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
eventStream.emit('did:alice', { message: message1.message }, {});
const message2 = await TestDataGenerator.generateRecordsWrite({});
const message2Cid = await Message.getCid(message2.message);
eventStream.emit('did:alice', { message: message2.message }, {});
const message3 = await TestDataGenerator.generateRecordsWrite({});
const message3Cid = await Message.getCid(message3.message);
eventStream.emit('did:alice', { message: message3.message }, {});

// Use the TimingUtils to poll until the expected results are met
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
await Poller.pollUntilSuccessOrTimeout(async () => {
expect(messageCids1).to.have.members([ message1Cid, message2Cid, message3Cid ]);
expect(messageCids2).to.have.members([ message1Cid, message2Cid, message3Cid ]);
});

await subscription1.close();
await subscription2.close();
});

await subscription2.close();
it('does not receive messages if subscription is closed', async () => {
// Scenario: We create two subscriptions that listen to all messages.
// The reason we create two is in order to allow for a negative test case.
// We send a message, validate that both handlers processed the message
// We then close one of the subscriptions, and send another message.
// Now we validate that only the handler of the subscription that is still open received the message.

const sub1MessageCids: string[] = [];
const handler1 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
sub1MessageCids.push(messageCid);
};

const sub2MessageCids: string[] = [];
const handler2 = async (_tenant: string, event: MessageEvent, _indexes: KeyValues): Promise<void> => {
const { message } = event;
const messageCid = await Message.getCid(message);
sub2MessageCids.push(messageCid);
};

const subscription1 = await eventStream.subscribe('did:alice', 'sub-1', handler1);
const subscription2 = await eventStream.subscribe('did:alice', 'sub-2', handler2);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
eventStream.emit('did:alice', { message: message1.message }, {});

// Use the TimingUtils to poll until the expected results are met
await Poller.pollUntilSuccessOrTimeout(async () => {
expect(sub1MessageCids).to.have.length(1);
expect(sub1MessageCids).to.have.members([ message1Cid ]);

expect(sub2MessageCids).to.have.length(1);
expect(sub2MessageCids).to.have.members([ message1Cid ]);
});

await subscription1.close(); // close subscription 1

const message2 = await TestDataGenerator.generateRecordsWrite({});
const message2Cid = await Message.getCid(message2.message);
eventStream.emit('did:alice', { message: message2.message }, {});

// Use the TimingUtils to poll until the expected results are met
await Poller.pollUntilSuccessOrTimeout(async() => {
// subscription 2 should have received the message
expect(sub2MessageCids.length).to.equal(2);
expect(sub2MessageCids).to.have.members([ message1Cid, message2Cid]);

// subscription 1 should not have received the message
expect(sub1MessageCids).to.have.length(1);
expect(sub1MessageCids).to.have.members([ message1Cid ]);
});

await subscription2.close();
});
});

it('does not receive messages if event stream is closed', async () => {

const Handler = {
handle: async (_tenant: string, _event: MessageEvent, _indexes: KeyValues): Promise<void> => {}
};
const handlerSpy = sinon.spy(Handler, 'handle');
await eventStream.subscribe('did:alice', 'sub-1', Handler.handle);

// close eventStream
await eventStream.close();

const message1 = await TestDataGenerator.generateRecordsWrite({});
eventStream.emit('did:alice', { message: message1.message }, {});
const message2 = await TestDataGenerator.generateRecordsWrite({});
eventStream.emit('did:alice', { message: message2.message }, {});

// NOTE: This is a hack!!
// In systems where the EventStream is a coordinated pub/sub system, the messages are not processed immediately.
// Since we are doing a negative test to ensure messages were NOT handled, we need to to give the message ample time to be processed.
await Time.sleep(3000);

expect(handlerSpy.called).to.be.false;
}).timeout(5000);
});
}
22 changes: 2 additions & 20 deletions tests/features/author-delegated-grant.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import { Dwn } from '../../src/dwn.js';
import { DwnErrorCode } from '../../src/core/dwn-error.js';
import { Jws } from '../../src/utils/jws.js';
import { PermissionGrant } from '../../src/protocols/permission-grant.js';
import { Poller } from '../utils/poller.js';
import { RecordsWrite } from '../../src/interfaces/records-write.js';
import { TestDataGenerator } from '../utils/test-data-generator.js';
import { TestEventStream } from '../test-event-stream.js';
import { TestStores } from '../test-stores.js';
import { TestTimingUtils } from '../utils/test-timing-utils.js';
import { Time } from '../../src/utils/time.js';

import { DidKey, UniversalResolver } from '@web5/dids';
Expand Down Expand Up @@ -552,29 +552,11 @@ export function testAuthorDelegatedGrant(): void {
const chatRecord2Reply = await dwn.processMessage(bob.did, chatRecord2.message, { dataStream: chatRecord2.dataStream });
expect(chatRecord2Reply.status.code).to.equal(202);

await TestTimingUtils.pollUntilSuccessOrTimeout(async () => {
await Poller.pollUntilSuccessOrTimeout(async () => {
expect(subscriptionChatRecords.size).to.equal(2);
expect([ ...subscriptionChatRecords ]).to.have.members([ chatRecord1.message.recordId, chatRecord2.message.recordId ]);
});

//TODO: https://github.com/TBD54566975/dwn-sdk-js/issues/759
// When `RecordsSubscribeHandler` builds up the matchFilters there are no matching filters for a delete within a context
// so the delete event is not being captured by the subscription handler. When the issue is resolved, uncomment the code below

// Bob deletes one of the chat messages
// const deleteRecord = await TestDataGenerator.generateRecordsDelete({
// author : bob,
// recordId: chatRecord1.message.recordId
// });
// const deleteRecordReply = await dwn.processMessage(bob.did, deleteRecord.message);
// expect(deleteRecordReply.status.code).to.equal(202);

// // verify the subscription received the delete event
// await TestTimingUtils.pollUntilSuccessOrTimeout(async () => {
// expect(subscriptionChatRecords.size).to.equal(1);
// expect([ ...subscriptionChatRecords ]).to.have.members([ chatRecord2.message.recordId ]); // only chatRecord2 should be left
// });

await recordsSubscribeByDeviceXReply.subscription?.close();
});

Expand Down
Loading