Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: external events #19

Open
KnorpelSenf opened this issue Jul 29, 2022 · 9 comments
Open

feat: external events #19

KnorpelSenf opened this issue Jul 29, 2022 · 9 comments

Comments

@KnorpelSenf
Copy link
Member

It may be interesting to be able to wait for external events, rather than just waiting for Telegram updates.

For example, we could add something like await conversation.waitExternal('id') and then in turn people can do

const session = await storage.read(chatId)
await fireEvent('id', conversation, session)

which loads the right session data, and runs the supplied conversation function.

@swim2sun
Copy link

Hello @KnorpelSenf ,
I am looking forward to using the new features in the engine branch. However, I am facing some challenges due to the lack of documentation, and I am unable to find the corresponding test cases. As a result, I am unsure how to use the new API. I also tried searching for "fireEvent" and "waitExternal" in the repository but couldn't find any results. Could you please provide a brief introduction to the relevant APIs? Thank you!

@KnorpelSenf
Copy link
Member Author

I am going to document this more thoroughly in the coming days and weeks. Here is a relevant test case:

async function convo(conversation: Convo, ctx: Context) {
assertNotStrictEquals(ctx, expected);
assertEquals(ctx.update, expected.update);
ctx = await conversation.wait();
assertNotStrictEquals(ctx, expected);
assertEquals(ctx.update, expected.update);
ctx = await conversation.wait();
assertNotStrictEquals(ctx, expected);
assertEquals(ctx.update, expected.update);
i++;
}
const first = await enterConversation(convo, expected);
assertEquals(first.status, "handled");
assert(first.status === "handled");
const copy = structuredClone(first);
const second = await resumeConversation(convo, expected, copy);
assertEquals(second.status, "handled");
assert(second.status === "handled");
const otherCopy = { ...structuredClone(second), args: first.args };
const third = await resumeConversation(convo, expected, otherCopy);
assertEquals(third.status, "complete");
assert(third.status === "complete");
assertEquals(i, 1);
});

The naming in the above description is not used in the implementation. You can enter a conversation which gives you some state, and then you can pass this state when resuming a conversation.

You can view an HTML overview of the plugin's API surface at https://doc.deno.land/https://raw.githubusercontent.com/grammyjs/conversations/refs/heads/engine/src/mod.ts. It says Deno everywhere because they provide the tooling to create such a page, but the plugin works identically on Node.js.

Instead of fireEvent you will have to take your event data and wrap it inside a Context object for now. That is not ideal, and the TypeScript types will be improved here, but it should work well apart from a type-cast. You can then access the event data inside the conversation using

const event = await conversation.wait()
const data = event.update as MyEventData

LMK if you have further questions, and apologies for the rough DX. After all, it's not released yet.

@swim2sun
Copy link

swim2sun commented Oct 16, 2024

@KnorpelSenf Thank you for your quick and detailed response. However, I'm still a bit confused about the usage of the new API. I'm new to grammY, so please forgive me if my understanding of the technical details is incorrect:

  1. When and where should we call enterConversation and resumeConversation? Is it within a conversation? Wouldn't that create nested conversations? Or should these be called outside of a conversation, and if so, how do we access the context?

  2. My current understanding is that after calling ctx.conversation.enter('conv-id') to enter a conversation, grammY takes control of the conversation. When a new update arrives, the plugin resumes from where it previously waited. Now, does conversation.wait() resume the conversation both when a new update arrives and when resumeConversation is called?

  3. Let's discuss a specific example. In my scenario, I need to poll a database for a certain status halfway through a conversation, then continue returning information to the user. I initially tried to implement this using a while loop with conversation.sleep. The code looked something like this:

const composer = new Composer<MyContext>();

async function generateImage(conversation: MyConversation, ctx: MyContext) {
  conversation.run(hydrate());
  const modelKeyboard = new InlineKeyboard()
    .text(ctx.t("model-sd"), "sd")
    .text(ctx.t("model-flux"), "flux");
  await ctx.reply(
    ctx.t("pls-select-model"),
    { reply_markup: modelKeyboard }
  );
  const modelCtx = await conversation.waitForCallbackQuery(["sd", "flux"] , {
    otherwise: (ctx) => ctx.reply(ctx.t("select-model"), { reply_markup: modelKeyboard}),
  });
  await modelCtx.answerCallbackQuery();
  const model = modelCtx.match;
  // prompt
  // similar code to get prompt
  const prompt = ...;
  let task: ImageTask = await conversation.external(async () => await prisma.imageTask.create({
    data: {
      model: model.toString(),
      prompt: prompt.toString(),
    },
  }));
  console.log("task created: " + task.id);
  await ctx.reply(ctx.t("task-created"));
  let secs = 0;
  while (task.status !== 'SUCCESS') {
    const statusMessage = await ctx.reply("waiting " + secs + "s");
    await conversation.sleep(1000); 
    task = await conversation.external(async () => await prisma.imageTask.findUniqueOrThrow({where: { id: task.id }}));
  }I
  await ctx.reply("success");
}

composer.use(createConversation(generateImage));
composer.command("generate-image", (ctx) => ctx.conversation.enter("generateImage"));

However, this turned out to be an incorrect usage, as the program reported a webhook timeout error. How can this code be rewritten using the new features?

@swim2sun
Copy link

@KnorpelSenf Hello again, after reviewing the source code, I've noticed that both enterConversation and resumeConversation are currently called within middleware. This means these methods are only triggered when an update arrives.

  1. Let's say I implement a custom middleware that queries a database to determine whether to resume a conversation. Is it correct to assume that this code would only execute when an update arrives? If we query only once, it would happen only when an update comes in. On the other hand, if we check in a loop, we might run into webhook timeout issues. Is my understanding correct?

  2. Alternatively, if I set up an external scheduled task to check the database, I face a new challenge: How should I construct the Context when calling resumeConversation?

@KnorpelSenf
Copy link
Member Author

Got a bit much to do right now, let me get back to you this weekend, sorry for the delay

@KnorpelSenf
Copy link
Member Author

Maybe as a very short (and probably too short) hint, you'll have to provide a storage adapter and then query that storage yourself in order to obtain conversation state. This state can in turn be passed to resumeConversation. I'll follow up with a better example in a few days.

@swim2sun
Copy link

@KnorpelSenf There's no rush at all, please take your time with your other tasks. If possible, a concrete example would be very helpful when you have a chance to provide one, thanks.

@KnorpelSenf
Copy link
Member Author

KnorpelSenf commented Oct 21, 2024

I have pushed a few commits that make this case easier. You'll need to pull a new version.

This is how you can manually load state and use resumeConversation to resume a conversation.

type MyContext = ConversationFlavor<Context>;
type MyConversationContext = Context;

type MyConversation = Conversation<MyConversationContext>;

const bot = new Bot<MyContext>("redacted");

const version = 0;
const fileAdapter = new FileAdapter<VersionedState<ConversationData>>({
    dirName: "/tmp",
});

bot.use(
    conversations({
        storage: {
            type: "key",
            version,
            getStorageKey: (ctx) => ctx.chatId?.toString(),
            adapter: fileAdapter,
        },
    }),
);
bot.command(
    "active",
    (ctx) => ctx.reply(JSON.stringify(ctx.conversation.active(), null, 2)),
);

interface MyEvent {
    type: "event";

    foo: string;
    bar: number;
}
async function convo(conversation: MyConversation, ctx: MyConversationContext) {
    const { update } = await conversation.waitUntil((ctx) =>
        // only wait for external events
        "type" in ctx.update && ctx.update.type === "event"
    );
    const event = update as unknown as MyEvent; // cast back from update until plugin is improved
    await ctx.reply(`Received ${event.foo}`);
}
bot.use(createConversation(convo));
bot.command("start", (ctx) => ctx.conversation.enter("convo"));

const { versionify, unpack } = pinVersion(version);
async function supplyExternalEvent(chat_id: number, event: MyEvent) {
    // fetch data
    const key = chat_id.toString();
    const data = await fileAdapter.read(key);
    const state = unpack(data);
    if (state === undefined) return; // bad or missing data for chat_id
    const convoState = state.convo?.[0];
    if (convoState === undefined) return; // convo not entered
    const baseData = {
        update: event as unknown as Update, // cast to update until plugin is improved
        api: bot.api,
        me: bot.botInfo,
    };
    // run conversation
    const res = await resumeConversation(convo, baseData, convoState);
    // handle result
    switch (res.status) {
        case "skipped":
            return;
        case "complete":
        case "error":
            await fileAdapter.delete(key);
            return;
        case "handled": {
            const newState: ConversationState = {
                args: convoState.args,
                interrupts: res.interrupts,
                replay: res.replay,
            };
            state.convo[0] = newState;
            await fileAdapter.write(key, versionify(state));
            return;
        }
    }
}

bot.start();

Here are a few interesting things to note about the above code:

  • You can now call supplyExternalEvent in order to supply an external event for a chat.
  • The data that is loaded is versioned in order to validate it automatically when new versions are released. You therefore need to provide your own version number or string (can be left out, defaulting to 0) using pinVersion.
  • The data contains the state of several conversation identifiers, and each conversation identifier holds an array of state objects. This is because the plugin can run many different conversations concurrently even inside the same chat. The data structure is described here: https://doc.deno.land/https://raw.githubusercontent.com/grammyjs/conversations/refs/heads/engine/src/mod.ts/~/ConversationData
  • This example uses the file adapter from https://github.com/grammyjs/storages/tree/main/packages/file but you are free to use any other storage adapter or even use your own storage solution. You should think about data races that can happen when an external event is sent at the same time as a message arrives.
  • The plugin currently supports this manual way of supplying external events, but the types are still assuming that all events are updates from the Bot API. This is why the type casts are necessary.
  • I quickly threw this together as educational code but I did not test it much, so forgive me if there are bugs

@swim2sun
Copy link

@KnorpelSenf Thank you very much, your code is very clear, detailed, and helpful!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
No open projects
Status: To do
Development

No branches or pull requests

2 participants