Skip to content

Commit

Permalink
Add done() to AsyncIterableSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
imhoffd committed Jul 15, 2022
1 parent 07c39ae commit 8b17c74
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 16 deletions.
14 changes: 0 additions & 14 deletions src/AsyncIterableSubject.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/Connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
// You should have received a copy of the GNU General Public License version 3
// along with this program. If not, see <https://www.gnu.org/licenses/>.

import type AsyncIterableSubject from './AsyncIterableSubject'
import { ErrorMessages } from './Client'
import type Connection from './Connection'
import type Data from './Data'
import { isDataEmpty } from './Data'
import type RemoteFunctionRequest from './RemoteFunctionRequest'
import type RemoteFunctionResponse from './RemoteFunctionResponse'
import type RemoteObservableEvent from './RemoteObservableEvent'
import type AsyncIterableSubject from './lib/AsyncIterableSubject'
import Deferred from './lib/Deferred'
import generateId from './lib/generateId'
import { log } from './lib/log'
Expand Down
2 changes: 1 addition & 1 deletion src/StringClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// You should have received a copy of the GNU General Public License version 3
// along with this program. If not, see <https://www.gnu.org/licenses/>.

import AsyncIterableSubject from './AsyncIterableSubject'
import type Client from './Client'
import { ErrorMessages } from './Client'
import ClientError from './ClientError'
import type Connector from './Connector'
import type Data from './Data'
import type RemoteObservableEvent from './RemoteObservableEvent'
import AsyncIterableSubject from './lib/AsyncIterableSubject'
import Deferred from './lib/Deferred'
import assert from './lib/assert'
import isNonNull from './lib/isNonNull'
Expand Down
55 changes: 55 additions & 0 deletions src/lib/AsyncIterableSubject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Peregrine for Web: native container for hybrid apps
// Copyright (C) 2022 Caracal LLC
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License version 3 as published
// by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License version 3
// along with this program. If not, see <https://www.gnu.org/licenses/>.

import Deferred from './Deferred'
import assert from './assert'

export default class AsyncIterableSubject<T> implements AsyncIterable<T> {
private _deferred: Deferred<T> = new Deferred()
private _iterating = false

next(value: T): void {
assert(
this._deferred.state === 'pending',
'Cannot feed value--previous value(s) are unconsumed.',
)

this._deferred.resolve(value)
}

done(): void {
assert(
this._deferred.state === 'pending',
'Cannot finish--previous value(s) are unconsumed.',
)

this._deferred.reject()
}

async *[Symbol.asyncIterator](): AsyncIterator<T> {
assert(!this._iterating, 'Cannot iterate more than once.')

this._iterating = true

while (true) {
try {
yield await this._deferred.promise
this._deferred = new Deferred()
} catch {
return
}
}
}
}
108 changes: 108 additions & 0 deletions src/lib/__tests__/AsyncIterableSubject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Peregrine for Web: native container for hybrid apps
// Copyright (C) 2022 Caracal LLC
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License version 3 as published
// by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License version 3
// along with this program. If not, see <https://www.gnu.org/licenses/>.

import wait from '../../lib/wait'
import AsyncIterableSubject from '../AsyncIterableSubject'

it('should emit a single value and continue looping', async () => {
expect.assertions(1)

const subject = new AsyncIterableSubject<number>()
const subscribe = async () => {
for await (const value of subject) {
expect(value).toBe(1)
}
expect(true).toBe(false) // won't reach because it should continue looping
}

subscribe()
subject.next(1)
})

it('should throw error when attempting to send new value before consumption', async () => {
const subject = new AsyncIterableSubject<number>()
subject.next(1)
expect(() => subject.next(2)).toThrowError(
'Cannot feed value--previous value(s) are unconsumed',
)
})

it('should throw error when attempting to cancel before consumption', async () => {
const subject = new AsyncIterableSubject<number>()
subject.next(1)
expect(() => subject.done()).toThrowError(
'Cannot finish--previous value(s) are unconsumed',
)
})

it('should throw error when iterating more than once', async () => {
const subject = new AsyncIterableSubject<number>()
const subscribe = async () => {
for await (const _ of subject) {
// ignore
}
}

subscribe()

expect(() => subscribe()).rejects.toThrowError(
'Cannot iterate more than once',
)
})

it('should iterate and emit correct values', async () => {
expect.assertions(3)

const subject = new AsyncIterableSubject<number>()
const subscribe = async () => {
let counter = 1
for await (const value of subject) {
expect(value).toBe(counter)
counter++
}
expect(true).toBe(false) // won't reach because it should continue looping
}

subscribe()

for (const n of [1, 2, 3]) {
await wait(5)
subject.next(n)
}
})

it('should iterate and finish when done() is called', async () => {
expect.assertions(4)

const subject = new AsyncIterableSubject<number>()
const subscribe = async () => {
let counter = 1
for await (const value of subject) {
expect(value).toBe(counter)
counter++
}
expect(true).toBe(true)
}

subscribe()

for (const n of [1, 2, 3]) {
await wait(5)
subject.next(n)
}

await wait(5)
subject.done()
})
18 changes: 18 additions & 0 deletions src/lib/wait.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Peregrine for Web: native container for hybrid apps
// Copyright (C) 2022 Caracal LLC
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License version 3 as published
// by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License version 3
// along with this program. If not, see <https://www.gnu.org/licenses/>.

export default async function wait(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
3 changes: 3 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@
},
"include": [
"src/**/*.ts"
],
"exclude": [
"src/**/__tests__/**/*.ts"
]
}

0 comments on commit 8b17c74

Please sign in to comment.