From 59767bc296c1c2e30890c8d88cec9a005f2f24d9 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 2 Feb 2025 17:29:56 +0900 Subject: [PATCH] feat: connection pooling --- connection.ts | 6 +++ executor.ts | 3 ++ pool/default_pool.ts | 95 +++++++++++++++++++++++++++++++++++++++ pool/default_pool_test.ts | 81 +++++++++++++++++++++++++++++++++ pool/pool.ts | 5 +++ 5 files changed, 190 insertions(+) create mode 100644 pool/default_pool.ts create mode 100644 pool/default_pool_test.ts create mode 100644 pool/pool.ts diff --git a/connection.ts b/connection.ts index 68b6b08d..be667828 100644 --- a/connection.ts +++ b/connection.ts @@ -34,10 +34,12 @@ export interface SendCommandOptions { } export interface Connection extends TypedEventTarget { + /** @deprecated */ name: string | null; isClosed: boolean; isConnected: boolean; close(): void; + [Symbol.dispose](): void; connect(): Promise; reconnect(): Promise; sendCommand( @@ -309,6 +311,10 @@ export class RedisConnection this.#close(false); } + [Symbol.dispose](): void { + return this.close(); + } + #close(canReconnect = false) { const isClosedAlready = this._isClosed; diff --git a/executor.ts b/executor.ts index fc444725..3ab952fe 100644 --- a/executor.ts +++ b/executor.ts @@ -2,6 +2,9 @@ import type { Connection, SendCommandOptions } from "./connection.ts"; import type { RedisReply, RedisValue } from "./protocol/shared/types.ts"; export interface CommandExecutor { + /** + * @deprecated + */ readonly connection: Connection; /** * @deprecated diff --git a/pool/default_pool.ts b/pool/default_pool.ts new file mode 100644 index 00000000..3a97aeb6 --- /dev/null +++ b/pool/default_pool.ts @@ -0,0 +1,95 @@ +import type { Pool } from "./pool.ts"; + +const kDefaultTimeout = 5_000; +class DefaultPool implements Pool { + readonly #idle: Array = []; + readonly #connections: Array = []; + #connectionCount: number = 0; + readonly #deferredQueue: Array> = []; + readonly #options: Required>; + + constructor( + { + maxConnections = 8, + acquire, + }: PoolOptions, + ) { + this.#options = { + acquire, + maxConnections, + }; + } + + async acquire(signal?: AbortSignal): Promise { + signal ||= AbortSignal.timeout(kDefaultTimeout); + signal.throwIfAborted(); + if (this.#idle.length > 0) { + const conn = this.#idle.shift()!; + return Promise.resolve(conn); + } + + if (this.#connectionCount < this.#options.maxConnections) { + this.#connectionCount++; + try { + const connection = await this.#options.acquire(); + this.#connections.push(connection); + return connection; + } catch (error) { + this.#connectionCount--; + throw error; + } + } + + const deferred = Promise.withResolvers(); + this.#deferredQueue.push(deferred); + const { promise, reject } = deferred; + const onAbort = () => { + const i = this.#deferredQueue.indexOf(deferred); + if (i === -1) return; + this.#deferredQueue.splice(i, 1); + reject(signal.reason); + }; + signal.addEventListener("abort", onAbort, { once: true }); + return promise; + } + + release(conn: T): void { + if (!this.#connections.includes(conn)) { + throw new Error( + "This connection has already been removed from the pool.", + ); + } else if (this.#deferredQueue.length > 0) { + const i = this.#deferredQueue.shift()!; + i.resolve(conn); + } else { + this.#idle.push(conn); + } + } + + close() { + const errors: Array = []; + for (const x of this.#connections) { + try { + x[Symbol.dispose](); + } catch (error) { + errors.push(error); + } + } + this.#connections.length = 0; + this.#idle.length = 0; + if (errors.length > 0) { + throw new AggregateError(errors); + } + } +} + +export interface PoolOptions { + maxConnections?: number; + acquire(): Promise; +} + +export function createDefaultPool( + options: PoolOptions, +): Pool { + return new DefaultPool(options); +} diff --git a/pool/default_pool_test.ts b/pool/default_pool_test.ts new file mode 100644 index 00000000..5fe6895d --- /dev/null +++ b/pool/default_pool_test.ts @@ -0,0 +1,81 @@ +import { assert, assertEquals, assertRejects } from "../deps/std/assert.ts"; +import { createDefaultPool } from "./default_pool.ts"; + +class FakeConnection implements Disposable { + #isClosed = false; + isClosed() { + return this.#isClosed; + } + [Symbol.dispose]() { + if (this.#isClosed) { + throw new Error("Already closed"); + } + this.#isClosed = true; + } +} + +Deno.test("DefaultPool", async () => { + const openConnections: Array = []; + const pool = createDefaultPool({ + acquire: () => { + const connection = new FakeConnection(); + openConnections.push(connection); + return Promise.resolve(connection); + }, + maxConnections: 2, + }); + assertEquals(openConnections, []); + + const signal = AbortSignal.timeout(200); + + const conn1 = await pool.acquire(signal); + assertEquals(openConnections, [conn1]); + assert(openConnections.every((x) => !x.isClosed())); + assert(!signal.aborted); + + const conn2 = await pool.acquire(signal); + assertEquals(openConnections, [conn1, conn2]); + assert(!conn2.isClosed()); + assert(openConnections.every((x) => !x.isClosed())); + assert(!signal.aborted); + + { + // Tests timeout handling + await assertRejects( + () => pool.acquire(signal), + "Intentionally aborted", + ); + assert(signal.aborted); + assertEquals(openConnections, [conn1, conn2]); + assert(openConnections.every((x) => !x.isClosed())); + } + + { + // Tests `release()` + pool.release(conn2); + assertEquals(openConnections, [conn1, conn2]); + + const conn = await pool.acquire(new AbortController().signal); + assert(conn === conn2, "A new connection should not be created"); + assertEquals(openConnections, [conn1, conn2]); + } + + { + // `Pool#acquire` should wait for an active connection to be released. + const signal = AbortSignal.timeout(3_000); + const promise = pool.acquire(signal); + setTimeout(() => { + pool.release(conn1); + }, 50); + const conn = await promise; + assert(conn === conn1, "A new connection should not be created"); + assertEquals(openConnections, [conn1, conn2]); + assert(!signal.aborted); + } + + { + // `Pool#close` closes all connections + pool.close(); + assert(openConnections.every((x) => x.isClosed())); + } +}); diff --git a/pool/pool.ts b/pool/pool.ts new file mode 100644 index 00000000..228dc894 --- /dev/null +++ b/pool/pool.ts @@ -0,0 +1,5 @@ +export interface Pool { + acquire(signal?: AbortSignal): Promise; + release(conn: T): void; + close(): void; +}