From 7700b82062022f7cbaff08835b4e557f1459f45e Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 1 Feb 2024 17:22:53 +0100
Subject: [PATCH 01/10] Track dependents without weakrefs

---
 dask_expr/_core.py       | 60 ++++++++++++++++++++++++++++++++++------
 dask_expr/_expr.py       |  4 +--
 dask_expr/io/_delayed.py |  5 +---
 3 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index 72e8cc1ba..f39f0e3d2 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -11,18 +11,18 @@
 import toolz
 from dask.dataframe.core import is_dataframe_like, is_index_like, is_series_like
 from dask.utils import funcname, import_required, is_arraylike
+from toolz.dicttoolz import merge
 
 from dask_expr._util import _BackendData, _tokenize_deterministic
 
 
-def _unpack_collections(o):
+def _unpack_expr(o):
     if isinstance(o, Expr):
-        return o
-
-    if hasattr(o, "expr"):
-        return o.expr
+        return o, o._name
+    elif hasattr(o, "expr"):
+        return o.expr, o.expr._name
     else:
-        return o
+        return o, None
 
 
 class Expr:
@@ -30,6 +30,7 @@ class Expr:
     _defaults = {}
 
     def __init__(self, *args, **kwargs):
+        self._dependencies = {}
         operands = list(args)
         for parameter in type(self)._parameters[len(operands) :]:
             try:
@@ -37,8 +38,32 @@ def __init__(self, *args, **kwargs):
             except KeyError:
                 operands.append(type(self)._defaults[parameter])
         assert not kwargs, kwargs
-        operands = [_unpack_collections(o) for o in operands]
-        self.operands = operands
+        parsed_operands = []
+        children = set()
+        _subgraphs = []
+        _subgraph_instances = []
+        _graph_instances = {}
+        for o in operands:
+            expr, name = _unpack_expr(o)
+            parsed_operands.append(expr)
+            if name is not None:
+                children.add(name)
+                _subgraphs.append(expr._graph)
+                _subgraph_instances.append(expr._graph_instances)
+                _graph_instances[name] = expr
+
+        self.operands = parsed_operands
+        name = self._name
+        # Graph instances is a mapping name -> Expr instance
+        # Graph itself is a mapping of dependencies mapping names to a set of names
+        self._graph_instances = merge(_graph_instances, *_subgraph_instances)
+        self._graph = merge(*_subgraphs)
+        self._graph[name] = children
+        # Probably a bad idea to have a self ref
+        self._graph_instances[name] = self
+
+    def __hash__(self):
+        raise TypeError("Don't!")
 
     def __str__(self):
         s = ", ".join(
@@ -131,6 +156,21 @@ def dependencies(self):
         # Dependencies are `Expr` operands only
         return [operand for operand in self.operands if isinstance(operand, Expr)]
 
+    @functools.cached_property
+    def _dependent_graph(self):
+        rv = defaultdict(set)
+        # This should be O(E)
+        for expr, dependencies in self._graph.items():
+            rv[expr]
+            for dep in dependencies:
+                rv[dep].add(expr)
+        for name, exprs in rv.items():
+            rv[name] = {self._graph_instances[e] for e in exprs}
+        return rv
+
+    def dependents(self):
+        return self._dependent_graph
+
     def _task(self, index: int):
         """The task for the i'th partition
 
@@ -312,7 +352,7 @@ def simplify_once(self, dependents: defaultdict):
     def simplify(self) -> Expr:
         expr = self
         while True:
-            dependents = collect_depdendents(expr)
+            dependents = expr.dependents()
             new = expr.simplify_once(dependents=dependents)
             if new._name == expr._name:
                 break
@@ -693,4 +733,6 @@ def collect_depdendents(expr) -> defaultdict:
         for dep in node.dependencies():
             stack.append(dep)
             dependents[dep._name].append(weakref.ref(node))
+    precalculated = expr.dependents()
+    assert precalculated == dependents
     return dependents
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index feef7ba0d..8e560cf7d 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -1186,7 +1186,7 @@ def _simplify_up(self, parent, dependents):
         if self._name != parent.frame._name:
             # We can't push the filter through the filter condition
             return
-        parents = [x() for x in dependents[self._name] if x() is not None]
+        parents = dependents[self._name]
         if not all(isinstance(p, Filter) for p in parents):
             return
         return type(self)(
@@ -3171,7 +3171,7 @@ def determine_column_projection(expr, parent, dependents, additional_columns=Non
         column_union = []
     else:
         column_union = parent.columns.copy()
-    parents = [x() for x in dependents[expr._name] if x() is not None]
+    parents = dependents[expr._name]
 
     for p in parents:
         if len(p.columns) > 0:
diff --git a/dask_expr/io/_delayed.py b/dask_expr/io/_delayed.py
index cc2aa4ed3..646a0fb94 100644
--- a/dask_expr/io/_delayed.py
+++ b/dask_expr/io/_delayed.py
@@ -20,10 +20,7 @@ class _DelayedExpr(Expr):
     # Wraps a Delayed object to make it an Expr for now. This is hacky and we should
    # integrate this properly...
     # TODO
-
-    def __init__(self, obj):
-        self.obj = obj
-        self.operands = [obj]
+    _parameters = ["obj"]
 
     @property
     def _name(self):
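
The commit above replaces the weakref-based dependents walk with a dependency graph that is merged as expressions are constructed and inverted on demand. A minimal standalone sketch of that inversion, using illustrative names rather than anything taken from the patch:

    from collections import defaultdict

    # name -> set of child (dependency) names, as built up in Expr.__init__
    graph = {
        "readcsv-abc": set(),
        "filter-def": {"readcsv-abc"},
        "projection-123": {"readcsv-abc"},
        "add-456": {"filter-def", "projection-123"},
    }

    # Invert the edges, visiting each edge exactly once
    dependents = defaultdict(set)
    for name, children in graph.items():
        dependents[name]  # ensure every node appears, even ones nobody consumes
        for child in children:
            dependents[child].add(name)

    assert dependents["readcsv-abc"] == {"filter-def", "projection-123"}
    assert dependents["add-456"] == set()

Each edge is touched once, which is the O(E) bound the `_dependent_graph` comment claims; the bare `dependents[name]` lookup on a defaultdict mirrors the patch's `rv[expr]` trick for materializing entries for expressions that have no dependents.
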
From 353158364c305aa7c25e495f02772588daadc56e Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 1 Feb 2024 17:28:34 +0100
Subject: [PATCH 02/10] delete the old thing

---
 dask_expr/_core.py | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index f39f0e3d2..f1f5c46d5 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -2,7 +2,6 @@
 
 import functools
 import os
-import weakref
 from collections import defaultdict
 from collections.abc import Generator
 
@@ -718,21 +717,3 @@ def find_operations(self, operation: type | tuple[type]) -> Generator[Expr]:
             or issubclass(operation, Expr)
         ), "`operation` must be`Expr` subclass)"
         return (expr for expr in self.walk() if isinstance(expr, operation))
-
-
-def collect_depdendents(expr) -> defaultdict:
-    dependents = defaultdict(list)
-    stack = [expr]
-    seen = set()
-    while stack:
-        node = stack.pop()
-        if node._name in seen:
-            continue
-        seen.add(node._name)
-
-        for dep in node.dependencies():
-            stack.append(dep)
-            dependents[dep._name].append(weakref.ref(node))
-    precalculated = expr.dependents()
-    assert precalculated == dependents
-    return dependents

From af85220c731b465e5fada5fd6d183e8751f8f9cb Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 11:53:35 +0100
Subject: [PATCH 03/10] Update dask_expr/_core.py

---
 dask_expr/_core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index f1f5c46d5..6a64bc4ed 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -62,7 +62,7 @@ def __init__(self, *args, **kwargs):
 
     def __hash__(self):
-        raise TypeError("Don't!")
+        raise TypeError("Expr objects can't be used in sets or dicts or similar, use the _name instead")
 
     def __str__(self):
         s = ", ".join(
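
The reworded error documents the intended usage: expressions are deduplicated and looked up by their deterministic `_name` token, so putting the objects themselves into sets or dict keys would be a bug, and the raise makes that mistake loud. A sketch of the lookup pattern callers are expected to use, with `Node` as a hypothetical stand-in for an Expr:

    class Node:
        # hypothetical stand-in; real Exprs raise on hash for the same reason
        def __init__(self, name):
            self._name = name

        def __hash__(self):
            raise TypeError("use the _name instead")

    exprs = [Node("filter-abc"), Node("filter-abc"), Node("readcsv-def")]
    unique = {}
    for e in exprs:
        unique.setdefault(e._name, e)  # key by the token, never by the object
    assert sorted(unique) == ["filter-abc", "readcsv-def"]
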
From a9f99048d86e46457c4f87d4cb9304fa873be80a Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 16:10:26 +0100
Subject: [PATCH 04/10] Update

---
 dask_expr/_core.py |  5 +++--
 dask_expr/_expr.py | 10 ++++------
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index fb9b797f8..753e84d2f 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -75,6 +75,7 @@ def __new__(cls, *args, **kwargs):
         inst._graph[_name] = children
         # Probably a bad idea to have a self ref
         inst._graph_instances[_name] = inst
+        return inst
 
     def __hash__(self):
         raise TypeError(
@@ -367,8 +368,8 @@ def simplify_once(self, dependents: defaultdict, simplified: dict):
         changed = False
         for operand in expr.operands:
             if isinstance(operand, Expr):
-                # Bandaid for now, waiting for Singleton
-                dependents[operand._name].append(weakref.ref(expr))
+                # # Bandaid for now, waiting for Singleton
+                # dependents[operand._name].append(weakref.ref(expr))
                 new = operand.simplify_once(
                     dependents=dependents, simplified=simplified
                 )
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index ce273e36c..5c9060cc6 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -2077,9 +2077,7 @@ def _simplify_up(self, parent, dependents):
             parent, dependents
         ):
             parents = [
-                p().columns
-                for p in dependents[self._name]
-                if p() is not None and not isinstance(p(), Filter)
+                p.columns for p in dependents[self._name] if not isinstance(p, Filter)
             ]
             predicate = None
             if not set(flatten(parents, list)).issubset(set(self.frame.columns)):
@@ -2108,7 +2106,7 @@ def _simplify_up(self, parent, dependents):
             if col in (self.name, "index", self.frame._meta.index.name):
                 return
             if all(
-                isinstance(d(), Projection) and d().operand("columns") == col
+                isinstance(d, Projection) and d.operand("columns") == col
                 for d in dependents[self._name]
             ):
                 return type(self)(self.frame, True, self.name)
@@ -3502,7 +3500,7 @@ def plain_column_projection(expr, parent, dependents, additional_columns=None):
 
 
 def is_filter_pushdown_available(expr, parent, dependents, allow_reduction=True):
-    parents = [x() for x in dependents[expr._name] if x() is not None]
+    parents = dependents[expr._name]
     filters = {e._name for e in parents if isinstance(e, Filter)}
     if len(filters) != 1:
         # Don't push down if not exactly one Filter
@@ -3609,7 +3607,7 @@ def _check_dependents_are_predicates(
             continue
         seen.add(e._name)
 
-        e_dependents = {x()._name for x in dependents[e._name] if x() is not None}
+        e_dependents = {x._name for x in dependents[e._name]}
 
         if not allow_reduction:
             if isinstance(e, Reduction):
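
With dependents now held as strong references in the graph, the `[x() for x in ... if x() is not None]` unwrapping that every simplification rule previously repeated becomes a plain lookup. A toy comparison of the two access patterns (names are illustrative, not from the patch):

    import weakref

    class Node:
        pass

    node = Node()
    refs = [weakref.ref(node)]

    # old pattern: dereference every weakref and drop collected targets
    parents_old = [r() for r in refs if r() is not None]

    # new pattern: the graph already holds strong references, lookup is direct
    parents_new = [node]

    assert parents_old == parents_new
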
From 3155405f4806f46b3b69d809502b8ebfcb06727d Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 16:48:39 +0100
Subject: [PATCH 05/10] Update

---
 dask_expr/_core.py                 | 5 +++--
 dask_expr/_expr.py                 | 6 +-----
 dask_expr/tests/test_collection.py | 1 +
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index 753e84d2f..ec2f9c241 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -10,6 +10,7 @@
 import pandas as pd
 import toolz
 from dask.dataframe.core import is_dataframe_like, is_index_like, is_series_like
+from dask.delayed import Delayed
 from dask.utils import funcname, import_required, is_arraylike
 from toolz.dicttoolz import merge
 
@@ -19,7 +20,7 @@
 def _unpack_collections(o):
     if isinstance(o, Expr):
         return o, o._name
-    elif hasattr(o, "expr"):
+    elif hasattr(o, "expr") and not isinstance(o, Delayed):
         return o.expr, o.expr._name
     else:
         return o, None
@@ -369,7 +370,7 @@ def simplify_once(self, dependents: defaultdict, simplified: dict):
         for operand in expr.operands:
             if isinstance(operand, Expr):
                 # # Bandaid for now, waiting for Singleton
-                # dependents[operand._name].append(weakref.ref(expr))
+                dependents[operand._name].add(expr)
                 new = operand.simplify_once(
                     dependents=dependents, simplified=simplified
                 )
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index 5c9060cc6..b6b731d84 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -1153,7 +1153,7 @@ def _simplify_up(self, parent, dependents):
         ):
             predicate = None
             if self.frame.ndim == 1 and self.ndim == 2:
-                name = self.frame._meta.name
+                name = self._meta.columns[0]
                 # Avoid Projection since we are already a Series
                 subs = Projection(self, name)
                 predicate = parent.predicate.substitute(subs, self.frame)
@@ -2714,10 +2714,6 @@ class _DelayedExpr(Expr):
     # TODO
     _parameters = ["obj"]
 
-    def __init__(self, obj):
-        self.obj = obj
-        self.operands = [obj]
-
     def __str__(self):
         return f"{type(self).__name__}({str(self.obj)})"
 
diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py
index bbdee63d8..fefe7e8a1 100644
--- a/dask_expr/tests/test_collection.py
+++ b/dask_expr/tests/test_collection.py
@@ -2387,6 +2387,7 @@ def test_predicate_pushdown_ndim_change(df, pdf):
     expected = expected[expected[0] > 1]
     assert_eq(result, expected)
     assert isinstance(result.simplify().expr.frame, Filter)
+    result.simplify().pprint()
     assert isinstance(result.simplify().expr, ToFrame)
 
 

From 39b3a26825bd331b0d8f34e93501abc8155f81b0 Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 18:55:11 +0100
Subject: [PATCH 06/10] Don't run concurrently

---
 .github/workflows/test.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 7abe85b7c..7404ef075 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -62,7 +62,7 @@ jobs:
         run: pip list | grep -E 'dask|distributed'
 
       - name: Run tests
-        run: py.test -n auto --verbose --cov=dask_expr --cov-report=xml
+        run: py.test --verbose --cov=dask_expr --cov-report=xml
 
       - name: Coverage
         uses: codecov/codecov-action@v3

From 431c30202217a89540d25b8b7e611455bcdcfbd4 Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 18:59:58 +0100
Subject: [PATCH 07/10] Don't run concurrently

---
 .github/workflows/test.yaml     | 2 +-
 dask_expr/tests/test_shuffle.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 7404ef075..7abe85b7c 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -62,7 +62,7 @@ jobs:
         run: pip list | grep -E 'dask|distributed'
 
       - name: Run tests
-        run: py.test --verbose --cov=dask_expr --cov-report=xml
+        run: py.test -n auto --verbose --cov=dask_expr --cov-report=xml
 
       - name: Coverage
         uses: codecov/codecov-action@v3
diff --git a/dask_expr/tests/test_shuffle.py b/dask_expr/tests/test_shuffle.py
index c7b977284..7484cbdfa 100644
--- a/dask_expr/tests/test_shuffle.py
+++ b/dask_expr/tests/test_shuffle.py
@@ -1,3 +1,4 @@
+import random
 from collections import OrderedDict
 
 import dask
@@ -640,7 +641,7 @@ def test_set_index_sort_values_one_partition(pdf):
 
 def test_set_index_triggers_calc_when_accessing_divisions(pdf, df):
     divisions_lru.data = OrderedDict()
-    query = df.set_index("x")
+    query = df.fillna(random.randint(1, 100)).set_index("x")
     assert len(divisions_lru.data) == 0
     divisions = query.divisions  # noqa: F841
     assert len(divisions_lru.data) == 1
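
The two CI commits first drop and then restore pytest-xdist's `-n auto` while hardening the affected test: the `fillna(random.randint(1, 100))` shim presumably gives the query a unique token per run, since with expressions now deduplicated globally by name, a byte-identical `set_index` built in another test could already have warmed `divisions_lru` and broken the length assertions. A sketch of the underlying property, using dask's `tokenize`, from which deterministic expression names are derived:

    from dask.base import tokenize

    # same operands -> same token; any changed operand -> a different token
    assert tokenize("set-index", "x", 1) == tokenize("set-index", "x", 1)
    assert tokenize("set-index", "x", 1) != tokenize("set-index", "x", 2)
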
From 1faa475d26bf4759e74e1b756b70c03f8fa07dbb Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 19:06:07 +0100
Subject: [PATCH 08/10] Update

---
 dask_expr/tests/test_collection.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py
index 4b4daf451..3404f213d 100644
--- a/dask_expr/tests/test_collection.py
+++ b/dask_expr/tests/test_collection.py
@@ -2392,7 +2392,6 @@ def test_predicate_pushdown_ndim_change(df, pdf):
     expected = expected[expected[0] > 1]
     assert_eq(result, expected)
     assert isinstance(result.simplify().expr.frame, Filter)
-    result.simplify().pprint()
     assert isinstance(result.simplify().expr, ToFrame)
 
 

From bda2577b90e143f4b1785852db2cd3c74ffaa744 Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Wed, 14 Feb 2024 23:41:09 +0100
Subject: [PATCH 09/10] Fix reads from local dir that changes directory

---
 dask_expr/_collection.py      | 19 +++++++++++++++++++
 dask_expr/io/csv.py           |  2 ++
 dask_expr/io/parquet.py       |  2 ++
 dask_expr/io/tests/test_io.py | 34 +++++++++++++++++++++++++++++++---
 4 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index ce39a1c7e..646b2c3ee 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -3,6 +3,7 @@
 import datetime
 import functools
 import inspect
+import os
 import warnings
 from collections.abc import Callable, Hashable, Mapping
 from numbers import Integral, Number
@@ -4526,6 +4527,7 @@ def read_csv(
             storage_options=storage_options,
             kwargs=kwargs,
             header=header,
+            _cwd=_get_cwd(path, kwargs),
         )
     )
 
@@ -4551,6 +4553,7 @@ def read_table(
             storage_options=storage_options,
             kwargs=kwargs,
             header=header,
+            _cwd=_get_cwd(path, kwargs),
         )
     )
 
@@ -4576,10 +4579,25 @@ def read_fwf(
             storage_options=storage_options,
             kwargs=kwargs,
             header=header,
+            _cwd=_get_cwd(path, kwargs),
         )
     )
 
 
+def _get_protocol(urlpath):
+    if "://" in urlpath:
+        protocol, _ = urlpath.split("://", 1)
+        if len(protocol) > 1:
+            # excludes Windows paths
+            return protocol
+    return None
+
+
+def _get_cwd(path, kwargs):
+    protocol = kwargs.pop("protocol", None) or _get_protocol(path) or "file"
+    return os.getcwd() if protocol == "file" else None
+
+
 def read_parquet(
     path=None,
     columns=None,
@@ -4630,6 +4648,7 @@ def read_parquet(
             filesystem=filesystem,
             engine=_set_parquet_engine(engine),
             kwargs=kwargs,
+            _cwd=_get_cwd(path, kwargs),
             _series=isinstance(columns, str),
         )
     )
diff --git a/dask_expr/io/csv.py b/dask_expr/io/csv.py
index 6be02153d..7bc0618e4 100644
--- a/dask_expr/io/csv.py
+++ b/dask_expr/io/csv.py
@@ -14,6 +14,7 @@ class ReadCSV(PartitionsFiltered, BlockwiseIO):
         "_partitions",
         "storage_options",
         "kwargs",
+        "_cwd",  # needed for tokenization
         "_series",
     ]
     _defaults = {
@@ -24,6 +25,7 @@ class ReadCSV(PartitionsFiltered, BlockwiseIO):
         "_partitions": None,
         "storage_options": None,
         "_series": False,
+        "_cwd": None,
     }
     _absorb_projections = True
 
diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py
index 8cd9e8c3d..f722a4d8f 100644
--- a/dask_expr/io/parquet.py
+++ b/dask_expr/io/parquet.py
@@ -426,6 +426,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
         "filesystem",
         "engine",
         "kwargs",
+        "_cwd",  # needed for tokenization
         "_partitions",
         "_series",
         "_dataset_info_cache",
@@ -449,6 +450,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
         "_partitions": None,
         "_series": False,
         "_dataset_info_cache": None,
+        "_cwd": None,
     }
     _pq_length_stats = None
     _absorb_projections = True
diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py
index cca5d63c0..62619c314 100644
--- a/dask_expr/io/tests/test_io.py
+++ b/dask_expr/io/tests/test_io.py
@@ -1,5 +1,6 @@
 import glob
 import os
+from pathlib import Path
 
 import dask.array as da
 import dask.dataframe as dd
@@ -30,14 +31,14 @@
 pd = _backend_library()
 
 
-def _make_file(dir, format="parquet", df=None):
+def _make_file(dir, format="parquet", df=None, **kwargs):
     fn = os.path.join(str(dir), f"myfile.{format}")
     if df is None:
         df = pd.DataFrame({c: range(10) for c in "abcde"})
     if format == "csv":
-        df.to_csv(fn)
+        df.to_csv(fn, **kwargs)
     elif format == "parquet":
-        df.to_parquet(fn)
+        df.to_parquet(fn, **kwargs)
     else:
         ValueError(f"{format} not a supported format")
     return fn
@@ -413,6 +414,33 @@ def test_combine_similar_no_projection_on_one_branch(tmpdir):
     assert_eq(df, pdf)
 
 
+@pytest.mark.parametrize(
+    "fmt, func, kwargs",
+    [
+        ("parquet", read_parquet, {}),
+        ("csv", read_csv, {"index": False}),
+    ],
+)
+def test_chdir_different_files(tmpdir, fmt, func, kwargs):
+    cwd = os.getcwd()
+
+    try:
+        pdf = pd.DataFrame({"x": [0, 1, 2, 3] * 4, "y": range(16)})
+        os.chdir(tmpdir)
+        _make_file(tmpdir, format=fmt, df=pdf, **kwargs)
+        df = func(f"myfile.{fmt}")
+
+        new_dir = Path(tmpdir).joinpath("new_dir")
+        new_dir.mkdir()
+        os.chdir(new_dir)
+        pdf2 = pd.DataFrame({"x": [0, 100, 200, 300] * 4, "y": range(16)})
+        _make_file(new_dir, format=fmt, df=pdf2, **kwargs)
+        df2 = func(f"myfile.{fmt}")
+        assert_eq(df.sum() + df2.sum(), pd.Series([2424, 240], index=["x", "y"]))
+    finally:
+        os.chdir(cwd)
+
+
 @pytest.mark.parametrize("meta", [True, False])
 @pytest.mark.parametrize("label", [None, "foo"])
 @pytest.mark.parametrize("allow_projection", [True, False])
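
The fix works because a relative local path only identifies a file together with the working directory, so the directory has to become part of the expression's token or two reads of `myfile.parquet` from different directories would deduplicate onto one another; remote URLs are unaffected. The helper below restates the `_get_protocol` logic added above, plus the checks it implies:

    def _get_protocol(urlpath):
        if "://" in urlpath:
            protocol, _ = urlpath.split("://", 1)
            if len(protocol) > 1:
                # single letters are Windows drive prefixes, not protocols
                return protocol
        return None

    # local, relative -> the cwd must be part of the identity
    assert _get_protocol("myfile.parquet") is None
    # remote -> the cwd is irrelevant, tokens stay stable across directories
    assert _get_protocol("s3://bucket/myfile.parquet") == "s3"
    # Windows drive letter, treated as a local path
    assert _get_protocol("C://Users/me/myfile.parquet") is None
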
From 97029db745b6b4cf772714e223ab99225c1e3f68 Mon Sep 17 00:00:00 2001
From: Patrick Hoefler <61934744+phofl@users.noreply.github.com>
Date: Thu, 15 Feb 2024 11:36:17 +0100
Subject: [PATCH 10/10] Update to keep track of refs

---
 dask_expr/_core.py | 53 +++++++++++++++++++++++++++++-----------------
 1 file changed, 33 insertions(+), 20 deletions(-)

diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index ec2f9c241..31f4e0f58 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -30,6 +30,8 @@ class Expr:
     _parameters = []
     _defaults = {}
     _instances = weakref.WeakValueDictionary()
+    _dependents = defaultdict(list)
+    _seen = set()
 
     def __new__(cls, *args, **kwargs):
         operands = list(args)
@@ -68,16 +70,41 @@ def __new__(cls, *args, **kwargs):
             inst._graph[_name].update(children)
             # Probably a bad idea to have a self ref
             inst._graph_instances[_name] = inst
+
+        else:
+            Expr._instances[_name] = inst
+            inst._graph_instances = merge(_graph_instances, *_subgraph_instances)
+            inst._graph = merge(*_subgraphs)
+            inst._graph[_name] = children
+            # Probably a bad idea to have a self ref
+            inst._graph_instances[_name] = inst
+
+        if inst._name in Expr._seen:
+            # We already registered inst as a dependent of all it's
+            # dependencies, so we don't need to do it again
             return inst
-        Expr._instances[_name] = inst
-        inst._graph_instances = merge(_graph_instances, *_subgraph_instances)
-        inst._graph = merge(*_subgraphs)
-        inst._graph[_name] = children
-        # Probably a bad idea to have a self ref
-        inst._graph_instances[_name] = inst
+        Expr._seen.add(inst._name)
+        for dep in inst.dependencies():
+            Expr._dependents[dep._name].append(inst)
+
         return inst
 
+    @functools.cached_property
+    def _dependent_graph(self):
+        # Reset to clear tracking
+        Expr._dependents = defaultdict(list)
+        Expr._seen = set()
+        rv = Expr._dependents
+        # This should be O(E)
+        tmp = defaultdict(set)
+        for expr, dependencies in self._graph.items():
+            for dep in dependencies:
+                tmp[dep].add(expr)
+        for name, exprs in tmp.items():
+            rv[name] = [self._graph_instances[e] for e in exprs]
+        return rv
+
     def __hash__(self):
         raise TypeError(
             "Expr objects can't be used in sets or dicts or similar, use the _name instead"
         )
@@ -186,18 +213,6 @@ def dependencies(self):
         # Dependencies are `Expr` operands only
         return [operand for operand in self.operands if isinstance(operand, Expr)]
 
-    @functools.cached_property
-    def _dependent_graph(self):
-        rv = defaultdict(set)
-        # This should be O(E)
-        for expr, dependencies in self._graph.items():
-            rv[expr]
-            for dep in dependencies:
-                rv[dep].add(expr)
-        for name, exprs in rv.items():
-            rv[name] = {self._graph_instances[e] for e in exprs}
-        return rv
-
     def dependents(self):
         return self._dependent_graph
 
@@ -369,8 +384,6 @@ def simplify_once(self, dependents: defaultdict, simplified: dict):
         changed = False
         for operand in expr.operands:
             if isinstance(operand, Expr):
-                # # Bandaid for now, waiting for Singleton
-                dependents[operand._name].add(expr)
                 new = operand.simplify_once(
                     dependents=dependents, simplified=simplified
                 )
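
The final commit moves dependent tracking into class-level registries that `__new__` populates once per token and that `_dependent_graph` resets before recomputing. A reduced sketch of that register-once pattern, with `Registry` and `Node` as hypothetical stand-ins for the `Expr` bookkeeping:

    from collections import defaultdict

    class Registry:
        # class-level bookkeeping, mirroring Expr._dependents / Expr._seen
        _dependents = defaultdict(list)
        _seen = set()

    class Node:
        def __init__(self, name, deps=()):
            self._name = name
            self._deps = list(deps)

        def dependencies(self):
            return self._deps

    def register(node):
        # run once per token: wire the node up as a dependent of its children
        if node._name in Registry._seen:
            return
        Registry._seen.add(node._name)
        for dep in node.dependencies():
            Registry._dependents[dep._name].append(node)

    leaf = Node("leaf-abc")
    root = Node("root-def", [leaf])
    register(leaf)
    register(root)
    register(root)  # a repeat registration is a no-op, no duplicate edges
    assert [n._name for n in Registry._dependents["leaf-abc"]] == ["root-def"]

Because every constructed expression funnels through `__new__`, this keeps the dependents mapping current without weakrefs, at the cost of the registry pinning instances alive until the next reset.
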