
RQ integration #333

Closed
wants to merge 7 commits into from
Conversation


@prepin prepin commented Jan 8, 2025

I've added an integration for RQ.

Due to the way RQ is implemented, injecting dependencies with the @inject decorator is not possible: RQ pulls the task function out of its original context and completely ignores any decorators applied to it.

So the way to go is to subclass the Worker and perform the injection at the subclass level. The implementation is a little strange, as I manually rebuild the kwargs dictionary there, but it works.

from redis import Redis
from dishka import make_container

container = make_container(provider)
queues = ["default"]
conn = Redis()
worker = DishkaWorker(container=container, queues=queues, connection=conn)
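The "rebuild the kwargs dictionary" step described above can be sketched with stdlib-only stand-ins. Everything here is hypothetical: `FromDishka` is a marker class standing in for dishka's real annotation, and `build_injected_kwargs` and the `resolve` callback are simplified illustrations, not the PR's actual code.

```python
# Stdlib-only sketch of the "rebuild kwargs in a Worker subclass" idea;
# FromDishka here is a stand-in marker, not dishka's real implementation.
from typing import Annotated, get_args, get_origin, get_type_hints

class FromDishka:
    """Marker used in Annotated[...] to flag injectable parameters."""

def build_injected_kwargs(func, job_kwargs, resolve):
    """Extend job kwargs with a dependency resolved for every parameter
    annotated as Annotated[T, FromDishka] and not already supplied."""
    kwargs = dict(job_kwargs)
    hints = get_type_hints(func, include_extras=True)
    for name, hint in hints.items():
        if name == "return" or name in kwargs:
            continue
        if get_origin(hint) is Annotated and FromDishka in get_args(hint):
            kwargs[name] = resolve(get_args(hint)[0])  # resolve by type
    return kwargs

def greet(name: str, text: Annotated[str, FromDishka]) -> str:
    return f"{text}, {name}"

resolved = build_injected_kwargs(greet, {"name": "RQ"}, {str: "Hello"}.get)
print(greet(**resolved))  # Hello, RQ
```

A real worker subclass would run this kind of rebuilding inside its job-execution method, with a request-scoped container acting as the resolver.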
Member

Is this the only way to set up the integration? How likely is it that a user has their own custom worker class?

Author

It is possible, but I'm not sure how common the use of custom workers is. The official documentation suggests some cases, such as running on Heroku or running a worker in the same thread as the main app (in case someone needs to run a Worker within a test suite).

Another option besides subclassing Worker is to patch the Worker class at runtime, as they did in the Sentry RQ integration. This supports arbitrary custom Worker subclasses, but it's still not the cleanest way to plug in.

Member

I want to reopen this discussion. For all integrations we have an @inject decorator which actually does the injection. This is required to give full control over when it happens (some users implement their own decorators but still keep scope control in the framework, or can simply test it per function). For some integrations we also have an "autoinjection" mode, which uses a less straightforward approach that lets you skip placing the inject decorator manually, but is usually more dangerous or less flexible.

Can we implement the same approach here? The answer "no, I've checked" is acceptable.
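The distinction drawn above can be illustrated with a stdlib-only sketch: explicit decoration by the user versus the framework wrapping the task at registration time. All names here (`inject`, `register`, `auto_inject`) are hypothetical illustrations, not dishka's API.

```python
# Stdlib-only illustration of the two modes discussed above:
# explicit @inject decoration vs. auto-wrapping at registration time.
from functools import wraps

def inject(func):
    """Explicit mode: the user places @inject on each task."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        kwargs.setdefault("dep", "injected")  # simplified dependency
        return func(*args, **kwargs)
    return wrapper

def register(tasks, func, auto_inject=False):
    """Auto mode: the framework wraps the task when it is registered."""
    tasks[func.__name__] = inject(func) if auto_inject else func

@inject
def explicit_task(x, dep=None):
    return (x, dep)

def plain_task(x, dep=None):
    return (x, dep)

tasks = {}
register(tasks, plain_task, auto_inject=True)

print(explicit_task(1))        # (1, 'injected')
print(tasks["plain_task"](2))  # (2, 'injected')
```

The auto mode is convenient but, as noted above, less flexible: the user loses the per-function control point that an explicit decorator gives.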

Author

Unfortunately, I don't see how to implement this without rewriting core RQ logic upstream. RQ retrieves the job function via getattr(module, 'func_name'), which returns the original undecorated function.


This works for me:

from functools import wraps

def wrapped(something):
    def logged(func):
        @wraps(func)
        def with_logging(*args, **kwargs):
            print(func.__name__ + " was called", something)
            return func(*args, **kwargs)
        return with_logging
    return logged

rq log:

11:34:16 default: job.some_job() (fca8ef06-8d8e-4ee7-afe2-d52da9942ff1)
some_job was called 123

Author

I came up with another idea for implementing such an integration without monkeypatching or subclassing the original worker (at least for cases without autoinjection).

What if, instead of attaching the container to the Worker instance, we wrap it inside the @inject decorator itself?

E.g.

  1. The integration provides an injector factory:
# dishka/integrations/rq.py

def make_inject(container: Container):
    def inject(func: Callable[P, T]) -> Callable[P, T]:
        with container() as request_container:
            wrapped = wrap_injection(
                func=func,
                remove_depends=True,
                container_getter=lambda _, __: request_container,
                is_async=False,
            )
            update_wrapper(wrapped, func)
            return wrapped

    return inject
  2. Instead of running setup_dishka(...) on startup, we create the injector:
# app/dependencies.py

from dishka import Provider, Scope, make_container, provide
from dishka.integrations.rq import make_inject

class StrProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def hello(self) -> str:
        return "Hello"

container = make_container(StrProvider())
inject = make_inject(container)
  3. In the tasks file, import the newly created injector and decorate tasks as usual:
# app/tasks.py

from redis import Redis
from dishka import FromDishka
from app.dependencies import inject

@inject
def download(name: FromDishka[str], redis: FromDishka[Redis]):
    print("Execute download job, with name=", name, " and redis=", redis)
    return f"{name} Downloaded"

Do you see any downsides of such implementation?

Member

Hm. In this case we need the global container to be created before the tasks and indirectly imported into each of them. I can't say I like this approach more.

Member

Can we somehow access the RQ app inside inject, so that we can get the container from there?

Author

Nope, there is no such thing as Celery's task.app.conf or any other kind of context object. A task can only access its own Job object, which does not provide any references to the global scope.

So the third option is to go Sentry's way and monkeypatch the worker:

def setup_dishka(worker: Worker, container: Container):
    worker_klass = worker.__class__
    old_perform_job = worker_klass.perform_job

    def dishka_patched_perform_job(self, job: Job, queue: Queue) -> bool:
        """Performs the job call with a request container attached to the job."""
        request_container = container().__enter__()
        setattr(job, "_dishka_request_container", request_container)
        try:
            return old_perform_job(self, job, queue)
        finally:
            request_container.close()

    worker_klass.perform_job = dishka_patched_perform_job

In that case the patched function will inject a REQUEST-scope container into each Job instance, from which it can be accessed by the @inject decorator.

I guess that in this case auto-inject behavior could also be controlled by a flag passed to setup_dishka(), like this:

def setup_dishka(worker: Worker, container: Container, auto_inject: bool = False):
  ...
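The @inject side of this design could look roughly like the sketch below. The job getter is parameterized so the sketch runs without rq (rq's real helper for this is get_current_job), and the injection itself is simplified to passing the container through as a single keyword, so everything here is illustrative rather than the actual integration code.

```python
# Illustrative sketch: an @inject that reads the request container that a
# patched perform_job stored on the Job instance. The job getter is a
# parameter so the sketch is self-contained; FakeJob is a test stand-in.
from functools import wraps

def make_job_inject(get_job):
    def inject(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            container = get_job()._dishka_request_container
            # real code would resolve FromDishka-annotated parameters;
            # simplified here to passing the container through
            kwargs.setdefault("container", container)
            return func(*args, **kwargs)
        return wrapper
    return inject

class FakeJob:
    _dishka_request_container = {"greeting": "Hello"}

inject = make_job_inject(lambda: FakeJob())

@inject
def task(name, container=None):
    return f"{container['greeting']}, {name}"

print(task("world"))  # Hello, world
```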

Member

OK, it looks like we don't have many options. I can agree with the currently implemented one.

@prepin
Author

prepin commented Jan 16, 2025

@Tishka17, is there anything else required on my side to merge this integration?

@Tishka17
Member

Looks OK to me. I'll ask someone familiar with RQ to take a look at it too.

@Tishka17
Member

Tishka17 commented Jan 19, 2025

I want to thank you for your work. And so as not to argue more about the approach for building the integration, I want to suggest maintaining it as a separate project. We had an idea of community-driven integrations: we do not control how they are built and only publish a link in the documentation. This gives more freedom for adding these integrations and reduces our responsibility for maintaining them as part of the project.

What I suggest:

  • you prepare a separate GitHub repository under your control.
  • you publish it as a separate project on PyPI (a name like dishka-rq would be ok).
  • we add a link to your project to the dishka documentation.

What do you think about it?

@prepin
Author

prepin commented Jan 20, 2025

Sure. Let's archive the current PR then.

Do you have any guidelines/requirements for such community integrations?

Questions I can think of now are:
- Does the external package need to keep the same namespace structure as the parent project (dishka.integrations.smthng)?
- Should it host its own documentation on a separate readthedocs page, or should it be available in the main repo?
- Which versions of Dishka are considered "supported"? Do you expect the integration to be tested only against the most recent release, or should it also check 1.3, 1.2, etc.?

UPD: saw #335, moving my questions to the related discussion.

@Tishka17 Tishka17 closed this Jan 29, 2025
@Tishka17 Tishka17 mentioned this pull request Jan 29, 2025

3 participants