Skip to content

Commit

Permalink
feat: connection pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
uki00a committed Feb 2, 2025
1 parent b8c4561 commit 59767bc
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 0 deletions.
6 changes: 6 additions & 0 deletions connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ export interface SendCommandOptions {
}

export interface Connection extends TypedEventTarget<ConnectionEventMap> {
/** @deprecated */
name: string | null;
isClosed: boolean;
isConnected: boolean;
close(): void;
[Symbol.dispose](): void;
connect(): Promise<void>;
reconnect(): Promise<void>;
sendCommand(
Expand Down Expand Up @@ -309,6 +311,10 @@ export class RedisConnection
this.#close(false);
}

[Symbol.dispose](): void {
return this.close();
}

#close(canReconnect = false) {
const isClosedAlready = this._isClosed;

Expand Down
3 changes: 3 additions & 0 deletions executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import type { Connection, SendCommandOptions } from "./connection.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";

export interface CommandExecutor {
/**
* @deprecated
*/
readonly connection: Connection;
/**
* @deprecated
Expand Down
95 changes: 95 additions & 0 deletions pool/default_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import type { Pool } from "./pool.ts";

const kDefaultTimeout = 5_000;
class DefaultPool<T extends Disposable> implements Pool<T> {
readonly #idle: Array<T> = [];
readonly #connections: Array<T> = [];
#connectionCount: number = 0;
readonly #deferredQueue: Array<PromiseWithResolvers<T>> = [];
readonly #options: Required<PoolOptions<T>>;

constructor(
{
maxConnections = 8,
acquire,
}: PoolOptions<T>,
) {
this.#options = {
acquire,
maxConnections,
};
}

async acquire(signal?: AbortSignal): Promise<T> {
signal ||= AbortSignal.timeout(kDefaultTimeout);
signal.throwIfAborted();
if (this.#idle.length > 0) {
const conn = this.#idle.shift()!;
return Promise.resolve(conn);
}

if (this.#connectionCount < this.#options.maxConnections) {
this.#connectionCount++;
try {
const connection = await this.#options.acquire();
this.#connections.push(connection);
return connection;
} catch (error) {
this.#connectionCount--;
throw error;
}
}

const deferred = Promise.withResolvers<T>();
this.#deferredQueue.push(deferred);
const { promise, reject } = deferred;
const onAbort = () => {
const i = this.#deferredQueue.indexOf(deferred);
if (i === -1) return;
this.#deferredQueue.splice(i, 1);
reject(signal.reason);
};
signal.addEventListener("abort", onAbort, { once: true });
return promise;
}

release(conn: T): void {
if (!this.#connections.includes(conn)) {
throw new Error(
"This connection has already been removed from the pool.",
);
} else if (this.#deferredQueue.length > 0) {
const i = this.#deferredQueue.shift()!;
i.resolve(conn);
} else {
this.#idle.push(conn);
}
}

close() {
const errors: Array<unknown> = [];
for (const x of this.#connections) {
try {
x[Symbol.dispose]();
} catch (error) {
errors.push(error);
}
}
this.#connections.length = 0;
this.#idle.length = 0;
if (errors.length > 0) {
throw new AggregateError(errors);
}
}
}

export interface PoolOptions<T extends Disposable> {
maxConnections?: number;
acquire(): Promise<T>;
}

export function createDefaultPool<T extends Disposable>(
options: PoolOptions<T>,
): Pool<T> {
return new DefaultPool<T>(options);
}
81 changes: 81 additions & 0 deletions pool/default_pool_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { assert, assertEquals, assertRejects } from "../deps/std/assert.ts";
import { createDefaultPool } from "./default_pool.ts";

class FakeConnection implements Disposable {
#isClosed = false;
isClosed() {
return this.#isClosed;
}
[Symbol.dispose]() {
if (this.#isClosed) {
throw new Error("Already closed");
}
this.#isClosed = true;
}
}

Deno.test("DefaultPool", async () => {
const openConnections: Array<FakeConnection> = [];
const pool = createDefaultPool({
acquire: () => {
const connection = new FakeConnection();
openConnections.push(connection);
return Promise.resolve(connection);
},
maxConnections: 2,
});
assertEquals(openConnections, []);

const signal = AbortSignal.timeout(200);

const conn1 = await pool.acquire(signal);
assertEquals(openConnections, [conn1]);
assert(openConnections.every((x) => !x.isClosed()));
assert(!signal.aborted);

const conn2 = await pool.acquire(signal);
assertEquals(openConnections, [conn1, conn2]);
assert(!conn2.isClosed());
assert(openConnections.every((x) => !x.isClosed()));
assert(!signal.aborted);

{
// Tests timeout handling
await assertRejects(
() => pool.acquire(signal),
"Intentionally aborted",
);
assert(signal.aborted);
assertEquals(openConnections, [conn1, conn2]);
assert(openConnections.every((x) => !x.isClosed()));
}

{
// Tests `release()`
pool.release(conn2);
assertEquals(openConnections, [conn1, conn2]);

const conn = await pool.acquire(new AbortController().signal);
assert(conn === conn2, "A new connection should not be created");
assertEquals(openConnections, [conn1, conn2]);
}

{
// `Pool#acquire` should wait for an active connection to be released.
const signal = AbortSignal.timeout(3_000);
const promise = pool.acquire(signal);
setTimeout(() => {
pool.release(conn1);
}, 50);
const conn = await promise;
assert(conn === conn1, "A new connection should not be created");
assertEquals(openConnections, [conn1, conn2]);
assert(!signal.aborted);
}

{
// `Pool#close` closes all connections
pool.close();
assert(openConnections.every((x) => x.isClosed()));
}
});
5 changes: 5 additions & 0 deletions pool/pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface Pool<T extends Disposable> {
acquire(signal?: AbortSignal): Promise<T>;
release(conn: T): void;
close(): void;
}

0 comments on commit 59767bc

Please sign in to comment.