Skip to content

Commit

Permalink
chore: add message queue interface
Browse files Browse the repository at this point in the history
  • Loading branch information
rotorsoft committed Sep 28, 2024
1 parent 1b7f66a commit be12e85
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
63 changes: 63 additions & 0 deletions libs/eventually-pg/src/PostgresMessageQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import {
dispose,
log,
logAdapterCreated,
logAdapterDisposed,
MessageQueue,
Messages
} from "@rotorsoft/eventually";
import { Pool } from "pg";
import { config } from "./config";

export const PostgresMessageQueue = <M extends Messages>(
table: string
): MessageQueue<M> => {
const pool = new Pool(config.pg);
const queue: MessageQueue<M> = {
name: `PostgresMessageQueue:${table}`,
dispose: async () => {
await pool.end();
},

// TODO: implement
enqueue: async (messages) => {
const sql = `TODO INSERT`.concat(
messages.map((message) => message.name).join(", ")
);
log().green().data("sql:", sql);
await pool.query(sql);
},

// TODO: implement
dequeue: async (callback, stream) => {
const sql = `SELECT * FROM "${table}" WHERE stream = '${stream}' ORDER BY created ASC LIMIT 1`;
log().green().data("sql:", sql);
// const result = await pool.query(sql);
// TODO: handle results with callback
return Promise.resolve();
},

// TODO: implement
seed: async () => {
const seed = "TODO SEED";
log().green().info(`>>> Seeding message queue table: ${table}`);
log().gray().info(seed);
await pool.query(seed);
},

drop: async (): Promise<void> => {
await pool.query(`DROP TABLE IF EXISTS "${table}"`);
}
};

logAdapterCreated(queue.name);
dispose(() => {
if (queue.dispose) {
logAdapterDisposed(queue.name);
return queue.dispose();
}
return Promise.resolve();
});

return queue;
};
1 change: 1 addition & 0 deletions libs/eventually-pg/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/** @module eventually-pg */
export * from "./config";
export * from "./PostgresMessageQueue";
export * from "./PostgresProjectorStore";
export * from "./PostgresStore";
export * from "./PostgresSubscriptionStore";
34 changes: 34 additions & 0 deletions libs/eventually/src/interfaces/stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,37 @@ export interface SubscriptionStore extends Disposable {
*/
drop: () => Promise<void>;
}

/**
* High throughput FIFO message queue supporting multiple concurrent producers and
* a controlled dequeuing process guaranteeing at-least-once ordered delivery to consumers of the same stream.
*
* Adapters should implement mechanisms to ensure message ordering and delivery guarantees.
*/
export interface MessageQueue<M extends Messages> extends Disposable {
/**
* Enqueues messages
* @param messages the messages
*/
enqueue: (messages: Message<M>[]) => Promise<void>;

/**
* Dequeues message on top of the queue after being processed by consumer callback
* @param callback consumer callback
* @param stream optional stream name to support independent concurrent consumers
*/
dequeue: (
callback: (message: Message<M> & { created: Date }) => Promise<void>,
stream?: string
) => Promise<void>;

/**
* Seeds the store
*/
seed: () => Promise<void>;

/**
* Drops the store
*/
drop: () => Promise<void>;
}

0 comments on commit be12e85

Please sign in to comment.