From 1f25621ec008d083d7088ae0a14778916facbea0 Mon Sep 17 00:00:00 2001 From: Teo Stocco Date: Thu, 22 Sep 2022 09:30:30 +0200 Subject: [PATCH 1/2] fix xread timeout --- stream.ts | 63 ++++++++------------- tests/commands/stream.ts | 115 +++++++++++++++++++-------------------- 2 files changed, 80 insertions(+), 98 deletions(-) diff --git a/stream.ts b/stream.ts index ddf9ee62..7617797f 100644 --- a/stream.ts +++ b/stream.ts @@ -43,10 +43,7 @@ export type XReadReplyRaw = XReadStreamRaw[]; * which will represent the the epoch Millis with * seqNo of zero. (Especially useful is to pass 0.) */ -export type XIdInput = - | XId - | [number, number] - | number; +export type XIdInput = XId | [number, number] | number; /** * ID input type for XADD, which is allowed to include the * "*" operator. */ @@ -212,9 +209,7 @@ export interface XClaimOpts { justXId?: boolean; } -export function parseXMessage( - raw: XReadIdData, -): XMessage { +export function parseXMessage(raw: XReadIdData): XMessage { const fieldValues: Record = {}; let f: string | undefined = undefined; @@ -248,11 +243,9 @@ export function convertMap(raw: ConditionalArray): Map { return fieldValues; } -export function parseXReadReply( - raw: XReadReplyRaw, -): XReadReply { +export function parseXReadReply(raw: XReadReplyRaw): XReadReply { const out: XReadStream[] = []; - for (const [key, idData] of raw) { + for (const [key, idData] of raw ?? []) { const messages = []; for (const rawMsg of idData) { messages.push(parseXMessage(rawMsg)); @@ -286,18 +279,18 @@ export function parseXPendingCounts(raw: ConditionalArray): XPendingCount[] { const infos: XPendingCount[] = []; for (const r of raw) { if ( - isCondArray(r) && isString(r[0]) && - isString(r[1]) && isNumber(r[2]) && + isCondArray(r) && + isString(r[0]) && + isString(r[1]) && + isNumber(r[2]) && isNumber(r[3]) ) { - infos.push( - { - xid: parseXId(r[0]), - owner: r[1], - lastDeliveredMs: r[2], - timesDelivered: r[3], - }, - ); + infos.push({ + xid: parseXId(r[0]), + owner: r[1], + lastDeliveredMs: r[2], + timesDelivered: r[3], + }); } } @@ -314,30 +307,22 @@ export function parseXGroupDetail(rawGroups: ConditionalArray): XGroupDetail[] { // array of arrays const consDeets = data.get("consumers") as ConditionalArray[]; - out.push( - { - name: rawstr(data.get("name") ?? null), - lastDeliveredId: parseXId( - rawstr(data.get("last-delivered-id") ?? null), - ), - pelCount: rawnum(data.get("pel-count") ?? null), - pending: parseXPendingCounts( - data.get("pending") as ConditionalArray, - ), - consumers: parseXConsumerDetail( - consDeets, - ), - }, - ); + out.push({ + name: rawstr(data.get("name") ?? null), + lastDeliveredId: parseXId( + rawstr(data.get("last-delivered-id") ?? null), + ), + pelCount: rawnum(data.get("pel-count") ?? null), + pending: parseXPendingCounts(data.get("pending") as ConditionalArray), + consumers: parseXConsumerDetail(consDeets), + }); } } return out; } -export function parseXConsumerDetail( - nestedRaws: Raw[][], -): XConsumerDetail[] { +export function parseXConsumerDetail(nestedRaws: Raw[][]): XConsumerDetail[] { const out: XConsumerDetail[] = []; for (const raws of nestedRaws) { diff --git a/tests/commands/stream.ts b/tests/commands/stream.ts index bb390dff..b67ac8f4 100644 --- a/tests/commands/stream.ts +++ b/tests/commands/stream.ts @@ -1,4 +1,4 @@ -import { ErrorReplyError, Redis } from "../../mod.ts"; +import { ErrorReplyError, Redis, XReadReply } from "../../mod.ts"; import { parseXId } from "../../stream.ts"; import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; import { @@ -15,9 +15,7 @@ import { import { newClient } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; -export function streamTests( - getServer: () => TestServer, -): void { +export function streamTests(getServer: () => TestServer): void { let client!: Redis; beforeAll(async () => { const server = getServer(); @@ -28,7 +26,7 @@ export function streamTests( const rnum = () => Math.floor(Math.random() * 1000); const randomStream = () => - `test-deno-${(new Date().getTime())}-${rnum()}${rnum()}${rnum()}`; + `test-deno-${new Date().getTime()}-${rnum()}${rnum()}${rnum()}`; const cleanupStream = async (client: Redis, ...keys: string[]) => { await Promise.all(keys.map((key) => client.xtrim(key, { elements: 0 }))); @@ -184,6 +182,14 @@ export function streamTests( await cleanupStream(client, key, key2); }); + it("xread manage empty stream on timeout", async () => { + const key = randomStream(); + const [stream] = await client.xread([{ key, xid: 0 }], { + block: 1, + }); + assertEquals(stream, undefined); + }); + it("xgrouphelp", async () => { const helpText = await client.xgroupHelp(); assert(helpText.length > 4); @@ -413,50 +419,47 @@ export function streamTests( }); }); - it( - "broadcast pattern, all groups read their own version of the stream", - async () => { - const key = randomStream(); - const group0 = "tg0"; - const group1 = "tg1"; - const group2 = "tg2"; - const groups = [group0, group1, group2]; - - for (const g of groups) { - const created = await client.xgroupCreate(key, g, "$", true); - assertEquals(created, "OK"); - } + it("broadcast pattern, all groups read their own version of the stream", async () => { + const key = randomStream(); + const group0 = "tg0"; + const group1 = "tg1"; + const group2 = "tg2"; + const groups = [group0, group1, group2]; - const addedIds = []; + for (const g of groups) { + const created = await client.xgroupCreate(key, g, "$", true); + assertEquals(created, "OK"); + } - let msgCount = 0; - for (const group of groups) { - const payload = `data-${msgCount}`; - const a = await client.xadd(key, "*", { target: payload }); - assert(a); - addedIds.push(a); - msgCount++; + const addedIds = []; - const consumer = "someconsumer"; - const xid = ">"; - const data = await client.xreadgroup([{ key, xid }], { - group, - consumer, - }); + let msgCount = 0; + for (const group of groups) { + const payload = `data-${msgCount}`; + const a = await client.xadd(key, "*", { target: payload }); + assert(a); + addedIds.push(a); + msgCount++; - // each group should see ALL the messages - // that have been emitted - const toCheck = data[0].messages; - assertEquals(toCheck.length, msgCount); - } + const consumer = "someconsumer"; + const xid = ">"; + const data = await client.xreadgroup([{ key, xid }], { + group, + consumer, + }); - for (const g of groups) { - assertEquals(await client.xgroupDestroy(key, g), 1); - } + // each group should see ALL the messages + // that have been emitted + const toCheck = data[0].messages; + assertEquals(toCheck.length, msgCount); + } - await cleanupStream(client, key); - }, - ); + for (const g of groups) { + assertEquals(await client.xgroupDestroy(key, g), 1); + } + + await cleanupStream(client, key); + }); it("xrange and xrevrange", async () => { const key = randomStream(); @@ -528,14 +531,8 @@ export function streamTests( ); assert(firstClaimed.kind === "messages"); assertEquals(firstClaimed.messages.length, 2); - assertEquals( - firstClaimed.messages[0].fieldValues, - { field: "foo" }, - ); - assertEquals( - firstClaimed.messages[1].fieldValues, - { field: "bar" }, - ); + assertEquals(firstClaimed.messages[0].fieldValues, { field: "foo" }); + assertEquals(firstClaimed.messages[1].fieldValues, { field: "bar" }); // ACK these messages so we can try XPENDING/XCLAIM // on a new batch @@ -658,14 +655,14 @@ export function streamTests( ); assert(thirdClaimed.kind === "messages"); assertEquals(thirdClaimed.messages.length, 2); - assertEquals( - thirdClaimed.messages[0].fieldValues, - { field: "woof", farm: "chicken" }, - ); - assertEquals( - thirdClaimed.messages[1].fieldValues, - { field: "bop", farm: "duck" }, - ); + assertEquals(thirdClaimed.messages[0].fieldValues, { + field: "woof", + farm: "chicken", + }); + assertEquals(thirdClaimed.messages[1].fieldValues, { + field: "bop", + farm: "duck", + }); }); }); From 778f04840571ae1e5578e69c7f6d3076eb76e1c6 Mon Sep 17 00:00:00 2001 From: Teo Stocco Date: Sun, 25 Sep 2022 12:02:07 +0200 Subject: [PATCH 2/2] Update tests/commands/stream.ts Co-authored-by: Yuki Tanaka --- tests/commands/stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/commands/stream.ts b/tests/commands/stream.ts index b67ac8f4..644ef057 100644 --- a/tests/commands/stream.ts +++ b/tests/commands/stream.ts @@ -1,4 +1,4 @@ -import { ErrorReplyError, Redis, XReadReply } from "../../mod.ts"; +import { ErrorReplyError, Redis } from "../../mod.ts"; import { parseXId } from "../../stream.ts"; import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; import {