Skip to content

Commit

Permalink
Add tests for RxJS observable usage
Browse files Browse the repository at this point in the history
  • Loading branch information
imhoffd committed May 13, 2023
1 parent 0252dbb commit 2399380
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 36 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"rimraf": "^3.0.2",
"rollup": "^2.70.1",
"rollup-plugin-terser": "^7.0.2",
"rxjs": "^7.8.1",
"ts-jest": "^27.1.4",
"tslib": "^2.4.0",
"typescript": "^4.6.3"
Expand Down
8 changes: 8 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/ProxyClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ProxyClient<T extends RemoteInterface> extends StringClient<T> {
readonly _proxyCache: Map<string, any> = new Map()
}

type Endpoint = ((...args: any[]) => any) | AsyncGenerator<any>
type Endpoint = ((...args: any[]) => any) | AsyncIterator<any>

const createProxy = (client: ProxyClient<any>, basePath?: string) => {
const target = basePath ? client.get(basePath) : client
Expand Down
5 changes: 2 additions & 3 deletions src/StringClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import isNonNull from './lib/isNonNull'
import platform from './lib/platform'

export type RemoteFunction<I, O> = (arg: I) => Promise<O>
export type RemoteObservable<T> = AsyncGenerator<T>
export type RemoteObservable<T> = AsyncIterable<T>
export type RemoteInterface = Record<
string,
RemoteFunction<any, any> | RemoteObservable<any>
Expand Down Expand Up @@ -84,11 +84,10 @@ export default class StringClient<T extends RemoteInterface> implements Client {
if (name.endsWith('$')) {
const observables = this._observableCache.get(name) ?? []
const ob = new AsyncIterableSubject()
const iterator = ob[Symbol.asyncIterator]()

this._observableCache.set(name, [ob, ...observables])

return iterator as T[N]
return ob as unknown as T[N]
} else {
let fn = this._functionCache.get(name)

Expand Down
106 changes: 102 additions & 4 deletions src/__tests__/ProxyClient.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { from } from 'rxjs'

import LocalConnector from '../LocalConnector'
import ProxyClient from '../ProxyClient'
import type RemoteObservableEvent from '../RemoteObservableEvent'
Expand All @@ -7,9 +9,9 @@ import wait from '../lib/wait'
type ConnectorInterface = {
'fn1': () => Promise<void>
'fn2': (data: string) => Promise<string>
'evt1$': () => AsyncGenerator
'evt1$': () => AsyncGenerator<number>
'nested.fn': (data: string) => Promise<string>
'nested.evt$': () => AsyncGenerator
'nested.evt$': () => AsyncGenerator<number>
}

const createClient = (interfaceOverrides: Partial<ConnectorInterface> = {}) => {
Expand All @@ -33,8 +35,12 @@ const createClient = (interfaceOverrides: Partial<ConnectorInterface> = {}) => {
})

return new ProxyClient<{
[K in keyof ConnectorInterface]: ConnectorInterface[K] extends () => AsyncGenerator
? AsyncGenerator
[K in keyof ConnectorInterface]: ConnectorInterface[K] extends () => AsyncGenerator<
infer T,
infer TReturn,
infer TNext
>
? AsyncGenerator<T, TReturn, TNext>
: ConnectorInterface[K]
}>({ connector, events })
}
Expand Down Expand Up @@ -249,3 +255,95 @@ it('should handle multiple iterations of and multiple values from an observable'
subscribe()
})
})

it('should handle multiple values from an rxjs observable', done => {
const client = createClient({
evt1$: async function* () {
await wait(1)
yield 1
await wait(1)
yield 2
await wait(1)
yield 3
},
})

client.connect().then(() => {
const observable$ = from(client.evt1$)

let iterations = 0

observable$.subscribe(value => {
expect(value).toEqual(++iterations)

if (iterations === 3) {
done()
}
})
})
})

it('should handle multiple iterations of and multiple values from an rxjs observable', done => {
let assertions = 0

const client = createClient({
evt1$: async function* () {
await wait(1)
yield 1
await wait(1)
yield 2
await wait(1)
yield 3
},
})

client.connect().then(() => {
const iterate = () => {
const observable$ = from(client.evt1$)

let iterations = 0

observable$.subscribe(value => {
expect(value).toEqual(++iterations)

if (++assertions === 9) {
done()
}
})
}

iterate()
iterate()
iterate()
})
})

it('should allow unsubscribing from an rxjs observable', done => {
expect.assertions(2)

const client = createClient({
evt1$: async function* () {
await wait(1)
yield 1
await wait(1)
yield 2
await wait(1)
yield 3
},
})

client.connect().then(() => {
const observable$ = from(client.evt1$)

let iterations = 0

const subscription = observable$.subscribe(value => {
expect(value).toEqual(++iterations)

if (iterations === 2) {
subscription.unsubscribe()
done()
}
})
})
})
58 changes: 39 additions & 19 deletions src/lib/AsyncIterableSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,20 @@ import assert from './assert'

export default class AsyncIterableSubject<T> implements AsyncIterable<T> {
private _deferred: Deferred<T> = new Deferred()
private _iterating = false

private _done = false

/**
* Feed a value to the subject.
*
* This method must not be called in sequence synchronously. The previous
* value must be consumed by the iterator before the next value can be
* accepted by the subject.
*/
next(value: T): void {
if (this._done) {
return
}

assert(
this._deferred.state === 'pending',
'Cannot feed value--previous value(s) are unconsumed.',
Expand All @@ -14,28 +25,37 @@ export default class AsyncIterableSubject<T> implements AsyncIterable<T> {
this._deferred.resolve(value)
}

/**
* Mark the subject as done, after which new values are ignored.
*/
done(): void {
assert(
this._deferred.state === 'pending',
'Cannot finish--previous value(s) are unconsumed.',
)

this._deferred.reject()
this._done = true
this._deferred.resolve(undefined as T)
}

async *[Symbol.asyncIterator](): AsyncIterator<T> {
assert(!this._iterating, 'Cannot iterate more than once.')

this._iterating = true

while (true) {
try {
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
next: async () => {
const result = await this._deferred.promise
this._deferred = new Deferred()
yield result
} catch {
return
}
return {
value: result,
done: this._done,
}
},
return: async value => {
this._done = true
return {
value,
done: this._done,
}
},
throw: async e => {
return {
value: e,
done: this._done,
}
},
}
}
}
10 changes: 1 addition & 9 deletions src/lib/__tests__/AsyncIterableSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ it('should throw error when attempting to send new value before consumption', as
)
})

it('should throw error when attempting to cancel before consumption', async () => {
const subject = new AsyncIterableSubject<number>()
subject.next(1)
expect(() => subject.done()).toThrowError(
'Cannot finish--previous value(s) are unconsumed',
)
})

it('should throw error when iterating more than once', async () => {
const subject = new AsyncIterableSubject<number>()
const subscribe = async () => {
Expand Down Expand Up @@ -84,7 +76,7 @@ it('should iterate and finish when done() is called', async () => {
subscribe()

for (const n of [1, 2, 3]) {
await wait(5)
await wait(1)
subject.next(n)
}

Expand Down

0 comments on commit 2399380

Please sign in to comment.