Skip to content

Commit

Permalink
feat: send the queueUrl alongside emitted events (#560)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn authored Jan 28, 2025
1 parent 7bc55fc commit 3ca59bf
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import { logger } from "./logger.js";
export class Consumer extends TypedEventEmitter {
private pollingTimeoutId: NodeJS.Timeout | undefined = undefined;
private stopped = true;
private queueUrl: string;
protected queueUrl: string;
private handleMessage: (message: Message) => Promise<Message | void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private preReceiveMessageCallback?: () => Promise<void>;
Expand Down Expand Up @@ -70,7 +70,7 @@ export class Consumer extends TypedEventEmitter {
private extendedAWSErrors: boolean;

constructor(options: ConsumerOptions) {
super();
super(options.queueUrl);
assertOptions(options);
this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
Expand Down
30 changes: 24 additions & 6 deletions src/emitter.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,55 @@
import { EventEmitter } from "node:events";

import { logger } from "./logger.js";
import { Events } from "./types.js";
import { Events, QueueMetadata } from "./types.js";

export class TypedEventEmitter extends EventEmitter {
protected queueUrl?: string;

/**
* @param queueUrl - The URL of the SQS queue this emitter is associated with
*/
constructor(queueUrl?: string) {
super();
this.queueUrl = queueUrl;
}

/**
* Trigger a listener on all emitted events
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
on<E extends keyof Events>(
event: E,
listener: (...args: Events[E]) => void,
listener: (...args: [...Events[E], QueueMetadata]) => void,
): this {
return super.on(event, listener);
}

/**
* Trigger a listener only once for an emitted event
* @param event The name of the event to listen to
* @param listener A function to trigger when the event is emitted
*/
once<E extends keyof Events>(
event: E,
listener: (...args: Events[E]) => void,
listener: (...args: [...Events[E], QueueMetadata]) => void,
): this {
return super.once(event, listener);
}

/**
* Emits an event with the provided arguments
* Emits an event with the provided arguments and adds queue metadata
* @param event The name of the event to emit
* @param args The arguments to pass to the event listeners
* @returns {boolean} Returns true if the event had listeners, false otherwise
* @example
* // Inside a method:
* this.emit('message_received', message);
*/
emit<E extends keyof Events>(event: E, ...args: Events[E]): boolean {
logger.debug(event, ...args);
return super.emit(event, ...args);
const metadata: QueueMetadata = { queueUrl: this.queueUrl };
logger.debug(event, ...args, metadata);
return super.emit(event, ...args, metadata);
}
}
13 changes: 13 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,21 @@ export interface StopOptions {
abort?: boolean;
}

/**
* Metadata about the queue that is added to every event
*/
export interface QueueMetadata {
queueUrl?: string;
}

/**
* These are the events that the consumer emits.
* Each event will receive QueueMetadata as the last argument.
* @example
* consumer.on('message_received', (message, metadata) => {
* console.log(`Received message from queue: ${metadata.queueUrl}`);
* console.log(message);
* });
*/
export interface Events {
/**
Expand Down
59 changes: 56 additions & 3 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2278,19 +2278,72 @@ describe("Consumer", () => {
});
});

describe("logger", () => {
describe("events", () => {
it("logs a debug event when an event is emitted", async () => {
const loggerDebug = sandbox.stub(logger, "debug");

consumer.start();
consumer.stop();

sandbox.assert.callCount(loggerDebug, 5);
// Logged directly
sandbox.assert.calledWithMatch(loggerDebug, "starting");
sandbox.assert.calledWithMatch(loggerDebug, "started");
// Sent from the emitter
sandbox.assert.calledWithMatch(loggerDebug, "started", {
queueUrl: QUEUE_URL,
});
// Logged directly
sandbox.assert.calledWithMatch(loggerDebug, "polling");
// Logged directly
sandbox.assert.calledWithMatch(loggerDebug, "stopping");
sandbox.assert.calledWithMatch(loggerDebug, "stopped");
// Sent from the emitter
sandbox.assert.calledWithMatch(loggerDebug, "stopped", {
queueUrl: QUEUE_URL,
});
});

it("includes queueUrl in emitted events", async () => {
const startedListener = sandbox.stub();
const messageReceivedListener = sandbox.stub();
const messageProcessedListener = sandbox.stub();
const emptyListener = sandbox.stub();
const stoppedListener = sandbox.stub();
const errorListener = sandbox.stub();
const processingErrorListener = sandbox.stub();

consumer.on("started", startedListener);
consumer.on("message_received", messageReceivedListener);
consumer.on("message_processed", messageProcessedListener);
consumer.on("empty", emptyListener);
consumer.on("stopped", stoppedListener);
consumer.on("error", errorListener);
consumer.on("processing_error", processingErrorListener);

consumer.start();
await pEvent(consumer, "message_processed");
consumer.stop();

handleMessage.rejects(new Error("Processing error"));
consumer.start();
await pEvent(consumer, "processing_error");
consumer.stop();

sandbox.assert.calledWith(startedListener, { queueUrl: QUEUE_URL });
sandbox.assert.calledWith(messageReceivedListener, response.Messages[0], {
queueUrl: QUEUE_URL,
});
sandbox.assert.calledWith(
messageProcessedListener,
response.Messages[0],
{ queueUrl: QUEUE_URL },
);
sandbox.assert.calledWith(stoppedListener, { queueUrl: QUEUE_URL });
sandbox.assert.calledWith(
processingErrorListener,
sinon.match.instanceOf(Error),
response.Messages[0],
{ queueUrl: QUEUE_URL },
);
});
});
});

0 comments on commit 3ca59bf

Please sign in to comment.