From 25eb3a0b3859c6c609986d54de7a02d7e24f7213 Mon Sep 17 00:00:00 2001 From: Manolis Papadakis Date: Wed, 27 Sep 2023 16:11:24 -0700 Subject: [PATCH 1/3] Detach fields in-order after consensus match --- legate/core/_legion/operation.py | 26 +++------ legate/core/_legion/util.py | 14 ----- legate/core/runtime.py | 94 +++++++++++--------------------- legate/core/store.py | 57 ++++++------------- 4 files changed, 57 insertions(+), 134 deletions(-) diff --git a/legate/core/_legion/operation.py b/legate/core/_legion/operation.py index 641b81343..94220df15 100644 --- a/legate/core/_legion/operation.py +++ b/legate/core/_legion/operation.py @@ -1239,7 +1239,6 @@ def launch( self, runtime: legion.legion_runtime_t, context: legion.legion_context_t, - unordered: bool = False, **kwargs: Any, ) -> Future: """ @@ -1248,27 +1247,20 @@ def launch( Returns ------- Future containing no data that completes when detach operation is done - If 'unordered' is set to true then you must call legate_task_progress - before using the returned Future """ # Check to see if we're still inside the context of the task # If not then we just need to leak this detach because it can't be done if context not in _pending_unordered: return Future() - if unordered: - future = Future() - _pending_unordered[context].append(((self, future), type(self))) - return future - else: - return Future( - legion.legion_unordered_detach_external_resource( - runtime, - context, - self.physical_region.handle, - self.flush, - True, - ) + return Future( + legion.legion_unordered_detach_external_resource( + runtime, + context, + self.physical_region.handle, + self.flush, + False, # unordered ) + ) class IndexAttach(Dispatchable[ExternalResources]): @@ -1429,7 +1421,7 @@ def launch( context, self.external_resources.handle, self.flush, - True, # unordered + False, # unordered ) ) diff --git a/legate/core/_legion/util.py b/legate/core/_legion/util.py index 240e32e56..c9cc0bcab 100644 --- a/legate/core/_legion/util.py +++ b/legate/core/_legion/util.py @@ -64,7 +64,6 @@ def legate_task_progress( """ from .future import Future, FutureMap - from .operation import Detach from .region import OutputRegion, PhysicalRegion, Region from .space import FieldSpace, IndexSpace from .task import ArgumentMap @@ -97,19 +96,6 @@ def legate_task_progress( ) elif type is PhysicalRegion: handle.unmap(runtime, context, unordered=False) - elif type is Detach: - detach = handle[0] - future = handle[1] - assert future.handle is None - future.handle = ( - legion.legion_unordered_detach_external_resource( - runtime, - context, - detach.physical_region.handle, - detach.flush, - True, - ) - ) else: raise TypeError( "Internal legate type error on unordered operations" diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 47f4d74cd..b7060df37 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -80,7 +80,7 @@ from .operation import AutoTask, Copy, ManualTask, Operation from .partition import PartitionBase from .projection import SymbolicPoint - from .store import Field, RegionField, Store + from .store import RegionField, Store ProjSpec = Tuple[int, SymbolicPoint] ShardSpec = Tuple[int, tuple[int, int], int] @@ -144,13 +144,15 @@ class FreeFieldInfo: manager: FieldManager region: Region field_id: int - detach_future: Union[Future, None] + attached_alloc: Union[None, Attachable] + detach: Union[Detach, IndexDetach, None] def free(self, ordered: bool = False) -> None: self.manager.free_field( self.region, self.field_id, - detach_future=self.detach_future, + self.attached_alloc, + self.detach, ordered=ordered, ) @@ -256,10 +258,11 @@ def add_free_field( manager: FieldManager, region: Region, field_id: int, - detach_future: Union[Future, None], + attached_alloc: Union[None, Attachable], + detach: Union[Detach, IndexDetach, None], ) -> None: self._freed_fields.append( - FreeFieldInfo(manager, region, field_id, detach_future) + FreeFieldInfo(manager, region, field_id, attached_alloc, detach) ) def issue_field_match(self, credit: int) -> None: @@ -409,10 +412,17 @@ def free_field( self, region: Region, field_id: int, - detach_future: Union[Future, None] = None, + attached_alloc: Union[None, Attachable], + detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: - self.free_fields.append((region, field_id, detach_future)) + detach_fut: Union[Future, None] = None + if attached_alloc is not None: + assert detach is not None + detach_fut = runtime.attachment_manager.detach_external_allocation( + attached_alloc, detach + ) + self.free_fields.append((region, field_id, detach_fut)) region_manager = self.runtime.find_region_manager(region) if region_manager.decrease_active_field_count(): self.runtime.free_region_manager( @@ -459,28 +469,23 @@ def try_reuse_field(self) -> Optional[tuple[Region, int]]: self._field_match_manager.update_free_fields() - # If any free fields were discovered on all shards, push their - # unordered detachments to the task stream now, so we can safely - # block on them later without fear of deadlock. - if len(self.free_fields) > 0: - self.runtime._progress_unordered_operations() - return _try_reuse_field(self.free_fields) def free_field( self, region: Region, field_id: int, - detach_future: Union[Future, None] = None, + attached_alloc: Union[None, Attachable], + detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: if ordered: super().free_field( - region, field_id, detach_future=detach_future, ordered=ordered + region, field_id, attached_alloc, detach, ordered ) else: # Put this on the unordered list self._field_match_manager.add_free_field( - self, region, field_id, detach_future + self, region, field_id, attached_alloc, detach ) @@ -518,27 +523,16 @@ def __init__(self, runtime: Runtime) -> None: self._registered_detachments: dict[ int, Union[Detach, IndexDetach] ] = dict() - self._deferred_detachments: List[ - tuple[Attachable, Union[Detach, IndexDetach], Union[Field, None]] - ] = list() self._pending_detachments: dict[Future, Attachable] = dict() self._destroyed = False def destroy(self) -> None: self._destroyed = True - gc.collect() - while self._deferred_detachments: - self.perform_detachments() - # Make sure progress is made on any of these operations - self._runtime._progress_unordered_operations() - gc.collect() # Always make sure we wait for any pending detachments to be done # so that we don't lose the references and make the GC unhappy - gc.collect() - while self._pending_detachments: - self.prune_detachments() - gc.collect() - + for future in self._pending_detachments.keys(): + future.wait() + self._pending_detachments.clear() # Clean up our attachments so that they can be collected self._attachments = dict() @@ -620,30 +614,15 @@ def _remove_allocation(self, alloc: Attachable) -> None: def detach_external_allocation( self, - alloc: Attachable, + attached_alloc: Attachable, detach: Union[Detach, IndexDetach], - defer: bool = False, - previously_deferred: bool = False, - dependent_field: Optional[Field] = None, ) -> Union[None, Future]: - # If the detachment was previously deferred, then we don't - # need to remove the allocation from the map again. - if not previously_deferred: - self._remove_allocation(alloc) - if defer: - assert dependent_field is not None - # If we need to defer this until later do that now - self._deferred_detachments.append((alloc, detach, dependent_field)) - return None + self._remove_allocation(attached_alloc) future = self._runtime.dispatch(detach) - # Dangle a reference to the field off the future to prevent the - # field from being recycled until the detach is done - field = detach.field # type: ignore[union-attr] - future.field_reference = field # type: ignore[attr-defined] # If the future is already ready, then no need to track it if future.is_ready(): return None - self._pending_detachments[future] = alloc + self._pending_detachments[future] = attached_alloc return future def register_detachment(self, detach: Union[Detach, IndexDetach]) -> int: @@ -657,16 +636,6 @@ def remove_detachment(self, detach_key: int) -> Union[Detach, IndexDetach]: del self._registered_detachments[detach_key] return detach - def perform_detachments(self) -> None: - detachments = self._deferred_detachments - self._deferred_detachments = list() - for alloc, detach, field in detachments: - detach_future = self.detach_external_allocation( - alloc, detach, defer=False, previously_deferred=True - ) - if field is not None and detach_future is not None: - field.add_detach_future(detach_future) - def prune_detachments(self) -> None: to_remove = [] for future in self._pending_detachments.keys(): @@ -1370,12 +1339,10 @@ def get_next_storage_id(self) -> int: return self._next_storage_id def dispatch(self, op: Dispatchable[T]) -> T: - self._attachment_manager.perform_detachments() self._attachment_manager.prune_detachments() return op.launch(self.legion_runtime, self.legion_context) def dispatch_single(self, op: Dispatchable[T]) -> T: - self._attachment_manager.perform_detachments() self._attachment_manager.prune_detachments() return op.launch(self.legion_runtime, self.legion_context) @@ -1861,7 +1828,8 @@ def free_field( field_id: int, field_size: int, shape: Shape, - detach_future: Optional[Future] = None, + attached_alloc: Union[None, Attachable], + detach: Union[Detach, IndexDetach, None], ) -> None: # Have a guard here to make sure that we don't try to # do this after we have been destroyed @@ -1872,7 +1840,9 @@ def free_field( if key not in self.field_managers: return - self.field_managers[key].free_field(region, field_id, detach_future) + self.field_managers[key].free_field( + region, field_id, attached_alloc, detach + ) def import_output_region( self, out_region: OutputRegion, field_id: int, dtype: Any diff --git a/legate/core/store.py b/legate/core/store.py index 4d529e858..fdd912c29 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -80,25 +80,31 @@ def __init__( self.field_id = field_id self.field_size = field_size self.shape = shape - self.detach_future: Optional[Future] = None + # External allocation we attached to this field + self.attached_alloc: Union[None, Attachable] = None + self.detach_key: int = -1 def same_handle(self, other: Field) -> bool: return type(self) == type(other) and self.field_id == other.field_id - def add_detach_future(self, future: Future) -> None: - self.detach_future = future - def __str__(self) -> str: return f"Field({self.field_id})" def __del__(self) -> None: + # Detach any existing allocation + detach = ( + None + if self.attached_alloc is None + else attachment_manager.remove_detachment(self.detach_key) + ) # Return our field back to the runtime runtime.free_field( self.region, self.field_id, self.field_size, self.shape, - self.detach_future, + self.attached_alloc, + detach, ) @@ -117,9 +123,6 @@ def __init__( self.field = field self.shape = shape self.parent = parent - # External allocation we attached to this field - self.attached_alloc: Union[None, Attachable] = None - self.detach_key: int = -1 # Physical region for attach self.physical_region: Union[None, PhysicalRegion] = None self.physical_region_refs = 0 @@ -127,10 +130,6 @@ def __init__( self._partitions: dict[Tiling, LegionPartition] = {} - def __del__(self) -> None: - if self.attached_alloc is not None: - self.detach_external_allocation(unordered=True, defer=True) - @staticmethod def create( region: Region, field_id: int, field_size: int, shape: Shape @@ -157,12 +156,7 @@ def attach_external_allocation( ) -> None: assert self.parent is None # If we already have some memory attached, detach it first - if self.attached_alloc is not None: - raise RuntimeError("A RegionField cannot be re-attached") - if ( - self.field.detach_future is not None - and not self.field.detach_future.is_ready() - ): + if self.field.attached_alloc is not None: raise RuntimeError("A RegionField cannot be re-attached") # All inline mappings should have been unmapped by now assert self.physical_region_refs == 0 @@ -171,14 +165,12 @@ def attach_external_allocation( attachment_manager.attach_external_allocation(alloc, self) def record_detach(detach: Union[Detach, IndexDetach]) -> None: - # Dangle these fields off the detachment operation, to prevent - # premature collection - detach.field = self.field # type: ignore[union-attr] - detach.alloc = alloc # type: ignore[union-attr] # Don't store the detachment operation here, instead register it # on the attachment manager and record its unique key # TODO: This might not be necessary anymore - self.detach_key = attachment_manager.register_detachment(detach) + self.field.detach_key = attachment_manager.register_detachment( + detach + ) if isinstance(alloc, memoryview): # Singleton attachment @@ -247,24 +239,7 @@ def record_detach(detach: Union[Detach, IndexDetach]) -> None: # if this is an internal temporary allocation. record_detach(IndexDetach(external_resources, flush=share)) # Record the attachment - self.attached_alloc = alloc - - def detach_external_allocation( - self, unordered: bool, defer: bool = False - ) -> None: - assert self.parent is None - assert self.attached_alloc is not None - detach = attachment_manager.remove_detachment(self.detach_key) - detach.unordered = unordered # type: ignore[union-attr] - detach_future = attachment_manager.detach_external_allocation( - self.attached_alloc, detach, defer, dependent_field=self.field - ) - self.physical_region = None - self.physical_region_mapped = False - self.physical_region_refs = 0 - self.attached_alloc = None - if detach_future is not None: - self.field.add_detach_future(detach_future) + self.field.attached_alloc = alloc def get_inline_mapped_region(self) -> PhysicalRegion: if self.parent is None: From 046a82ba230f4afd6bac83ad93b487aac9e00390 Mon Sep 17 00:00:00 2001 From: Manolis Papadakis Date: Tue, 17 Oct 2023 15:33:48 -0700 Subject: [PATCH 2/3] Must defer unordered detaches even in single-node mode Otherwise we end up doing runtime work in a detructor, which causes CFII to deadlock. --- legate/core/_legion/operation.py | 8 +++ legate/core/runtime.py | 109 +++++++++++++++++++------------ legate/core/store.py | 12 ++-- 3 files changed, 79 insertions(+), 50 deletions(-) diff --git a/legate/core/_legion/operation.py b/legate/core/_legion/operation.py index 94220df15..3f6a282d8 100644 --- a/legate/core/_legion/operation.py +++ b/legate/core/_legion/operation.py @@ -1233,6 +1233,10 @@ def __init__(self, region: PhysicalRegion, flush: bool = True) -> None: # it is not deleted before this detach operation can run self.region = region.region self.flush = flush + # The following fields aren't part of the operation itself, but are + # used for managing the lifetime of attached objects + self.attached_alloc: Any = None + self.future: Optional[Future] = None @dispatch def launch( @@ -1401,6 +1405,10 @@ def __init__( """ self.external_resources = external_resources self.flush = flush + # The following fields aren't part of the operation itself, but are + # used for managing the lifetime of attached objects + self.attached_alloc: Any = None + self.future: Optional[Future] = None def launch( self, diff --git a/legate/core/runtime.py b/legate/core/runtime.py index b7060df37..b4c102311 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -144,14 +144,12 @@ class FreeFieldInfo: manager: FieldManager region: Region field_id: int - attached_alloc: Union[None, Attachable] detach: Union[Detach, IndexDetach, None] - def free(self, ordered: bool = False) -> None: + def free(self, ordered: bool) -> None: self.manager.free_field( self.region, self.field_id, - self.attached_alloc, self.detach, ordered=ordered, ) @@ -258,11 +256,10 @@ def add_free_field( manager: FieldManager, region: Region, field_id: int, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ) -> None: self._freed_fields.append( - FreeFieldInfo(manager, region, field_id, attached_alloc, detach) + FreeFieldInfo(manager, region, field_id, detach) ) def issue_field_match(self, credit: int) -> None: @@ -362,14 +359,24 @@ def allocate_field(self, field_size: Any) -> tuple[Region, int, bool]: def _try_reuse_field( - free_fields: Deque[tuple[Region, int, Union[Future, None]]] + free_fields: Deque[tuple[Region, int, Union[Detach, IndexDetach, None]]] ) -> Optional[tuple[Region, int]]: if len(free_fields) == 0: return None - field_info = free_fields.popleft() - if field_info[2] is not None and not field_info[2].is_ready(): - field_info[2].wait() - return field_info[0], field_info[1] + region, field_id, detach = free_fields.popleft() + + if detach is not None: + if detach.future is None: + # corner case; the detach has been added to _deferred_detachments + # but not dispatched yet, so do that now + runtime.attachment_manager.perform_detachments() + assert detach.future is not None + # We have to wait for the field to be detached from any attachment + # before we can reuse it. + detach.future.wait() + # The Detach operation will asynchronously be removed from + # _pending_detachments through the _prune_detachment mechanism + return region, field_id # This class manages the allocation and reuse of fields @@ -385,7 +392,7 @@ def __init__( # guaranteed to be ordered across all the shards even with # control replication self.free_fields: Deque[ - tuple[Region, int, Union[Future, None]] + tuple[Region, int, Union[Detach, IndexDetach, None]] ] = deque() def destroy(self) -> None: @@ -412,17 +419,14 @@ def free_field( self, region: Region, field_id: int, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: - detach_fut: Union[Future, None] = None - if attached_alloc is not None: - assert detach is not None - detach_fut = runtime.attachment_manager.detach_external_allocation( - attached_alloc, detach + if detach is not None: + runtime.attachment_manager.detach_external_allocation( + detach, ordered ) - self.free_fields.append((region, field_id, detach_fut)) + self.free_fields.append((region, field_id, detach)) region_manager = self.runtime.find_region_manager(region) if region_manager.decrease_active_field_count(): self.runtime.free_region_manager( @@ -475,17 +479,14 @@ def free_field( self, region: Region, field_id: int, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: if ordered: - super().free_field( - region, field_id, attached_alloc, detach, ordered - ) + super().free_field(region, field_id, detach, ordered) else: # Put this on the unordered list self._field_match_manager.add_free_field( - self, region, field_id, attached_alloc, detach + self, region, field_id, detach ) @@ -523,15 +524,19 @@ def __init__(self, runtime: Runtime) -> None: self._registered_detachments: dict[ int, Union[Detach, IndexDetach] ] = dict() - self._pending_detachments: dict[Future, Attachable] = dict() + self._deferred_detachments: List[Union[Detach, IndexDetach]] = [] + self._pending_detachments: List[Union[Detach, IndexDetach]] = [] self._destroyed = False def destroy(self) -> None: self._destroyed = True + # Schedule any queued detachments + self.perform_detachments() # Always make sure we wait for any pending detachments to be done # so that we don't lose the references and make the GC unhappy - for future in self._pending_detachments.keys(): - future.wait() + for detach in self._pending_detachments: + assert detach.future is not None + detach.future.wait() self._pending_detachments.clear() # Clean up our attachments so that they can be collected self._attachments = dict() @@ -614,16 +619,24 @@ def _remove_allocation(self, alloc: Attachable) -> None: def detach_external_allocation( self, - attached_alloc: Attachable, detach: Union[Detach, IndexDetach], - ) -> Union[None, Future]: - self._remove_allocation(attached_alloc) + ordered: bool, + previously_deferred: bool = False, + ) -> None: + assert detach.attached_alloc is not None + assert detach.future is None + if not previously_deferred: + self._remove_allocation(detach.attached_alloc) + if not ordered: + # If we need to defer this until later do that now + self._deferred_detachments.append(detach) + return future = self._runtime.dispatch(detach) + # Hang the future on the detach operation itself + detach.future = future # If the future is already ready, then no need to track it - if future.is_ready(): - return None - self._pending_detachments[future] = attached_alloc - return future + if not future.is_ready(): + self._pending_detachments.append(detach) def register_detachment(self, detach: Union[Detach, IndexDetach]) -> int: key = self._next_detachment_key @@ -636,13 +649,24 @@ def remove_detachment(self, detach_key: int) -> Union[Detach, IndexDetach]: del self._registered_detachments[detach_key] return detach + def perform_detachments(self) -> None: + # We have to clear the list first, otherwise the dispatch() of the + # detach brings us back here, and results in an infinite loop. + detachments = self._deferred_detachments + self._deferred_detachments = list() + for detach in detachments: + if detach.future is None: + self.detach_external_allocation( + detach, ordered=True, previously_deferred=True + ) + def prune_detachments(self) -> None: - to_remove = [] - for future in self._pending_detachments.keys(): - if future.is_ready(): - to_remove.append(future) - for future in to_remove: - del self._pending_detachments[future] + new_pending: List[Union[Detach, IndexDetach]] = [] + for detach in self._pending_detachments: + assert detach.future is not None + if not detach.future.is_ready(): + new_pending.append(detach) + self._pending_detachments = new_pending class PartitionManager: @@ -1339,10 +1363,12 @@ def get_next_storage_id(self) -> int: return self._next_storage_id def dispatch(self, op: Dispatchable[T]) -> T: + self._attachment_manager.perform_detachments() self._attachment_manager.prune_detachments() return op.launch(self.legion_runtime, self.legion_context) def dispatch_single(self, op: Dispatchable[T]) -> T: + self._attachment_manager.perform_detachments() self._attachment_manager.prune_detachments() return op.launch(self.legion_runtime, self.legion_context) @@ -1828,7 +1854,6 @@ def free_field( field_id: int, field_size: int, shape: Shape, - attached_alloc: Union[None, Attachable], detach: Union[Detach, IndexDetach, None], ) -> None: # Have a guard here to make sure that we don't try to @@ -1840,9 +1865,7 @@ def free_field( if key not in self.field_managers: return - self.field_managers[key].free_field( - region, field_id, attached_alloc, detach - ) + self.field_managers[key].free_field(region, field_id, detach) def import_output_region( self, out_region: OutputRegion, field_id: int, dtype: Any diff --git a/legate/core/store.py b/legate/core/store.py index e901737fe..10279256c 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -80,8 +80,6 @@ def __init__( self.field_id = field_id self.field_size = field_size self.shape = shape - # External allocation we attached to this field - self.attached_alloc: Union[None, Attachable] = None self.detach_key: int = -1 def same_handle(self, other: Field) -> bool: @@ -96,7 +94,7 @@ def __del__(self) -> None: # Detach any existing allocation detach = ( None - if self.attached_alloc is None + if self.detach_key < 0 else attachment_manager.remove_detachment(self.detach_key) ) # Return our field back to the runtime @@ -105,7 +103,6 @@ def __del__(self) -> None: self.field_id, self.field_size, self.shape, - self.attached_alloc, detach, ) @@ -158,7 +155,7 @@ def attach_external_allocation( ) -> None: assert self.parent is None # If we already have some memory attached, detach it first - if self.field.attached_alloc is not None: + if self.field.detach_key >= 0: raise RuntimeError("A RegionField cannot be re-attached") # All inline mappings should have been unmapped by now assert self.physical_region_refs == 0 @@ -167,6 +164,9 @@ def attach_external_allocation( attachment_manager.attach_external_allocation(alloc, self) def record_detach(detach: Union[Detach, IndexDetach]) -> None: + # Hang the allocation on the detach operation, so it won't be + # deallocated until the detach is processed. + detach.attached_alloc = alloc # Don't store the detachment operation here, instead register it # on the attachment manager and record its unique key # TODO: This might not be necessary anymore @@ -240,8 +240,6 @@ def record_detach(detach: Union[Detach, IndexDetach]) -> None: # We don't need to flush the contents back to the attached memory # if this is an internal temporary allocation. record_detach(IndexDetach(external_resources, flush=share)) - # Record the attachment - self.field.attached_alloc = alloc def get_inline_mapped_region(self) -> PhysicalRegion: if self.parent is None: From d1683ad59250e7394c40d5be1daabfa2b9d6b57a Mon Sep 17 00:00:00 2001 From: Manolis Papadakis Date: Wed, 18 Oct 2023 12:46:12 -0700 Subject: [PATCH 3/3] Move AttachmentManager destruction earlier, to avoid use-after-frees --- legate/core/runtime.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/legate/core/runtime.py b/legate/core/runtime.py index b4c102311..4f074a3eb 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -1324,6 +1324,8 @@ def destroy(self) -> None: self.legion_runtime, self.legion_context, barrier ) + self._attachment_manager.destroy() + # Destroy all libraries. Note that we should do this # from the lastly added one to the first one for context in reversed(self._context_list): @@ -1331,8 +1333,6 @@ def destroy(self) -> None: del self._contexts del self._context_list - self._attachment_manager.destroy() - # Remove references to our legion resources so they can be collected self.active_region_managers = {} self.region_managers_by_region = {}