Skip to content

Commit

Permalink
feat: refactor to use stream iterator functions like map
Browse files Browse the repository at this point in the history
CommonDao.stream* functions are now iteration-friendly,
can be used with `.toArray()` and such
  • Loading branch information
kirillgroshkov committed Apr 6, 2024
1 parent be4dbff commit 0910865
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 53 deletions.
10 changes: 9 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ Returns `ReadableTyped` (typed wrapper of Node.js
Streams in Node.js support back-pressure by default (if piped properly by the consumer).
```typescript
```ts
const q = DBQuery.create('table1') // "return all items" query

await _pipeline([
Expand All @@ -165,6 +165,14 @@ await _pipeline([
// ...
```
Alternative:
```ts
await db.streamQuery(q).forEach(item => {
console.log(item)
})
```
###### saveBatch
`saveBatch<DBM>(table: string, dbms: DBM[]): Promise<void>`
Expand Down
15 changes: 9 additions & 6 deletions src/commondao/common.dao.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import {
import {
AjvSchema,
AjvValidationError,
writableForEach,
_pipeline,
deflateString,
inflateToString,
} from '@naturalcycles/nodejs-lib'
Expand All @@ -29,7 +27,12 @@ import {
createTestItemBM,
} from '../testing'
import { CommonDao } from './common.dao'
import { CommonDaoCfg, CommonDaoLogLevel, CommonDaoSaveBatchOptions } from './common.dao.model'
import {
CommonDaoCfg,
CommonDaoLogLevel,
CommonDaoOptions,
CommonDaoSaveBatchOptions,
} from './common.dao.model'

let throwError = false

Expand Down Expand Up @@ -102,7 +105,7 @@ test('should propagate pipe errors', async () => {

throwError = true

const opt = {
const opt: CommonDaoOptions = {
// logEvery: 1,
}

Expand Down Expand Up @@ -145,8 +148,8 @@ test('should propagate pipe errors', async () => {
expect(results).toEqual(items.filter(i => i.id !== 'id3'))

// .stream should suppress by default
results = []
await _pipeline([dao.query().streamQuery(opt), writableForEach(r => void results.push(r))])
results = await dao.query().streamQuery(opt).toArray()

expect(results).toEqual(items.filter(i => i.id !== 'id3'))
})

Expand Down
37 changes: 29 additions & 8 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
transformLogProgress,
transformMap,
transformMapSimple,
transformNoOp,
writableVoid,
} from '@naturalcycles/nodejs-lib'
import { DBLibError } from '../cnst'
Expand Down Expand Up @@ -496,6 +497,18 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
const partialQuery = !!q._selectedFieldNames
if (partialQuery) return stream

// This almost works, but hard to implement `errorMode: THROW_AGGREGATED` in this case
// return stream.flatMap(async (dbm: DBM) => {
// if (this.cfg.hooks!.afterLoad) {
// dbm = (await this.cfg.hooks!.afterLoad(dbm))!
// if (dbm === null) return [] // SKIP
// }
//
// return [await this.dbmToBM(dbm, opt)] satisfies BM[]
// }, {
// concurrency: 16,
// })

return (
stream
// optimization: 1 validation is enough
Expand All @@ -517,9 +530,11 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
},
),
)
// this can make the stream async-iteration-friendly
// but not applying it now for perf reasons
// .pipe(transformNoOp())
// this can make the stream async-iteration-friendly
// but not applying it now for perf reasons
// UPD: applying, to be compliant with `.toArray()`, etc.
.on('error', err => stream.emit('error', err))
.pipe(transformNoOp())
)
}

Expand All @@ -533,14 +548,20 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM> {
q.table = opt.table || q.table
opt.errorMode ||= ErrorMode.SUPPRESS

// Experimental: using `.map()`
const stream: ReadableTyped<string> = this.cfg.db
.streamQuery<DBM>(q.select(['id']), opt)
.on('error', err => stream.emit('error', err))
.pipe(
transformMapSimple<DBM, string>(r => r.id, {
errorMode: ErrorMode.SUPPRESS, // cause .pipe() cannot propagate errors
}),
)
.map((r: ObjectWithId) => r.id)

// const stream: ReadableTyped<string> = this.cfg.db
// .streamQuery<DBM>(q.select(['id']), opt)
// .on('error', err => stream.emit('error', err))
// .pipe(
// transformMapSimple<DBM, string>(r => r.id, {
// errorMode: ErrorMode.SUPPRESS, // cause .pipe() cannot propagate errors
// }),
// )

return stream
}
Expand Down
77 changes: 48 additions & 29 deletions src/kv/commonKeyValueDao.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import { AppError, ErrorMode, KeyValueTuple, pMap } from '@naturalcycles/js-lib'
import {
deflateString,
inflateToString,
ReadableTyped,
transformMap,
} from '@naturalcycles/nodejs-lib'
import { AppError, CommonLogger, KeyValueTuple, pMap } from '@naturalcycles/js-lib'
import { deflateString, inflateToString, ReadableTyped } from '@naturalcycles/nodejs-lib'
import { CommonDaoLogLevel } from '../commondao/common.dao.model'
import { CommonDBCreateOptions } from '../db.model'
import { CommonKeyValueDB, KeyValueDBTuple } from './commonKeyValueDB'
Expand All @@ -20,6 +15,11 @@ export interface CommonKeyValueDaoCfg<T> {
*/
readOnly?: boolean

/**
* Default to console
*/
logger?: CommonLogger

/**
* @default OPERATIONS
*/
Expand Down Expand Up @@ -48,16 +48,27 @@ export interface CommonKeyValueDaoCfg<T> {
// todo: readonly

export class CommonKeyValueDao<T> {
constructor(public cfg: CommonKeyValueDaoCfg<T>) {
constructor(cfg: CommonKeyValueDaoCfg<T>) {
this.cfg = {
hooks: {},
logger: console,
...cfg,
}

if (cfg.deflatedJsonValue) {
cfg.hooks = {
this.cfg.hooks = {
mapValueToBuffer: async v => await deflateString(JSON.stringify(v)),
mapBufferToValue: async buf => JSON.parse(await inflateToString(buf)),
...cfg.hooks,
}
}
}

cfg: CommonKeyValueDaoCfg<T> & {
hooks: NonNullable<CommonKeyValueDaoCfg<T>['hooks']>
logger: CommonLogger
}

async ping(): Promise<void> {
await this.cfg.db.ping()
}
Expand All @@ -68,7 +79,7 @@ export class CommonKeyValueDao<T> {

create(input: Partial<T> = {}): T {
return {
...this.cfg.hooks?.beforeCreate?.(input),
...this.cfg.hooks.beforeCreate?.(input),
} as T
}

Expand Down Expand Up @@ -117,7 +128,7 @@ export class CommonKeyValueDao<T> {
if (r) return r[1]

return {
...this.cfg.hooks?.beforeCreate?.({}),
...this.cfg.hooks.beforeCreate?.({}),
...part,
} as T
}
Expand All @@ -135,11 +146,11 @@ export class CommonKeyValueDao<T> {

async getByIds(ids: string[]): Promise<KeyValueTuple<string, T>[]> {
const entries = await this.cfg.db.getByIds(this.cfg.table, ids)
if (!this.cfg.hooks?.mapBufferToValue) return entries as any
if (!this.cfg.hooks.mapBufferToValue) return entries as any

return await pMap(entries, async ([id, buf]) => [
id,
await this.cfg.hooks!.mapBufferToValue!(buf),
await this.cfg.hooks.mapBufferToValue!(buf),
])
}

Expand All @@ -158,12 +169,12 @@ export class CommonKeyValueDao<T> {
async saveBatch(entries: KeyValueTuple<string, T>[]): Promise<void> {
let bufferEntries: KeyValueDBTuple[]

if (!this.cfg.hooks?.mapValueToBuffer) {
if (!this.cfg.hooks.mapValueToBuffer) {
bufferEntries = entries as any
} else {
bufferEntries = await pMap(entries, async ([id, v]) => [
id,
await this.cfg.hooks!.mapValueToBuffer!(v),
await this.cfg.hooks.mapValueToBuffer!(v),
])
}

Expand All @@ -187,37 +198,45 @@ export class CommonKeyValueDao<T> {
}

streamValues(limit?: number): ReadableTyped<T> {
if (!this.cfg.hooks?.mapBufferToValue) {
const { mapBufferToValue } = this.cfg.hooks

if (!mapBufferToValue) {
return this.cfg.db.streamValues(this.cfg.table, limit)
}

// todo: consider it when readableMap supports `errorMode: SUPPRESS`
// readableMap(this.cfg.db.streamValues(this.cfg.table, limit), async buf => await this.cfg.hooks!.mapBufferToValue(buf))
const stream: ReadableTyped<T> = this.cfg.db
.streamValues(this.cfg.table, limit)
.on('error', err => stream.emit('error', err))
.pipe(
transformMap(async buf => await this.cfg.hooks!.mapBufferToValue!(buf), {
errorMode: ErrorMode.SUPPRESS, // cause .pipe cannot propagate errors
}),
)
.flatMap(async (buf: Buffer) => {
try {
return [await mapBufferToValue(buf)] satisfies T[]
} catch (err) {
this.cfg.logger.error(err)
return [] // SKIP
}
})

return stream
}

streamEntries(limit?: number): ReadableTyped<KeyValueTuple<string, T>> {
if (!this.cfg.hooks?.mapBufferToValue) {
const { mapBufferToValue } = this.cfg.hooks

if (!mapBufferToValue) {
return this.cfg.db.streamEntries(this.cfg.table, limit)
}

const stream: ReadableTyped<KeyValueTuple<string, T>> = this.cfg.db
.streamEntries(this.cfg.table, limit)
.on('error', err => stream.emit('error', err))
.pipe(
transformMap(async ([id, buf]) => [id, await this.cfg.hooks!.mapBufferToValue!(buf)], {
errorMode: ErrorMode.SUPPRESS, // cause .pipe cannot propagate errors
}),
)
.flatMap(async ([id, buf]: KeyValueTuple<string, Buffer>) => {
try {
return [[id, await mapBufferToValue(buf)]] satisfies KeyValueTuple<string, T>[]
} catch (err) {
this.cfg.logger.error(err)
return [] // SKIP
}
})

return stream
}
Expand Down
8 changes: 2 additions & 6 deletions src/testing/daoTest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Readable } from 'node:stream'
import { _deepCopy, _pick, _sortBy, _omit, localTimeNow } from '@naturalcycles/js-lib'
import { _pipeline, readableToArray, transformNoOp } from '@naturalcycles/nodejs-lib'
import { _pipeline, readableToArray } from '@naturalcycles/nodejs-lib'
import { CommonDaoLogLevel, DBQuery } from '..'
import { CommonDB } from '../common.db'
import { CommonDao } from '../commondao/common.dao'
Expand Down Expand Up @@ -194,11 +194,7 @@ export function runCommonDaoTest(db: CommonDB, quirks: CommonDBImplementationQui
})

test('streamQuery all', async () => {
// let rows = await readableToArray(dao.query().streamQuery())
// todo: remove transformNoOp after `transformMap` learns to be async-iteration-friendly
let rows: TestItemBM[] = await readableToArray(
dao.query().streamQuery().pipe(transformNoOp()),
)
let rows: TestItemBM[] = await dao.query().streamQuery().toArray()

rows = _sortBy(rows, r => r.id)
expectMatch(expectedItems, rows, quirks)
Expand Down
6 changes: 3 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -879,9 +879,9 @@
zod "^3.20.2"

"@naturalcycles/nodejs-lib@^13.0.1", "@naturalcycles/nodejs-lib@^13.0.2", "@naturalcycles/nodejs-lib@^13.1.1":
version "13.9.1"
resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.9.1.tgz#98aa9ee69cdbe12c20793f2d7b3375556e7d8269"
integrity sha512-IpUGFlepG1KsOi5srYM3zHvlQt4ZXyNeoGd22ggbM2Gi8prJOxbFC76cNwe2Qj9PS+5KHSayGbAhxnpRTREyVg==
version "13.10.0"
resolved "https://registry.yarnpkg.com/@naturalcycles/nodejs-lib/-/nodejs-lib-13.10.0.tgz#0914dea4fced163a9642f09def35f242b9daf565"
integrity sha512-KKxpAb6oK250Lk7t2nGTrc+T3YD6R0Ba0c19wbuJNXDZToZ0FRooQc1vgcrjksMpHDebWBPYz0g9IIIbA+gQzA==
dependencies:
"@naturalcycles/js-lib" "^14.0.0"
"@types/js-yaml" "^4.0.9"
Expand Down

0 comments on commit 0910865

Please sign in to comment.