diff --git a/legate/core/_legion/operation.py b/legate/core/_legion/operation.py index 641b81343..3f6a282d8 100644 --- a/legate/core/_legion/operation.py +++ b/legate/core/_legion/operation.py @@ -1233,13 +1233,16 @@ def __init__(self, region: PhysicalRegion, flush: bool = True) -> None: # it is not deleted before this detach operation can run self.region = region.region self.flush = flush + # The following fields aren't part of the operation itself, but are + # used for managing the lifetime of attached objects + self.attached_alloc: Any = None + self.future: Optional[Future] = None @dispatch def launch( self, runtime: legion.legion_runtime_t, context: legion.legion_context_t, - unordered: bool = False, **kwargs: Any, ) -> Future: """ @@ -1248,27 +1251,20 @@ def launch( Returns ------- Future containing no data that completes when detach operation is done - If 'unordered' is set to true then you must call legate_task_progress - before using the returned Future """ # Check to see if we're still inside the context of the task # If not then we just need to leak this detach because it can't be done if context not in _pending_unordered: return Future() - if unordered: - future = Future() - _pending_unordered[context].append(((self, future), type(self))) - return future - else: - return Future( - legion.legion_unordered_detach_external_resource( - runtime, - context, - self.physical_region.handle, - self.flush, - True, - ) + return Future( + legion.legion_unordered_detach_external_resource( + runtime, + context, + self.physical_region.handle, + self.flush, + False, # unordered ) + ) class IndexAttach(Dispatchable[ExternalResources]): @@ -1409,6 +1405,10 @@ def __init__( """ self.external_resources = external_resources self.flush = flush + # The following fields aren't part of the operation itself, but are + # used for managing the lifetime of attached objects + self.attached_alloc: Any = None + self.future: Optional[Future] = None def launch( self, @@ -1429,7 +1429,7 @@ 
def launch( context, self.external_resources.handle, self.flush, - True, # unordered + False, # unordered ) ) diff --git a/legate/core/_legion/util.py b/legate/core/_legion/util.py index 240e32e56..c9cc0bcab 100644 --- a/legate/core/_legion/util.py +++ b/legate/core/_legion/util.py @@ -64,7 +64,6 @@ def legate_task_progress( """ from .future import Future, FutureMap - from .operation import Detach from .region import OutputRegion, PhysicalRegion, Region from .space import FieldSpace, IndexSpace from .task import ArgumentMap @@ -97,19 +96,6 @@ def legate_task_progress( ) elif type is PhysicalRegion: handle.unmap(runtime, context, unordered=False) - elif type is Detach: - detach = handle[0] - future = handle[1] - assert future.handle is None - future.handle = ( - legion.legion_unordered_detach_external_resource( - runtime, - context, - detach.physical_region.handle, - detach.flush, - True, - ) - ) else: raise TypeError( "Internal legate type error on unordered operations" diff --git a/legate/core/runtime.py b/legate/core/runtime.py index 0aca17e5a..4f074a3eb 100644 --- a/legate/core/runtime.py +++ b/legate/core/runtime.py @@ -80,7 +80,7 @@ from .operation import AutoTask, Copy, ManualTask, Operation from .partition import PartitionBase from .projection import SymbolicPoint - from .store import Field, RegionField, Store + from .store import RegionField, Store ProjSpec = Tuple[int, SymbolicPoint] ShardSpec = Tuple[int, tuple[int, int], int] @@ -144,13 +144,13 @@ class FreeFieldInfo: manager: FieldManager region: Region field_id: int - detach_future: Union[Future, None] + detach: Union[Detach, IndexDetach, None] - def free(self, ordered: bool = False) -> None: + def free(self, ordered: bool) -> None: self.manager.free_field( self.region, self.field_id, - detach_future=self.detach_future, + self.detach, ordered=ordered, ) @@ -256,10 +256,10 @@ def add_free_field( manager: FieldManager, region: Region, field_id: int, - detach_future: Union[Future, None], + detach: 
Union[Detach, IndexDetach, None], ) -> None: self._freed_fields.append( - FreeFieldInfo(manager, region, field_id, detach_future) + FreeFieldInfo(manager, region, field_id, detach) ) def issue_field_match(self, credit: int) -> None: @@ -359,14 +359,24 @@ def allocate_field(self, field_size: Any) -> tuple[Region, int, bool]: def _try_reuse_field( - free_fields: Deque[tuple[Region, int, Union[Future, None]]] + free_fields: Deque[tuple[Region, int, Union[Detach, IndexDetach, None]]] ) -> Optional[tuple[Region, int]]: if len(free_fields) == 0: return None - field_info = free_fields.popleft() - if field_info[2] is not None and not field_info[2].is_ready(): - field_info[2].wait() - return field_info[0], field_info[1] + region, field_id, detach = free_fields.popleft() + + if detach is not None: + if detach.future is None: + # corner case; the detach has been added to _deferred_detachments + # but not dispatched yet, so do that now + runtime.attachment_manager.perform_detachments() + assert detach.future is not None + # We have to wait for the field to be detached from any attachment + # before we can reuse it. 
+ detach.future.wait() + # The Detach operation will asynchronously be removed from + # _pending_detachments through the prune_detachments mechanism + return region, field_id # This class manages the allocation and reuse of fields @@ -382,7 +392,7 @@ def __init__( # guaranteed to be ordered across all the shards even with # control replication self.free_fields: Deque[ - tuple[Region, int, Union[Future, None]] + tuple[Region, int, Union[Detach, IndexDetach, None]] ] = deque() def destroy(self) -> None: @@ -409,10 +419,14 @@ def free_field( self, region: Region, field_id: int, - detach_future: Union[Future, None] = None, + detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: - self.free_fields.append((region, field_id, detach_future)) + if detach is not None: + runtime.attachment_manager.detach_external_allocation( + detach, ordered + ) + self.free_fields.append((region, field_id, detach)) region_manager = self.runtime.find_region_manager(region) if region_manager.decrease_active_field_count(): self.runtime.free_region_manager( @@ -459,28 +473,20 @@ def try_reuse_field(self) -> Optional[tuple[Region, int]]: self._field_match_manager.update_free_fields() - # If any free fields were discovered on all shards, push their - # unordered detachments to the task stream now, so we can safely - # block on them later without fear of deadlock. 
- if len(self.free_fields) > 0: - self.runtime._progress_unordered_operations() - return _try_reuse_field(self.free_fields) def free_field( self, region: Region, field_id: int, - detach_future: Union[Future, None] = None, + detach: Union[Detach, IndexDetach, None], ordered: bool = False, ) -> None: if ordered: - super().free_field( - region, field_id, detach_future=detach_future, ordered=ordered - ) + super().free_field(region, field_id, detach, ordered) else: # Put this on the unordered list self._field_match_manager.add_free_field( - self, region, field_id, detach_future + self, region, field_id, detach ) @@ -518,22 +524,19 @@ def __init__(self, runtime: Runtime) -> None: self._registered_detachments: dict[ int, Union[Detach, IndexDetach] ] = dict() - self._deferred_detachments: List[ - tuple[Attachable, Union[Detach, IndexDetach], Union[Field, None]] - ] = list() - self._pending_detachments: dict[Future, Attachable] = dict() + self._deferred_detachments: List[Union[Detach, IndexDetach]] = [] + self._pending_detachments: List[Union[Detach, IndexDetach]] = [] self._destroyed = False def destroy(self) -> None: self._destroyed = True # Schedule any queued detachments self.perform_detachments() - # Make sure progress is made on any of these operations - self._runtime._progress_unordered_operations() # Always make sure we wait for any pending detachments to be done # so that we don't lose the references and make the GC unhappy - for future in self._pending_detachments.keys(): - future.wait() + for detach in self._pending_detachments: + assert detach.future is not None + detach.future.wait() self._pending_detachments.clear() # Clean up our attachments so that they can be collected self._attachments = dict() @@ -616,31 +619,24 @@ def _remove_allocation(self, alloc: Attachable) -> None: def detach_external_allocation( self, - alloc: Attachable, detach: Union[Detach, IndexDetach], - defer: bool = False, + ordered: bool, previously_deferred: bool = False, - 
dependent_field: Optional[Field] = None, - ) -> Union[None, Future]: - # If the detachment was previously deferred, then we don't - # need to remove the allocation from the map again. + ) -> None: + assert detach.attached_alloc is not None + assert detach.future is None if not previously_deferred: - self._remove_allocation(alloc) - if defer: - assert dependent_field is not None + self._remove_allocation(detach.attached_alloc) + if not ordered: # If we need to defer this until later do that now - self._deferred_detachments.append((alloc, detach, dependent_field)) - return None + self._deferred_detachments.append(detach) + return future = self._runtime.dispatch(detach) - # Dangle a reference to the field off the future to prevent the - # field from being recycled until the detach is done - field = detach.field # type: ignore[union-attr] - future.field_reference = field # type: ignore[attr-defined] + # Hang the future on the detach operation itself + detach.future = future # If the future is already ready, then no need to track it - if future.is_ready(): - return None - self._pending_detachments[future] = alloc - return future + if not future.is_ready(): + self._pending_detachments.append(detach) def register_detachment(self, detach: Union[Detach, IndexDetach]) -> int: key = self._next_detachment_key @@ -654,22 +650,23 @@ def remove_detachment(self, detach_key: int) -> Union[Detach, IndexDetach]: return detach def perform_detachments(self) -> None: + # We have to clear the list first, otherwise the dispatch() of the + # detach brings us back here, and results in an infinite loop. 
detachments = self._deferred_detachments self._deferred_detachments = list() - for alloc, detach, field in detachments: - detach_future = self.detach_external_allocation( - alloc, detach, defer=False, previously_deferred=True - ) - if field is not None and detach_future is not None: - field.add_detach_future(detach_future) + for detach in detachments: + if detach.future is None: + self.detach_external_allocation( + detach, ordered=True, previously_deferred=True + ) def prune_detachments(self) -> None: - to_remove = [] - for future in self._pending_detachments.keys(): - if future.is_ready(): - to_remove.append(future) - for future in to_remove: - del self._pending_detachments[future] + new_pending: List[Union[Detach, IndexDetach]] = [] + for detach in self._pending_detachments: + assert detach.future is not None + if not detach.future.is_ready(): + new_pending.append(detach) + self._pending_detachments = new_pending class PartitionManager: @@ -1327,6 +1324,8 @@ def destroy(self) -> None: self.legion_runtime, self.legion_context, barrier ) + self._attachment_manager.destroy() + # Destroy all libraries. 
Note that we should do this # from the lastly added one to the first one for context in reversed(self._context_list): @@ -1334,8 +1333,6 @@ def destroy(self) -> None: del self._contexts del self._context_list - self._attachment_manager.destroy() - # Remove references to our legion resources so they can be collected self.active_region_managers = {} self.region_managers_by_region = {} @@ -1857,7 +1854,7 @@ def free_field( field_id: int, field_size: int, shape: Shape, - detach_future: Optional[Future] = None, + detach: Union[Detach, IndexDetach, None], ) -> None: # Have a guard here to make sure that we don't try to # do this after we have been destroyed @@ -1868,7 +1865,7 @@ def free_field( if key not in self.field_managers: return - self.field_managers[key].free_field(region, field_id, detach_future) + self.field_managers[key].free_field(region, field_id, detach) def import_output_region( self, out_region: OutputRegion, field_id: int, dtype: Any diff --git a/legate/core/store.py b/legate/core/store.py index c870f513f..10279256c 100644 --- a/legate/core/store.py +++ b/legate/core/store.py @@ -80,27 +80,30 @@ def __init__( self.field_id = field_id self.field_size = field_size self.shape = shape - self.detach_future: Optional[Future] = None + self.detach_key: int = -1 def same_handle(self, other: Field) -> bool: return ( # noqa type(self) == type(other) and self.field_id == other.field_id ) - def add_detach_future(self, future: Future) -> None: - self.detach_future = future - def __str__(self) -> str: return f"Field({self.field_id})" def __del__(self) -> None: + # Detach any existing allocation + detach = ( + None + if self.detach_key < 0 + else attachment_manager.remove_detachment(self.detach_key) + ) # Return our field back to the runtime runtime.free_field( self.region, self.field_id, self.field_size, self.shape, - self.detach_future, + detach, ) @@ -119,9 +122,6 @@ def __init__( self.field = field self.shape = shape self.parent = parent - # External allocation we 
attached to this field - self.attached_alloc: Union[None, Attachable] = None - self.detach_key: int = -1 # Physical region for attach self.physical_region: Union[None, PhysicalRegion] = None self.physical_region_refs = 0 @@ -129,10 +129,6 @@ def __init__( self._partitions: dict[Tiling, LegionPartition] = {} - def __del__(self) -> None: - if self.attached_alloc is not None: - self.detach_external_allocation(unordered=True, defer=True) - @staticmethod def create( region: Region, field_id: int, field_size: int, shape: Shape @@ -159,12 +155,7 @@ def attach_external_allocation( ) -> None: assert self.parent is None # If we already have some memory attached, detach it first - if self.attached_alloc is not None: - raise RuntimeError("A RegionField cannot be re-attached") - if ( - self.field.detach_future is not None - and not self.field.detach_future.is_ready() - ): + if self.field.detach_key >= 0: raise RuntimeError("A RegionField cannot be re-attached") # All inline mappings should have been unmapped by now assert self.physical_region_refs == 0 @@ -173,14 +164,15 @@ def attach_external_allocation( attachment_manager.attach_external_allocation(alloc, self) def record_detach(detach: Union[Detach, IndexDetach]) -> None: - # Dangle these fields off the detachment operation, to prevent - # premature collection - detach.field = self.field # type: ignore[union-attr] - detach.alloc = alloc # type: ignore[union-attr] + # Hang the allocation on the detach operation, so it won't be + # deallocated until the detach is processed. 
+ detach.attached_alloc = alloc # Don't store the detachment operation here, instead register it # on the attachment manager and record its unique key # TODO: This might not be necessary anymore - self.detach_key = attachment_manager.register_detachment(detach) + self.field.detach_key = attachment_manager.register_detachment( + detach + ) if isinstance(alloc, memoryview): # Singleton attachment @@ -248,25 +240,6 @@ def record_detach(detach: Union[Detach, IndexDetach]) -> None: # We don't need to flush the contents back to the attached memory # if this is an internal temporary allocation. record_detach(IndexDetach(external_resources, flush=share)) - # Record the attachment - self.attached_alloc = alloc - - def detach_external_allocation( - self, unordered: bool, defer: bool = False - ) -> None: - assert self.parent is None - assert self.attached_alloc is not None - detach = attachment_manager.remove_detachment(self.detach_key) - detach.unordered = unordered # type: ignore[union-attr] - detach_future = attachment_manager.detach_external_allocation( - self.attached_alloc, detach, defer, dependent_field=self.field - ) - self.physical_region = None - self.physical_region_mapped = False - self.physical_region_refs = 0 - self.attached_alloc = None - if detach_future is not None: - self.field.add_detach_future(detach_future) def get_inline_mapped_region(self) -> PhysicalRegion: if self.parent is None: