Skip to content

Commit

Permalink
Retry remote watch on end of stream, if not user-cancelled.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnspurlock-skymethod committed Dec 5, 2023
1 parent 691fe6a commit eb07664
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions src/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import { encodeJson as encodeJsonWatch } from './proto/messages/com/deno/kv/data
import { decodeBinary as decodeWatchOutput } from './proto/messages/com/deno/kv/datapath/WatchOutput.ts';
import { AtomicWrite, AtomicWriteOutput, SnapshotRead, SnapshotReadOutput, Watch } from './proto/messages/com/deno/kv/datapath/index.ts';
import { ProtoBasedKv, WatchCache } from './proto_based.ts';
import { sleep } from './sleep.ts';
import { makeUnrawWatchStream } from './unraw_watch_stream.ts';
import { decodeV8 as _decodeV8, encodeV8 as _encodeV8 } from './v8.ts';
import { _exponentialBackoffWithJitter } from 'https://deno.land/[email protected]/async/_util.ts';

type Fetcher = typeof fetch;

Expand Down Expand Up @@ -157,12 +159,16 @@ class RemoteKv extends ProtoBasedKv {
}

protected watch_(keys: readonly KvKey[], raw: boolean | undefined): ReadableStream<KvEntryMaybe<unknown>[]> {
const { watches } = this;
const { watches, debug } = this;
const watchId = [...watches.keys()].reduce((a, b) => Math.max(a, b), 0) + 1;
let readDisabled = false;
let endOfStreamReached = false;
let readerCancelled = false;
let attempt = 1;
let readStarted = -1;
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
async function* yieldResults(kv: RemoteKv) {
const { metadata, debug, fetcher, maxRetries, decodeV8 } = kv;
const { metadata, fetcher, maxRetries, decodeV8 } = kv;
if (metadata.version < 3) throw new Error(`watch: Only supported in version 3 of the protocol or higher`);

const endpointUrl = await kv.locateEndpointUrl('eventual', readDisabled); // force refetch if retrying after receiving read disabled
Expand All @@ -175,12 +181,16 @@ class RemoteKv extends ProtoBasedKv {
const stream = await fetchWatchStream(watchUrl, accessToken, metadata.databaseId, req, fetcher, maxRetries, metadata.version);
reader = stream.getReader(); // can't use byob for node compat (fetch() response body streams are ReadableStream { locked: false, state: 'readable', supportsBYOB: false }), see https://github.com/nodejs/undici/issues/1873
const byteReader = new ByteReader(reader); // use our own buffered reader
endOfStreamReached = false;
readerCancelled = false;
readStarted = Date.now();
try {
const cache = new WatchCache(decodeV8, keys);
while (true) {
const { done, value } = await byteReader.read(4);
if (done) {
if (debug) console.log(`watch: done! returning`);
endOfStreamReached = true;
return;
}
const n = new DataView(value.buffer).getInt32(0, true);
Expand All @@ -189,6 +199,7 @@ class RemoteKv extends ProtoBasedKv {
const { done, value } = await byteReader.read(n);
if (done) {
if (debug) console.log(`watch: done before message! returning`);
endOfStreamReached = true;
return;
}
const output = decodeWatchOutput(value);
Expand All @@ -214,19 +225,35 @@ class RemoteKv extends ProtoBasedKv {
}

async function* yieldResultsLoop(kv: RemoteKv) {
for await (const entries of yieldResults(kv)) {
yield entries;
}
if (readDisabled) {
// retry and refetch metadata
while (true) {
for await (const entries of yieldResults(kv)) {
yield entries;
}
if (readDisabled) {
if (debug) console.log(`watch: readDisabled, retry and refresh metadata`);
} else if (endOfStreamReached && !readerCancelled) {
const readDuration = readStarted > -1 ? (Date.now() - readStarted) : 0;
if (readDuration > 60000) attempt = 1; // we read for at least a minute, reset attempt counter to avoid missing updates
const timeout = Math.round(_exponentialBackoffWithJitter(
60000, // max timeout
1000, // min timeout
attempt,
2, // multiplier
1, // full jitter
));
if (debug) console.log(`watch: endOfStreamReached, retry after ${timeout}ms, attempt=${attempt}`);
await sleep(timeout);
attempt++;
} else {
if (debug) console.log(`watch: end of retry loop`);
return;
}
}
}
// return ReadableStream.from(yieldResultsLoop(this)); // not supported by dnt/node
const generator = yieldResultsLoop(this);
const cancelReaderIfNecessary = async () => {
readerCancelled = true;
await reader?.cancel();
reader = undefined;
}
Expand Down

0 comments on commit eb07664

Please sign in to comment.