RQ integration #333
Conversation
container = make_container(provider)
queues = ["default"]
conn = Redis()
worker = DishkaWorker(container=container, queues=queues, connection=conn)
Is this the only way to set up the integration? How likely is it that a user has their own custom worker class?
It is possible, but I'm not sure how common the use of custom workers is. The official documentation suggests a few cases, such as running on Heroku or running a worker in the same thread as the main app (in case someone needs to run a Worker within a test suite).
Another option besides subclassing Worker is to patch the Worker class at runtime, as they did in the Sentry RQ integration. This accommodates any custom Worker subclasses, but it's still not the cleanest way to plug in.
I want to reopen this discussion. For all integrations we have an @inject decorator which actually performs the injection. This is required to have full control over when it happens (some users implement their own decorators but still keep scope control in the framework, or can just test it per function). For some integrations we also have an "autoinjection" mode, which uses a less straightforward approach that allows skipping the manual placement of the inject decorator, but is usually more dangerous or less flexible.
Can we implement the same approach here? The answer "no, I've checked" is acceptable.
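For illustration, the decorator-based pattern in the existing integrations looks roughly like this (a sketch using the Flask integration; the route, Service class, and handler are made up, and the setup_dishka wiring is omitted):

from flask import Flask
from dishka import FromDishka
from dishka.integrations.flask import inject

app = Flask(__name__)

class Service:
    # placeholder dependency, for illustration only
    def build(self) -> str:
        return "report"

@app.route("/report")
@inject  # explicit injection point, fully under the user's control
def make_report(service: FromDishka[Service]) -> str:
    # `service` is resolved from the REQUEST-scope container on each call
    return service.build()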
Unfortunately, I don't see how to implement this without rewriting core RQ logic upstream. RQ retrieves the job function via getattr(module, 'func_name'), which returns the original undecorated function.
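Roughly, the lookup RQ performs is something like this simplified sketch (illustrative only, not RQ's actual code):

import importlib

def resolve_job_func(func_name: str):
    # e.g. "app.tasks.download" -> module "app.tasks", attribute "download"
    module_name, attribute = func_name.rsplit(".", 1)
    module = importlib.import_module(module_name)
    # returns whatever the module attribute currently is; a decorator
    # applied with @ syntax at definition time *is* that attribute
    return getattr(module, attribute)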
This works for me:
from functools import wraps

def wrapped(something):
    def logged(func):
        @wraps(func)
        def with_logging(*args, **kwargs):
            print(func.__name__ + " was called", something)
            return func(*args, **kwargs)
        return with_logging
    return logged
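Presumably the task itself is decorated at definition time, something like this (reconstructed from the log below; the module name and task body are assumptions):

# job.py -- module name inferred from "job.some_job()" in the log
@wrapped(123)
def some_job():
    ...

Since the decorator is applied with @ syntax, the module attribute some_job is already the wrapped function, so RQ's getattr lookup picks up the decorated version.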
rq log:
11:34:16 default: job.some_job() (fca8ef06-8d8e-4ee7-afe2-d52da9942ff1)
some_job was called 123
I came up with another idea for how to implement this integration without monkeypatching or subclassing the original worker (at least for cases without autoinjection).
What if, instead of attaching the container to the Worker instance, we wrap it inside the @inject decorator itself? E.g.:
- The integration provides an injector factory:
# dishka/integrations/rq.py
from functools import update_wrapper
from typing import Callable, ParamSpec, TypeVar

from dishka import Container
from dishka.integrations.base import wrap_injection

P = ParamSpec("P")
T = TypeVar("T")

def make_inject(container: Container):
    def inject(func: Callable[P, T]) -> Callable[P, T]:
        # a REQUEST scope is entered once per decorated function and kept
        # open for the worker's lifetime
        request_container = container().__enter__()
        wrapped = wrap_injection(
            func=func,
            remove_depends=True,
            container_getter=lambda _, __: request_container,
            is_async=False,
        )
        update_wrapper(wrapped, func)
        return wrapped
    return inject
- Instead of running setup_dishka(...) on startup, we create the injector:
# app/dependencies.py
from dishka import provide
from dishka.integrations.rq import Provider, Scope, make_container, make_inject

class StrProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def hello(self) -> str:
        return "Hello"

container = make_container(StrProvider())
inject = make_inject(container)
- In the tasks file, import the newly created injector and decorate tasks as usual:
# app/tasks.py
from redis import Redis

from dishka import FromDishka
from app.dependencies import inject

@inject
def download(name: FromDishka[str], redis: FromDishka[Redis]):
    print("Execute download job, with name=", name, " and redis=", redis)
    return f"{name} Downloaded"
Do you see any downsides to such an implementation?
Hm. In this case we need the global container to be created before the tasks and indirectly imported into each of them. I can't say I like this approach more.
Can we somehow access the RQ app inside inject, so that we can get the container from there?
Nope, there is no such thing as Celery's task.app.conf or any other kind of context object. A task can only access its own Job object, which does not provide any references to the global scope.
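For comparison, this is the kind of global access Celery offers that RQ lacks (a minimal sketch using Celery's bound-task API; the broker URL is assumed):

from celery import Celery

app = Celery("tasks", broker="redis://localhost")

@app.task(bind=True)
def report(self):
    # a bound task can reach global configuration through its app
    print(self.app.conf.timezone)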
So the third option is to go Sentry's way and monkeypatch the worker:
from dishka import Container
from rq.job import Job
from rq.queue import Queue
from rq.worker import Worker

def setup_dishka(worker: Worker, container: Container):
    worker_klass = worker.__class__
    old_perform_job = worker_klass.perform_job

    def dishka_patched_perform_job(self, job: Job, queue: Queue) -> bool:
        """Performs the job call inside a REQUEST-scope container."""
        request_container = container().__enter__()
        setattr(job, "_dishka_request_container", request_container)
        try:
            return old_perform_job(self, job, queue)
        finally:
            request_container.close()

    worker_klass.perform_job = dishka_patched_perform_job
In that case, the patched function will inject a REQUEST-scope container into each Job instance, from which it can be accessed by the @inject decorator.
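A sketch of how the @inject side could pick the container back up (get_current_job is RQ's real API; wrap_injection and its parameters are taken from the factory snippet above; the attribute name follows the patch):

from rq import get_current_job
from dishka.integrations.base import wrap_injection

def inject(func):
    return wrap_injection(
        func=func,
        remove_depends=True,
        # the patched perform_job stored a REQUEST-scope container on the Job
        container_getter=lambda _, __: get_current_job()._dishka_request_container,
        is_async=False,
    )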
I guess in that case auto-inject behavior could also be controlled by a flag passed to setup_dishka(), like this:
def setup_dishka(worker: Worker, container: Container, auto_inject: bool = False):
    ...
OK, it looks like we don't have many options. I can agree with the currently implemented one.
@Tishka17, is there anything else required on my side to merge this integration?
Looks OK to me, I'll ask someone familiar with RQ to look at it too.
I want to thank you for your work. And, so as not to argue with me about the approach to building the integration any further, I want to suggest maintaining it as a separate project. We had an idea of community-driven integrations: we do not control how they are built and only publish a link in the documentation. This gives more freedom for adding these integrations and reduces our responsibility for maintaining them as part of the project. What I suggest:
What do you think about it?
Sure. Let's archive the current PR then. Do you have any guidelines/requirements for such community integrations?
UPD: saw #335, moving my questions to the related discussion.
I've added an integration for RQ.
Due to the way RQ is implemented, injecting dependencies with the @inject decorator is not possible, as RQ takes the task function out of its original context and completely ignores any decorators applied. So the way to go is to subclass the Worker and perform injection at the subclass level. The implementation is a little bit strange, as I am manually rebuilding the kwargs dictionary here. But it works.
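A minimal sketch of that subclassing approach (the real implementation is in this PR's diff; _rebuild_job_kwargs is a hypothetical stand-in for the kwargs rebuilding mentioned above):

from rq.job import Job
from rq.queue import Queue
from rq.worker import Worker

class DishkaWorker(Worker):
    """Worker that opens a REQUEST-scope dishka container around each job."""

    def __init__(self, *args, container, **kwargs):
        super().__init__(*args, **kwargs)
        self.dishka_container = container

    def perform_job(self, job: Job, queue: Queue) -> bool:
        with self.dishka_container() as request_container:
            # merge resolved dependencies into the job's kwargs, then let
            # the stock worker run the job as usual
            job.kwargs = self._rebuild_job_kwargs(job, request_container)
            return super().perform_job(job, queue)

    def _rebuild_job_kwargs(self, job: Job, request_container) -> dict:
        # hypothetical helper: inspect the task signature and resolve each
        # FromDishka-annotated parameter from the request container
        ...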