diff --git a/src/iterable-request.ts b/src/iterable-request.ts new file mode 100644 index 000000000..c9f4d7e10 --- /dev/null +++ b/src/iterable-request.ts @@ -0,0 +1,266 @@ +// This module implements an iterable `Request` class. + +import Request, { type RequestOptions } from './request'; +import { type ColumnMetadata } from './token/colmetadata-token-parser'; + +export interface ColumnValue { + metadata: ColumnMetadata; + value: any; +} + +type RowData = ColumnValue[] | Record; // type variant depending on config.options.useColumnNames +type ColumnMetadataDef = ColumnMetadata[] | Record; // type variant depending on config.options.useColumnNames + +export interface IterableRequestOptions extends RequestOptions { + iteratorFifoSize: number; +} + +/** +* The item object of the request iterator. +*/ +export interface IterableRequestItem { + + /** + * Row data. + */ + row: RowData; + + /** + * Result set number, 1..n. + */ + resultSetNo: number; + + /** + * Metadata of all columns. + */ + columnMetadata: ColumnMetadataDef; + +} + +type iteratorPromiseResolveFunction = (value: IteratorResult) => void; +type iteratorPromiseRejectFunction = (error: Error) => void; +interface IteratorPromiseFunctions {resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction} + +// Internal class for the state controller logic of the iterator. +class IterableRequestController { + + private request: Request; + private requestCompleted: boolean; + private requestPaused: boolean; + private error: Error | undefined; + private terminating: boolean; + + private resultSetNo: number; + private columnMetadata: ColumnMetadataDef | undefined; + private fifo: IterableRequestItem[]; + private fifoPauseLevel: number; + private fifoResumeLevel: number; + + private promises: IteratorPromiseFunctions[]; // FIFO of resolve/reject function pairs of pending promises + private terminatorResolve: (() => void) | undefined; + private terminatorPromise: Promise | undefined; + + // --- Constructor / Terminator ---------------------------------------------- + + constructor(request: Request, options?: IterableRequestOptions) { + this.request = request; + this.requestCompleted = false; + this.requestPaused = false; + this.terminating = false; + + this.resultSetNo = 0; + this.fifo = []; + const fifoSize = options?.iteratorFifoSize ?? 1024; + this.fifoPauseLevel = fifoSize; + this.fifoResumeLevel = Math.floor(fifoSize / 2); + + this.promises = []; + + request.addListener('row', this.rowEventHandler); + request.addListener('columnMetadata', this.columnMetadataEventHandler); + } + + public terminate(): Promise { + this.terminating = true; + this.request.resume(); // (just to be sure) + if (this.requestCompleted || !this.request.connection) { + return Promise.resolve(); + } + this.request.connection.cancel(); + if (!this.terminatorPromise) { + this.terminatorPromise = new Promise((resolve: () => void) => { + this.terminatorResolve = resolve; + }); + } + return this.terminatorPromise; + } + + // --- Promise logic --------------------------------------------------------- + + private serveError(): boolean { + if (!this.error || !this.promises.length) { + return false; + } + const promise = this.promises.shift()!; + promise.reject(this.error); + return true; + } + + private serveRowItem(): boolean { + if (!this.fifo.length || !this.promises.length) { + return false; + } + const item = this.fifo.shift()!; + const promise = this.promises.shift()!; + promise.resolve({ value: item }); + if (this.fifo.length <= this.fifoResumeLevel && this.requestPaused) { + this.request.resume(); + this.requestPaused = false; + } + return true; + } + + private serveRequestCompletion(): boolean { + if (!this.requestCompleted || !this.promises.length) { + return false; + } + const promise = this.promises.shift()!; + promise.resolve({ done: true, value: undefined }); + return true; + } + + private serveNextPromise(): boolean { + if (this.serveRowItem()) { + return true; + } + if (this.serveError()) { + return true; + } + if (this.serveRequestCompletion()) { + return true; + } + return false; + } + + private servePromises() { + while (true) { + if (!this.serveNextPromise()) { + break; + } + } + } + + // This promise executor is called synchronously from within Iterator.next(). + public promiseExecutor = (resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction) => { + this.promises.push({ resolve, reject }); + this.servePromises(); + }; + + // --- Event handlers -------------------------------------------------------- + + public completionCallback(error: Error | null | undefined) { + this.requestCompleted = true; + if (this.terminating) { + if (this.terminatorResolve) { + this.terminatorResolve(); + } + return; + } + if (error && !this.error) { + this.error = error; + } + this.servePromises(); + } + + private columnMetadataEventHandler = (columnMetadata: ColumnMetadata[] | Record) => { + this.resultSetNo++; + this.columnMetadata = columnMetadata; + }; + + private rowEventHandler = (row: RowData) => { + if (this.requestCompleted || this.error || this.terminating) { + return; + } + if (this.resultSetNo === 0 || !this.columnMetadata) { + this.error = new Error('No columnMetadata event received before row event.'); + this.servePromises(); + return; + } + const item: IterableRequestItem = { row, resultSetNo: this.resultSetNo, columnMetadata: this.columnMetadata }; + this.fifo.push(item); + if (this.fifo.length >= this.fifoPauseLevel && !this.requestPaused) { + this.request.pause(); + this.requestPaused = true; + } + this.servePromises(); + }; + +} + +// Internal class for the iterator object which is passed to the API client. +class IterableRequestIterator implements AsyncIterator { + + private controller: IterableRequestController; + + constructor(controller: IterableRequestController) { + this.controller = controller; + } + + public next(): Promise> { + return new Promise>(this.controller.promiseExecutor); + } + + public async return(value?: any): Promise { + await this.controller.terminate(); + return { value, done: true }; + } + + public async throw(exception?: any): Promise { + await this.controller.terminate(); + if (exception) { + throw exception; + } else { + return { done: true }; + } + } + +} + +/** +* An iterable `Request` class. +* +* This iterable version is a super class of the normal `Request` class. +* +* Usage: +* ```js +* const request = new IterableRequest("select 42, 'hello world'"); +* connection.execSql(request); +* for await (const item of request) { +* console.log(item.row); +* } +* ``` +*/ +class IterableRequest extends Request implements AsyncIterable { + + private iterator: IterableRequestIterator; + + constructor(sqlTextOrProcedure: string | undefined, options?: IterableRequestOptions) { + super(sqlTextOrProcedure, completionCallback, options); + const controller = new IterableRequestController(this, options); + this.iterator = new IterableRequestIterator(controller); + + function completionCallback(error: Error | null | undefined) { + if (controller) { + controller.completionCallback(error); + } + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return this.iterator; + } + +} + +export default IterableRequest; +module.exports = IterableRequest; diff --git a/src/request.ts b/src/request.ts index 6b6b4f30c..66f29c22e 100644 --- a/src/request.ts +++ b/src/request.ts @@ -39,7 +39,7 @@ export interface ParameterOptions { scale?: number; } -interface RequestOptions { +export interface RequestOptions { statementColumnEncryptionSetting?: SQLServerStatementColumnEncryptionSetting; } diff --git a/src/tedious.ts b/src/tedious.ts index 723eeefaa..deb536990 100644 --- a/src/tedious.ts +++ b/src/tedious.ts @@ -1,6 +1,7 @@ import BulkLoad from './bulk-load'; import Connection, { type ConnectionAuthentication, type ConnectionConfiguration, type ConnectionOptions } from './connection'; import Request from './request'; +import IterableRequest from './iterable-request'; import { name } from './library'; import { ConnectionError, RequestError } from './errors'; @@ -21,6 +22,7 @@ export { BulkLoad, Connection, Request, + IterableRequest, library, ConnectionError, RequestError, diff --git a/test/integration/iterable-request-test.ts b/test/integration/iterable-request-test.ts new file mode 100644 index 000000000..c9c96d481 --- /dev/null +++ b/test/integration/iterable-request-test.ts @@ -0,0 +1,161 @@ +import { assert } from 'chai'; + +import Connection from '../../src/connection'; +import { RequestError } from '../../src/errors'; +import IterableRequest, { type ColumnValue } from '../../src/iterable-request'; +import { debugOptionsFromEnv } from '../helpers/debug-options-from-env'; + +import defaultConfig from '../config'; + +function getConfig() { + const config = { + ...defaultConfig, + options: { + ...defaultConfig.options, + debug: debugOptionsFromEnv(), + tdsVersion: process.env.TEDIOUS_TDS_VERSION, + } + }; + + return config; +} + +function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +describe('Iterable Request Test', function() { + this.timeout(10000); + let connection: Connection; + + beforeEach(function(done) { + connection = new Connection(getConfig()); + if (process.env.TEDIOUS_DEBUG) { + connection.on('debug', console.log); + } + connection.connect(done); + }); + + afterEach(function(done) { + if (connection.closed) { + done(); + } else { + connection.on('end', done); + connection.close(); + } + }); + + it('tests basic functionality of the iterable request with use of internal FIFO', async function() { + const n = 20000; + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < ${n}) + select i from cte1 option (maxrecursion 0) + `; + + const request = new IterableRequest(sql); + connection.execSql(request); + + let ctr = 0; + for await (const item of request) { + assert(item.resultSetNo === 1); + const row = item.row as ColumnValue[]; + const i = row[0].value; + assert(i === ctr + 1); + if (ctr === Math.floor(n / 2)) { + await sleep(250); + } + ctr++; + } + assert(ctr === n); + }); + + it('tests an iterable request with multiple result sets', async function() { + const sql = ` + select 1, 'abc' + select 2 + select 3, 555 + `; + + const request = new IterableRequest(sql); + connection.execSql(request); + + let ctr = 0; + for await (const item of request) { + assert(item.resultSetNo === ctr + 1); + const row = item.row as ColumnValue[]; + const i = row[0].value; + assert(i === ctr + 1); + ctr++; + } + assert(ctr === 3); + }); + + it('checks that a for loop with an iterable request can be aborted before the end of the result set', async function() { + + await testForLoop(10000, 500); + await testForLoop(10000, 3); + await testForLoop(10000, 250, 100); + await testForLoop(100, 100); + + async function testForLoop(n: number, abortCount: number, sleepPos = -1) { + const sql = ` + with cte1 as + (select 1 as i union all select i + 1 from cte1 where i < ${n}) + select i from cte1 option (maxrecursion 0) + `; + + const request = new IterableRequest(sql); + connection.execSql(request); + + let ctr = 0; + for await (const item of request) { + const row = item.row as ColumnValue[]; + const i = row[0].value; + assert(i === ctr + 1); + if (ctr === sleepPos) { + await sleep(250); + } + ctr++; + if (ctr === abortCount) { + break; + } + } + assert(ctr === abortCount); + } + + }); + + it('tests the error handling logic of the iterable request module', async function() { + const sql = ` + select 1 + select 2 + select 3 / 0 + `; + + const request = new IterableRequest(sql); + connection.execSql(request); + + let ctr = 0; + let errCtr = 0; + try { + for await (const item of request) { + assert(item.resultSetNo === ctr + 1); + const row = item.row as ColumnValue[]; + const i = row[0].value; + assert(i === ctr + 1); + ctr++; + } + } catch (err) { + assert.instanceOf(err, RequestError); + assert((err as RequestError).message.toLowerCase().includes('divide by zero')); + errCtr++; + } + assert(ctr === 2); + assert(errCtr === 1); + }); + + +});