Detach fields in-order after consensus match #843

Open · wants to merge 4 commits into branch-24.03
34 changes: 17 additions & 17 deletions legate/core/_legion/operation.py
@@ -1233,13 +1233,16 @@ def __init__(self, region: PhysicalRegion, flush: bool = True) -> None:
         # it is not deleted before this detach operation can run
         self.region = region.region
         self.flush = flush
+        # The following fields aren't part of the operation itself, but are
+        # used for managing the lifetime of attached objects
+        self.attached_alloc: Any = None
+        self.future: Optional[Future] = None

     @dispatch
     def launch(
         self,
         runtime: legion.legion_runtime_t,
         context: legion.legion_context_t,
-        unordered: bool = False,
         **kwargs: Any,
     ) -> Future:
         """
@@ -1248,27 +1251,20 @@ def launch(
         Returns
         -------
         Future containing no data that completes when detach operation is done
-        If 'unordered' is set to true then you must call legate_task_progress
-        before using the returned Future
         """
-        # Check to see if we're still inside the context of the task
-        # If not then we just need to leak this detach because it can't be done
-        if context not in _pending_unordered:
-            return Future()
-        if unordered:
-            future = Future()
-            _pending_unordered[context].append(((self, future), type(self)))
-            return future
-        else:
-            return Future(
-                legion.legion_unordered_detach_external_resource(
-                    runtime,
-                    context,
-                    self.physical_region.handle,
-                    self.flush,
-                    True,
-                )
-            )
+        return Future(
+            legion.legion_unordered_detach_external_resource(
+                runtime,
+                context,
+                self.physical_region.handle,
+                self.flush,
+                False,  # unordered
+            )
+        )


 class IndexAttach(Dispatchable[ExternalResources]):
@@ -1409,6 +1405,10 @@ def __init__(
         """
         self.external_resources = external_resources
         self.flush = flush
+        # The following fields aren't part of the operation itself, but are
+        # used for managing the lifetime of attached objects
+        self.attached_alloc: Any = None
+        self.future: Optional[Future] = None

     def launch(
         self,
@@ -1429,7 +1429,7 @@
                 runtime,
                 context,
                 self.external_resources.handle,
                 self.flush,
-                True,  # unordered
+                False,  # unordered
             )
         )

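Reviewer note on the two hunks above: Detach.launch now always issues an ordered detach (the trailing unordered argument to legion_unordered_detach_external_resource is hard-coded to False), and the operation object itself carries the attached_alloc/future bookkeeping. A minimal, runnable sketch of the lifetime protocol those two fields support, using toy classes only, not the Legate API:

    from typing import Any, Optional

    class ToyFuture:
        def __init__(self) -> None:
            self._ready = False
        def is_ready(self) -> bool:
            return self._ready
        def wait(self) -> None:
            self._ready = True  # a real future would block here

    class ToyDetach:
        """Mirrors the bookkeeping fields this PR adds to Detach/IndexDetach."""
        def __init__(self) -> None:
            # Not part of the operation itself; they manage the lifetime of
            # the attached buffer and the detach future, as the new comments say.
            self.attached_alloc: Any = None
            self.future: Optional[ToyFuture] = None

    detach = ToyDetach()
    detach.attached_alloc = bytearray(16)  # keep the external buffer alive
    detach.future = ToyFuture()            # set once the detach is dispatched
    detach.future.wait()                   # reuse of the field blocks on this
    detach.attached_alloc = None           # safe to release the buffer reference

The point of the design is that the buffer reference now lives on the operation until its future completes, instead of dangling off the future as before.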
14 changes: 0 additions & 14 deletions legate/core/_legion/util.py
@@ -64,7 +64,6 @@ def legate_task_progress(
     """

     from .future import Future, FutureMap
-    from .operation import Detach
     from .region import OutputRegion, PhysicalRegion, Region
     from .space import FieldSpace, IndexSpace
     from .task import ArgumentMap
@@ -97,19 +96,6 @@ def legate_task_progress(
             )
         elif type is PhysicalRegion:
             handle.unmap(runtime, context, unordered=False)
-        elif type is Detach:
-            detach = handle[0]
-            future = handle[1]
-            assert future.handle is None
-            future.handle = (
-                legion.legion_unordered_detach_external_resource(
-                    runtime,
-                    context,
-                    detach.physical_region.handle,
-                    detach.flush,
-                    True,
-                )
-            )
         else:
             raise TypeError(
                 "Internal legate type error on unordered operations"
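Reviewer note: with detaches always dispatched in order, legate_task_progress no longer needs a Detach branch. A toy model of the slimmed-down dispatch loop (stub types, not the real Legate classes), just to show the remaining shape:

    from typing import Any, List, Tuple

    class PhysicalRegionStub:
        def unmap(self, runtime: Any, context: Any, unordered: bool) -> None:
            print("unmapped")

    def progress(
        pending: List[Tuple[Any, type]], runtime: Any = None, context: Any = None
    ) -> None:
        # A Detach can never sit in the unordered queue after this PR, so the
        # loop only services the remaining handle kinds.
        for handle, kind in pending:
            if kind is PhysicalRegionStub:
                handle.unmap(runtime, context, unordered=False)
            else:
                raise TypeError(
                    "Internal legate type error on unordered operations"
                )

    progress([(PhysicalRegionStub(), PhysicalRegionStub)])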
133 changes: 65 additions & 68 deletions legate/core/runtime.py
@@ -80,7 +80,7 @@
 from .operation import AutoTask, Copy, ManualTask, Operation
 from .partition import PartitionBase
 from .projection import SymbolicPoint
-from .store import Field, RegionField, Store
+from .store import RegionField, Store

 ProjSpec = Tuple[int, SymbolicPoint]
 ShardSpec = Tuple[int, tuple[int, int], int]
@@ -144,13 +144,13 @@ class FreeFieldInfo:
     manager: FieldManager
     region: Region
     field_id: int
-    detach_future: Union[Future, None]
+    detach: Union[Detach, IndexDetach, None]

-    def free(self, ordered: bool = False) -> None:
+    def free(self, ordered: bool) -> None:
         self.manager.free_field(
             self.region,
             self.field_id,
-            detach_future=self.detach_future,
+            self.detach,
             ordered=ordered,
         )
@@ -256,10 +256,10 @@ def add_free_field(
         manager: FieldManager,
         region: Region,
         field_id: int,
-        detach_future: Union[Future, None],
+        detach: Union[Detach, IndexDetach, None],
     ) -> None:
         self._freed_fields.append(
-            FreeFieldInfo(manager, region, field_id, detach_future)
+            FreeFieldInfo(manager, region, field_id, detach)
         )

     def issue_field_match(self, credit: int) -> None:
@@ -359,14 +359,24 @@ def allocate_field(self, field_size: Any) -> tuple[Region, int, bool]:


 def _try_reuse_field(
-    free_fields: Deque[tuple[Region, int, Union[Future, None]]]
+    free_fields: Deque[tuple[Region, int, Union[Detach, IndexDetach, None]]]
 ) -> Optional[tuple[Region, int]]:
     if len(free_fields) == 0:
         return None
-    field_info = free_fields.popleft()
-    if field_info[2] is not None and not field_info[2].is_ready():
-        field_info[2].wait()
-    return field_info[0], field_info[1]
+    region, field_id, detach = free_fields.popleft()
+
+    if detach is not None:
+        if detach.future is None:
+            # corner case; the detach has been added to _deferred_detachments
+            # but not dispatched yet, so do that now
+            runtime.attachment_manager.perform_detachments()
+            assert detach.future is not None
+        # We have to wait for the field to be detached from any attachment
+        # before we can reuse it.
+        detach.future.wait()
+        # The Detach operation will asynchronously be removed from
+        # _pending_detachments through the _prune_detachment mechanism
+    return region, field_id


 # This class manages the allocation and reuse of fields
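Reviewer note: the key behavioural change in _try_reuse_field is that reuse now blocks on the future hanging off the Detach operation, dispatching a still-deferred detach first. A self-contained toy of that ordering (stub types, not the Legate API):

    from collections import deque
    from typing import Deque, Optional, Tuple

    class StubFuture:
        def __init__(self) -> None:
            self.done = False
        def wait(self) -> None:
            self.done = True  # a real future would block until the detach ran

    class StubDetach:
        def __init__(self) -> None:
            self.future: Optional[StubFuture] = None

    def try_reuse(
        free_fields: Deque[Tuple[str, int, Optional[StubDetach]]]
    ) -> Optional[Tuple[str, int]]:
        if len(free_fields) == 0:
            return None
        region, field_id, detach = free_fields.popleft()
        if detach is not None:
            if detach.future is None:
                # stands in for attachment_manager.perform_detachments()
                detach.future = StubFuture()
            detach.future.wait()  # field is unusable until the detach is done
        return region, field_id

    fields: Deque[Tuple[str, int, Optional[StubDetach]]] = deque()
    fields.append(("region0", 42, StubDetach()))
    assert try_reuse(fields) == ("region0", 42)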
@@ -382,7 +392,7 @@ def __init__(
         # guaranteed to be ordered across all the shards even with
         # control replication
         self.free_fields: Deque[
-            tuple[Region, int, Union[Future, None]]
+            tuple[Region, int, Union[Detach, IndexDetach, None]]
         ] = deque()

     def destroy(self) -> None:
@@ -409,10 +419,14 @@ def free_field(
         self,
         region: Region,
         field_id: int,
-        detach_future: Union[Future, None] = None,
+        detach: Union[Detach, IndexDetach, None],
         ordered: bool = False,
     ) -> None:
-        self.free_fields.append((region, field_id, detach_future))
+        if detach is not None:
+            runtime.attachment_manager.detach_external_allocation(
+                detach, ordered
+            )
+        self.free_fields.append((region, field_id, detach))
         region_manager = self.runtime.find_region_manager(region)
         if region_manager.decrease_active_field_count():
             self.runtime.free_region_manager(
@@ -459,28 +473,20 @@ def try_reuse_field(self) -> Optional[tuple[Region, int]]:

         self._field_match_manager.update_free_fields()

-        # If any free fields were discovered on all shards, push their
-        # unordered detachments to the task stream now, so we can safely
-        # block on them later without fear of deadlock.
-        if len(self.free_fields) > 0:
-            self.runtime._progress_unordered_operations()
-
         return _try_reuse_field(self.free_fields)

     def free_field(
         self,
         region: Region,
         field_id: int,
-        detach_future: Union[Future, None] = None,
+        detach: Union[Detach, IndexDetach, None],
         ordered: bool = False,
     ) -> None:
         if ordered:
-            super().free_field(
-                region, field_id, detach_future=detach_future, ordered=ordered
-            )
+            super().free_field(region, field_id, detach, ordered)
         else:  # Put this on the unordered list
             self._field_match_manager.add_free_field(
-                self, region, field_id, detach_future
+                self, region, field_id, detach
             )

@@ -518,22 +524,19 @@ def __init__(self, runtime: Runtime) -> None:
         self._registered_detachments: dict[
             int, Union[Detach, IndexDetach]
         ] = dict()
-        self._deferred_detachments: List[
-            tuple[Attachable, Union[Detach, IndexDetach], Union[Field, None]]
-        ] = list()
-        self._pending_detachments: dict[Future, Attachable] = dict()
+        self._deferred_detachments: List[Union[Detach, IndexDetach]] = []
+        self._pending_detachments: List[Union[Detach, IndexDetach]] = []
         self._destroyed = False

     def destroy(self) -> None:
         self._destroyed = True
         # Schedule any queued detachments
         self.perform_detachments()
-        # Make sure progress is made on any of these operations
-        self._runtime._progress_unordered_operations()
         # Always make sure we wait for any pending detachments to be done
         # so that we don't lose the references and make the GC unhappy
-        for future in self._pending_detachments.keys():
-            future.wait()
+        for detach in self._pending_detachments:
+            assert detach.future is not None
+            detach.future.wait()
         self._pending_detachments.clear()
         # Clean up our attachments so that they can be collected
         self._attachments = dict()
@@ -616,31 +619,24 @@ def _remove_allocation(self, alloc: Attachable) -> None:

     def detach_external_allocation(
         self,
-        alloc: Attachable,
         detach: Union[Detach, IndexDetach],
-        defer: bool = False,
+        ordered: bool,
         previously_deferred: bool = False,
-        dependent_field: Optional[Field] = None,
-    ) -> Union[None, Future]:
-        # If the detachment was previously deferred, then we don't
-        # need to remove the allocation from the map again.
+    ) -> None:
+        assert detach.attached_alloc is not None
+        assert detach.future is None
         if not previously_deferred:
-            self._remove_allocation(alloc)
-        if defer:
-            assert dependent_field is not None
+            self._remove_allocation(detach.attached_alloc)
+        if not ordered:
             # If we need to defer this until later do that now
-            self._deferred_detachments.append((alloc, detach, dependent_field))
-            return None
+            self._deferred_detachments.append(detach)
+            return
         future = self._runtime.dispatch(detach)
-        # Dangle a reference to the field off the future to prevent the
-        # field from being recycled until the detach is done
-        field = detach.field  # type: ignore[union-attr]
-        future.field_reference = field  # type: ignore[attr-defined]
+        # Hang the future on the detach operation itself
+        detach.future = future
         # If the future is already ready, then no need to track it
-        if future.is_ready():
-            return None
-        self._pending_detachments[future] = alloc
-        return future
+        if not future.is_ready():
+            self._pending_detachments.append(detach)

     def register_detachment(self, detach: Union[Detach, IndexDetach]) -> int:
         key = self._next_detachment_key
@@ -654,22 +650,23 @@ def remove_detachment(self, detach_key: int) -> Union[Detach, IndexDetach]:
         return detach

     def perform_detachments(self) -> None:
+        # We have to clear the list first, otherwise the dispatch() of the
+        # detach brings us back here, and results in an infinite loop.
         detachments = self._deferred_detachments
         self._deferred_detachments = list()
-        for alloc, detach, field in detachments:
-            detach_future = self.detach_external_allocation(
-                alloc, detach, defer=False, previously_deferred=True
-            )
-            if field is not None and detach_future is not None:
-                field.add_detach_future(detach_future)
+        for detach in detachments:
+            if detach.future is None:
+                self.detach_external_allocation(
+                    detach, ordered=True, previously_deferred=True
+                )

     def prune_detachments(self) -> None:
-        to_remove = []
-        for future in self._pending_detachments.keys():
-            if future.is_ready():
-                to_remove.append(future)
-        for future in to_remove:
-            del self._pending_detachments[future]
+        new_pending: List[Union[Detach, IndexDetach]] = []
+        for detach in self._pending_detachments:
+            assert detach.future is not None
+            if not detach.future.is_ready():
+                new_pending.append(detach)
+        self._pending_detachments = new_pending


 class PartitionManager:
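Reviewer note: putting the AttachmentManager pieces together, the new lifecycle is defer, then dispatch (future attached), then prune. A runnable toy of the whole loop covering this hunk and detach_external_allocation above (mock types only; nothing here is the real Legate API):

    from typing import List, Optional

    class MockFuture:
        def __init__(self) -> None:
            self.ready = False
        def is_ready(self) -> bool:
            return self.ready

    class MockDetach:
        def __init__(self) -> None:
            self.attached_alloc: object = object()  # kept alive until detached
            self.future: Optional[MockFuture] = None

    class MockAttachmentManager:
        def __init__(self) -> None:
            self._deferred: List[MockDetach] = []
            self._pending: List[MockDetach] = []

        def detach_external_allocation(
            self, detach: MockDetach, ordered: bool
        ) -> None:
            if not ordered:
                self._deferred.append(detach)  # unordered free: defer for now
                return
            detach.future = MockFuture()       # stands in for runtime.dispatch()
            if not detach.future.is_ready():
                self._pending.append(detach)

        def perform_detachments(self) -> None:
            # Swap the list out first so a re-entrant call cannot loop forever.
            detachments, self._deferred = self._deferred, []
            for detach in detachments:
                if detach.future is None:
                    self.detach_external_allocation(detach, ordered=True)

        def prune_detachments(self) -> None:
            self._pending = [
                d for d in self._pending
                if d.future is not None and not d.future.is_ready()
            ]

    mgr = MockAttachmentManager()
    d = MockDetach()
    mgr.detach_external_allocation(d, ordered=False)  # deferred
    mgr.perform_detachments()                         # dispatched, now pending
    assert d.future is not None
    d.future.ready = True                             # pretend the detach ran
    mgr.prune_detachments()                           # dropped once complete
    assert not mgr._pending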
@@ -1327,15 +1324,15 @@ def destroy(self) -> None:
             self.legion_runtime, self.legion_context, barrier
         )

-        self._attachment_manager.destroy()
-
         # Destroy all libraries. Note that we should do this
         # from the lastly added one to the first one
         for context in reversed(self._context_list):
             context.destroy()
         del self._contexts
         del self._context_list

+        self._attachment_manager.destroy()
+
         # Remove references to our legion resources so they can be collected
         self.active_region_managers = {}
         self.region_managers_by_region = {}
@@ -1857,7 +1854,7 @@ def free_field(
         field_id: int,
         field_size: int,
         shape: Shape,
-        detach_future: Optional[Future] = None,
+        detach: Union[Detach, IndexDetach, None],
     ) -> None:
         # Have a guard here to make sure that we don't try to
         # do this after we have been destroyed
@@ -1868,7 +1865,7 @@
         if key not in self.field_managers:
             return

-        self.field_managers[key].free_field(region, field_id, detach_future)
+        self.field_managers[key].free_field(region, field_id, detach)

     def import_output_region(
         self, out_region: OutputRegion, field_id: int, dtype: Any