-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kiwipy/rmq related modules into independent module #297
base: dev
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## dev #297 +/- ##
======================================
Coverage ? 89.46%
======================================
Files ? 28
Lines ? 3196
Branches ? 0
======================================
Hits ? 2859
Misses ? 337
Partials ? 0 ☔ View full report in Codecov by Sentry. |
fc52fcd
to
c0a8bbd
Compare
4960836
to
47ac8e4
Compare
85fc72a
to
da644ac
Compare
class MockCoordinator: | ||
def __init__(self): | ||
self._task_subscribers = {} | ||
self._broadcast_subscribers = {} | ||
self._rpc_subscribers = {} | ||
self._closed = False | ||
|
||
def is_closed(self) -> bool: | ||
return self._closed | ||
|
||
def close(self): | ||
if self._closed: | ||
return | ||
self._closed = True | ||
del self._task_subscribers | ||
del self._broadcast_subscribers | ||
del self._rpc_subscribers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the Mocked in memory coordinator that in principle can work with sqlite in presto and make it able to submit the calculations.
f292cb6
to
42738fc
Compare
Hi @sphuber, would be great if you can have a look at this, at least make sure no big design issues here. It is not ready because when I port it to aiida-core, it seems work for python3.12 but for python3.9 I can not start the daemon (stuck at daemonize function of circus.). It surely requires more effort to get it proper implemented. |
Can try to find some time at some point but this is a big change, so would require quite some time. |
db8d1e2
to
2304d5f
Compare
…and asyncio.Future hand write wrap to kiwipy future (concurrent.futures.Future) kiwipy.Future -> concurrent.futures.Future
Move communication to rmq
4972dd2
to
6d3101d
Compare
plum_future.add_done_callback(on_done) | ||
return kiwi_future | ||
|
||
|
||
def convert_to_comm( | ||
callback: 'Subscriber', loop: Optional[asyncio.AbstractEventLoop] = None | ||
) -> Callable[..., kiwipy.Future]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the PR in aiida you converted plumpy.futures
to concurrent.futures
, my reasoning there was that if we don't have a motivation to change the default type (concurrent.futures.Future
) then we can just use it to simplify the code. Then we should also replace kiwipy.Future
to concurrent.futures.Future
.
My main concern was actually that we can make create a TypeAlias for the output type? -> ConcurrentSubscriber:
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plumpy.futures to concurrent.futures, my reasoning there was that if we don't have a motivation to change the default type (concurrent.futures.Future) then we can just use it to simplify the code. Then we should also replace kiwipy.Future to concurrent.futures.Future.
The original goal was to clarify these two use cases. In plumpy, the plumpy.future
is actually alias of asyncio.Future
, while kiwipy.future
is for concurrent.futures.Future
which can not be await and used for thread communication.
I revert the kiwipy.Future back to be more conservative, but we should make it clear just use the correct future type which make our life easier in the future.
def wrap_communicator( | ||
communicator: kiwipy.Communicator, loop: Optional[asyncio.AbstractEventLoop] = None | ||
) -> 'LoopCommunicator': | ||
T = TypeVar('T', bound=kiwipy.Communicator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we give more meaningful name?
T = TypeVar('T', bound=kiwipy.Communicator) | |
SubtypeCommunicator = TypeVar('SubtypeCommunicator', bound=kiwipy.Communicator) |
or
T = TypeVar('T', bound=kiwipy.Communicator) | |
CommunicatorT = TypeVar('CommunicatorT', bound=kiwipy.Communicator) |
or anything else that references that it comes from Communicator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be better, I'll change.
The refactoring is targeting to decouple the dependencies of using kiwipy+rmq as the communicator for the process control.
By forming a
Coordinator
protocol contract, the different type of rmq/kiwipy related codes are removed out from plumpy logic. The new contract also pave the way to make it clearly show how a new type coordinator can be implemented (future examples will be thetatzelwurm
a task broker that has scheduler support and file based task broker require no background service).For the prototype of how a coordinator should look like, the
MockCoordinator
intests/utils
is the coordinator that store things in memory, and can serve as the lightweight ephemeral daemon without persistent functionality.Another major change here is hand write the resolver of future by mimic how tho asyncio does for wrapping
concurrent.futures.Future
intoasyncio.Future
. I use the same way to convertasyncio.Future
intoconcurent.futures.Future
(which is thekiwipy.Future
as alias).aio_pika
import lazily by moving the rmq exceptions tormq
module, this can increase the performance ofimport aiida; aiida.orm
.CancellableAction
using composite to behave as a Future like object.asyncio.Future
in favor of aliasplumpy.Future
andconcurrent.futures.Future
instead of aliaskiwipy.Future
._chain
and_copy_future
since we can not just rely on the API of asyncio that is not exposed.coordinator/Communicator
protocol.coordinator/Coordinator
protocol and wrap rmq/communicator as a coordinator that not require changs in kiwipy.unkcpz#6