Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncTaskiqDecoratedTask().kicker().with_labels() mutates task labels #301

Open
donc310 opened this issue Mar 1, 2024 · 2 comments
Open

Comments

@donc310
Copy link

donc310 commented Mar 1, 2024

Hello and thank you very much for your work on this project.

I currently have an issue where tasks submitted with custom labels mutates the underlying decorated task instance

@pytest.mark.anyio
async def test_with_label() -> None:
    trace_ids = {}

    class Tracer(TaskiqMiddleware):
        def pre_send(self, message: TaskiqMessage) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
            nonlocal trace_ids
            trace_ids[tuple(message.args)] = message.labels.get("trace_id")
            return super().pre_send(message)

    broker = InMemoryBroker().with_middlewares(Tracer())

    @broker.task()
    def run_task(a) -> int:
        return a

    task1 = await run_task.kicker().with_labels(trace_id="11111").kiq(1)
    task2 = await run_task.kicker().with_labels(trace_id="22222").kiq(2)
    task3 = await run_task.kiq(3)

    await task1.wait_result(timeout=1)
    await task2.wait_result(timeout=1)
    await task3.wait_result(timeout=1)

    assert trace_ids == {(1,): "11111", (2,): "22222", (3,): None}

For some context on what we were trying to achieve;

We were building a custom Sentry Integration for TaskIq that would add tracing meta-data as labels to queued messages, to achieve this we patched AsyncKicker.kiq and Receiver.run_task methods to add tracing info to message labels which would track tasks from when there were submitted to when a worker picks up and processes the message.

The above test case fails because after the second task is submitted all other subsequent tasks will have a trcae_id of 22222.

Our current workaround is to use a custom task class which doesn't mutate the original task labels

class CustomTask(AsyncTaskiqDecoratedTask):

    def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
        """
        This function returns kicker object.

        Kicker is a object that can modify kiq request
        before sending it.

        :return: AsyncKicker instance.
        """
        return AsyncKicker(
            task_name=self.task_name,
            broker=self.broker,
            labels=deepcopy(self.labels),
        )


broker.decorator_class = CustomTask
@donc310 donc310 changed the title AsyncTaskiqDecoratedTask().kicker().with_labels() mutates task labels AsyncTaskiqDecoratedTask().kicker().with_labels() mutates task labels Mar 1, 2024
@s3rius
Copy link
Member

s3rius commented Mar 8, 2024

You're completely right. If you want to become a contributor, you can create a PR that fixes it. I haven't experienced this problem yet, but I think it's a possible bug.

Thanks for noticing. If you don't want to create a PR, I can create a patch myself.

@donc310
Copy link
Author

donc310 commented Mar 8, 2024

Will make PR for the fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants