diff --git a/client/vscode-lib/src/controller.test.ts b/client/vscode-lib/src/controller.test.ts index 74d496fc..0ea82037 100644 --- a/client/vscode-lib/src/controller.test.ts +++ b/client/vscode-lib/src/controller.test.ts @@ -5,12 +5,16 @@ export function createMockController(): MockedObject { return { observeMeta: vi.fn(), meta: vi.fn(), + metaChanges__asyncGenerator: vi.fn(), observeMentions: vi.fn(), mentions: vi.fn(), + mentionsChanges__asyncGenerator: vi.fn(), observeItems: vi.fn(), items: vi.fn(), + itemsChanges__asyncGenerator: vi.fn(), observeAnnotations: vi.fn(), annotations: vi.fn(), + annotationsChanges__asyncGenerator: vi.fn(), } } diff --git a/client/vscode-lib/src/controller.ts b/client/vscode-lib/src/controller.ts index f3a8d36e..18f463c9 100644 --- a/client/vscode-lib/src/controller.ts +++ b/client/vscode-lib/src/controller.ts @@ -25,12 +25,15 @@ type VSCodeClient = Client export interface Controller { observeMeta: VSCodeClient['metaChanges'] meta: VSCodeClient['meta'] + metaChanges__asyncGenerator: VSCodeClient['metaChanges__asyncGenerator'] observeMentions: VSCodeClient['mentionsChanges'] mentions: VSCodeClient['mentions'] + mentionsChanges__asyncGenerator: VSCodeClient['mentionsChanges__asyncGenerator'] observeItems: VSCodeClient['itemsChanges'] items: VSCodeClient['items'] + itemsChanges__asyncGenerator: VSCodeClient['itemsChanges__asyncGenerator'] observeAnnotations( doc: Pick, @@ -39,6 +42,11 @@ export interface Controller { doc: Pick, opts?: ProviderMethodOptions, ): ReturnType + annotationsChanges__asyncGenerator( + doc: Pick, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ): ReturnType } export function createController({ @@ -135,6 +143,10 @@ export function createController({ UserAction.Implicit, client.annotationsChanges, ) + const clientAnnotationsChanges__asyncGenerator = errorReporter.wrapAsyncGenerator( + UserAction.Implicit, + client.annotationsChanges__asyncGenerator, + ) /** * The controller is passed to UI feature providers for them to fetch data. @@ -142,12 +154,24 @@ export function createController({ const controller: Controller = { meta: errorReporter.wrapPromise(UserAction.Explicit, client.meta), observeMeta: errorReporter.wrapObservable(UserAction.Explicit, client.metaChanges), + metaChanges__asyncGenerator: errorReporter.wrapAsyncGenerator( + UserAction.Explicit, + client.metaChanges__asyncGenerator, + ), mentions: errorReporter.wrapPromise(UserAction.Explicit, client.mentions), observeMentions: errorReporter.wrapObservable(UserAction.Explicit, client.mentionsChanges), + mentionsChanges__asyncGenerator: errorReporter.wrapAsyncGenerator( + UserAction.Explicit, + client.mentionsChanges__asyncGenerator, + ), items: errorReporter.wrapPromise(UserAction.Explicit, client.items), observeItems: errorReporter.wrapObservable(UserAction.Explicit, client.itemsChanges), + itemsChanges__asyncGenerator: errorReporter.wrapAsyncGenerator( + UserAction.Explicit, + client.itemsChanges__asyncGenerator, + ), async annotations(doc: vscode.TextDocument, opts?: ProviderMethodOptions) { if (ignoreDoc(doc)) { @@ -174,6 +198,28 @@ export function createController({ opts, ) }, + async *annotationsChanges__asyncGenerator( + doc: vscode.TextDocument, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ) { + if (ignoreDoc(doc)) { + return + } + + const g = clientAnnotationsChanges__asyncGenerator( + { + uri: doc.uri.toString(), + content: doc.getText(), + }, + opts, + signal, + ) + for await (const v of g) { + yield v + } + return + }, } if (features.annotations) { diff --git a/client/vscode-lib/src/util/errorReporter.ts b/client/vscode-lib/src/util/errorReporter.ts index a469f5c6..738ab448 100644 --- a/client/vscode-lib/src/util/errorReporter.ts +++ b/client/vscode-lib/src/util/errorReporter.ts @@ -67,6 +67,46 @@ export class ErrorReporterController implements vscode.Disposable { } } + /** + * wraps providerMethod to ensure it reports errors to the user. + */ + public wrapAsyncGenerator( + userAction: UserAction, + providerMethod: ( + params: T, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ) => AsyncGenerator, + ) { + const getErrorReporter = this.getErrorReporter.bind(this) + return async function* ( + params: T, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ): AsyncGenerator { + const errorReporter = getErrorReporter(userAction, opts) + if (errorReporter.skip) { + return + } + + opts = withErrorHook(opts, (providerUri, error) => { + errorReporter.onError(providerUri, error) + errorReporter.report() + }) + + try { + for await (const value of providerMethod(params, opts, signal)) { + errorReporter.onValue(undefined) + errorReporter.report() + yield value + } + } catch (error) { + errorReporter.onError(undefined, error) + errorReporter.report() + } + } + } + /** * wraps providerMethod to ensure it reports errors to the user. */ diff --git a/lib/client/src/api.ts b/lib/client/src/api.ts index 93a767e8..dd3c8274 100644 --- a/lib/client/src/api.ts +++ b/lib/client/src/api.ts @@ -15,6 +15,7 @@ import { catchError, combineLatest, defer, + distinctUntilChanged, from, map, mergeMap, @@ -99,6 +100,7 @@ function observeProviderCall( : of([]), ), map(result => result.filter((v): v is EachWithProviderUri => v !== null).flat()), + distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)), tap(items => { if (LOG_VERBOSE) { logger?.(`got ${items.length} results: ${JSON.stringify(items)}`) diff --git a/lib/client/src/client/client.test.ts b/lib/client/src/client/client.test.ts index d7b6aa8e..8856c5aa 100644 --- a/lib/client/src/client/client.test.ts +++ b/lib/client/src/client/client.test.ts @@ -1,6 +1,6 @@ -import type { AnnotationsParams, ItemsParams } from '@openctx/protocol' +import type { AnnotationsParams, ItemsParams, MetaResult } from '@openctx/protocol' import type { Item, Range } from '@openctx/schema' -import { firstValueFrom, of } from 'rxjs' +import { Observable, firstValueFrom, of } from 'rxjs' import { TestScheduler } from 'rxjs/testing' import { describe, expect, test } from 'vitest' import type { Annotation, EachWithProviderUri } from '../api.js' @@ -44,6 +44,37 @@ describe('Client', () => { const testScheduler = (): TestScheduler => new TestScheduler((actual, expected) => expect(actual).toStrictEqual(expected)) + describe('meta', () => { + test('metaChanges__asyncGenerator', async () => { + const client = createTestClient({ + configuration: () => + new Observable(observer => { + observer.next({ enable: false, providers: {} }) + observer.next({ + enable: true, + providers: { [testdataFileUri('simpleMeta.js')]: { nameSuffix: '1' } }, + }) + observer.next({ + enable: true, + providers: { [testdataFileUri('simpleMeta.js')]: { nameSuffix: '2' } }, + }) + observer.complete() + }), + }) + + const values: EachWithProviderUri[] = [] + const signal = new AbortController().signal + for await (const value of client.metaChanges__asyncGenerator({}, {}, signal)) { + values.push(value) + } + expect(values).toStrictEqual([ + [], + [{ name: 'simpleMeta-1', providerUri: testdataFileUri('simpleMeta.js') }], + [{ name: 'simpleMeta-2', providerUri: testdataFileUri('simpleMeta.js') }], + ]) + }) + }) + describe('items', () => { test('with providers', async () => { const client = createTestClient({ diff --git a/lib/client/src/client/client.ts b/lib/client/src/client/client.ts index ceb7f587..5b35f598 100644 --- a/lib/client/src/client/client.ts +++ b/lib/client/src/client/client.ts @@ -45,6 +45,7 @@ import { } from '../configuration.js' import type { Logger } from '../logger.js' import { type ProviderClient, createProviderClient } from '../providerClient/createProviderClient.js' +import { observableToAsyncGenerator } from './util.js' /** * Hooks for the OpenCtx {@link Client} to access information about the environment, such as the @@ -176,11 +177,25 @@ export interface Client { ): Observable> /** - * Get the candidate items returned by the configured providers. + * Observe information about the configured providers using an async generator. + * + * The returned generator streams information as it is received from the providers and continues + * passing along any updates until {@link signal} is aborted. + * + * @internal + */ + metaChanges__asyncGenerator( + params: MetaParams, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ): AsyncGenerator> + + /** + * Get the mentions returned by the configured providers. * * It does not continue to listen for changes, as {@link Client.mentionsChanges} does. Using * {@link Client.Mentions} is simpler and does not require use of observables (with a library - * like RxJS), but it means that the client needs to manually poll for updated items if + * like RxJS), but it means that the client needs to manually poll for updated mentions if * freshness is important. */ mentions( @@ -189,9 +204,9 @@ export interface Client { ): Promise> /** - * Observe OpenCtx candidate items from the configured providers. + * Observe OpenCtx mentions from the configured providers. * - * The returned observable streams candidate items as they are received from the providers and + * The returned observable streams mentions as they are received from the providers and * continues passing along any updates until unsubscribed. */ mentionsChanges( @@ -199,6 +214,20 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable> + /** + * Observe OpenCtx mentions from the configured providers using an async generator. + * + * The returned generator streams mentions as they are received from the providers and continues + * passing along any updates until {@link signal} is aborted. + * + * @internal + */ + mentionsChanges__asyncGenerator( + params: MentionsParams, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ): AsyncGenerator> + /** * Get the items returned by the configured providers. * @@ -220,6 +249,21 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable> + /** + * Observe OpenCtx items from the configured providers for the given resource using an async + * generator. + * + * The returned generator streams items as they are received from the providers and continues + * passing along any updates until {@link signal} is aborted. + * + * @internal + */ + itemsChanges__asyncGenerator( + params: ItemsParams, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ): AsyncGenerator> + /** * Get the annotations returned by the configured providers for the given resource. * @@ -244,6 +288,21 @@ export interface Client { opts?: ProviderMethodOptions, ): Observable[]>> + /** + * Observe OpenCtx annotations from the configured providers for the given resource using an + * async generator. + * + * The returned generator streams annotations as they are received from the providers and + * continues passing along any updates until {@link signal} is aborted. + * + * @internal + */ + annotationsChanges__asyncGenerator( + params: AnnotationsParams, + opts?: ProviderMethodOptions, + signal?: AbortSignal, + ): AsyncGenerator[]>> + /** * Dispose of the client and release all resources. */ @@ -406,21 +465,32 @@ export function createClient(env: ClientEnv): Client { defaultValue: [], }), metaChanges: (params, opts) => metaChanges(params, { ...opts, emitPartial: true }), + metaChanges__asyncGenerator: (params, opts, signal) => + observableToAsyncGenerator(metaChanges(params, { ...opts, emitPartial: true }), signal), mentions: (params, opts) => firstValueFrom(mentionsChanges(params, { ...opts, emitPartial: false }), { defaultValue: [], }), mentionsChanges: (params, opts) => mentionsChanges(params, { ...opts, emitPartial: true }), + mentionsChanges__asyncGenerator: (params, opts, signal) => + observableToAsyncGenerator(mentionsChanges(params, { ...opts, emitPartial: true }), signal), items: (params, opts) => firstValueFrom(itemsChanges(params, { ...opts, emitPartial: false }), { defaultValue: [], }), itemsChanges: (params, opts) => itemsChanges(params, { ...opts, emitPartial: true }), + itemsChanges__asyncGenerator: (params, opts, signal) => + observableToAsyncGenerator(itemsChanges(params, { ...opts, emitPartial: true }), signal), annotations: (params, opts) => firstValueFrom(annotationsChanges(params, { ...opts, emitPartial: false }), { defaultValue: [], }), annotationsChanges: (params, opts) => annotationsChanges(params, { ...opts, emitPartial: true }), + annotationsChanges__asyncGenerator: (params, opts, signal) => + observableToAsyncGenerator( + annotationsChanges(params, { ...opts, emitPartial: true }), + signal, + ), dispose() { for (const sub of subscriptions) { sub.unsubscribe() diff --git a/lib/client/src/client/testdata/simple.js b/lib/client/src/client/testdata/simple.js index cd0479b4..dc3bf5cf 100644 --- a/lib/client/src/client/testdata/simple.js +++ b/lib/client/src/client/testdata/simple.js @@ -1,6 +1,6 @@ /** @type {import('@openctx/provider').Provider} */ export default { - meta: () => ({ annotations: {} }), + meta: () => ({ name:'simple', annotations: {} }), items: () => [ { title: 'A', diff --git a/lib/client/src/client/testdata/simpleMeta.js b/lib/client/src/client/testdata/simpleMeta.js new file mode 100644 index 00000000..1d9fb407 --- /dev/null +++ b/lib/client/src/client/testdata/simpleMeta.js @@ -0,0 +1,4 @@ +/** @type {import('@openctx/provider').Provider} */ +export default { + meta: (_, settings) => ({ name: `simpleMeta-${settings.nameSuffix}` }), +} diff --git a/lib/client/src/client/util.test.ts b/lib/client/src/client/util.test.ts new file mode 100644 index 00000000..91a12548 --- /dev/null +++ b/lib/client/src/client/util.test.ts @@ -0,0 +1,71 @@ +import { Observable } from 'rxjs' +import { describe, expect, test } from 'vitest' +import { observableToAsyncGenerator } from './util.js' + +describe('observableToAsyncGenerator', () => { + test('observable that emits and completes', async () => { + const testObservable = new Observable(observer => { + observer.next(1) + observer.next(2) + observer.complete() + }) + + const results: number[] = [] + for await (const value of observableToAsyncGenerator(testObservable)) { + results.push(value) + } + expect(results).toEqual([1, 2]) + }) + + test('observable that immediately completes', async () => { + const testObservable = new Observable(observer => { + observer.complete() + }) + + const results: number[] = [] + for await (const value of observableToAsyncGenerator(testObservable)) { + results.push(value) + } + expect(results).toEqual([]) + }) + + test('observable that emits then errors', async () => { + const testObservable = new Observable(observer => { + observer.next(1) + observer.error('x') + }) + + const results: number[] = [] + let thrown: any + try { + for await (const value of observableToAsyncGenerator(testObservable)) { + results.push(value) + } + } catch (error) { + thrown = error + } + expect(results).toEqual([1]) + expect(thrown).toBe('x') + }) + + test('with an AbortSignal', async () => { + const INTERVAL = 10 + const testObservable = new Observable(observer => { + let i = 1 + const intervalHandle = setInterval(() => { + observer.next(i++) + }, INTERVAL) + return () => clearTimeout(intervalHandle) + }) + + const abortController = new AbortController() + const results: number[] = [] + for await (const value of observableToAsyncGenerator(testObservable, abortController.signal)) { + results.push(value) + if (value === 5) { + abortController.abort() + } + } + expect(results).toEqual([1, 2, 3, 4, 5]) + }) +}) diff --git a/lib/client/src/client/util.ts b/lib/client/src/client/util.ts new file mode 100644 index 00000000..ba1bfed1 --- /dev/null +++ b/lib/client/src/client/util.ts @@ -0,0 +1,63 @@ +import type { Observable } from 'rxjs' + +export async function* observableToAsyncGenerator( + observable: Observable, + signal?: AbortSignal, +): AsyncGenerator { + const queue: T[] = [] + let thrown: unknown + let resolve: (() => void) | undefined + let reject: ((error: unknown) => void) | undefined + let finished = false + + const subscription = observable.subscribe({ + next: value => { + queue.push(value) + resolve?.() + resolve = undefined + }, + error: error => { + thrown = error + reject?.(thrown) + reject = undefined + }, + complete: () => { + finished = true + resolve?.() + resolve = undefined + }, + }) + + let removeAbortListener: (() => void) | undefined = undefined + if (signal) { + const handler = () => { + resolve?.() + resolve = undefined + finished = true + } + signal.addEventListener('abort', handler) + removeAbortListener = () => { + signal.removeEventListener('abort', handler) + } + } + + try { + while (true) { + if (queue.length > 0) { + yield queue.shift()! + } else if (thrown) { + throw thrown + } else if (finished) { + break + } else { + await new Promise((res, rej) => { + resolve = res + reject = rej + }) + } + } + } finally { + subscription.unsubscribe() + removeAbortListener?.() + } +}