From 6e9bf4462cff5b0a1a4468d15e4769032c9e4b7b Mon Sep 17 00:00:00 2001
From: Joey Ballentine
Date: Mon, 1 Jul 2024 22:54:48 -0400
Subject: [PATCH] Don't cache individually run generators and delete any caches on new run

---
 backend/src/process.py | 25 ++++++++++++++++---------
 backend/src/server.py  | 13 ++++++++++++-
 2 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/backend/src/process.py b/backend/src/process.py
index f0185a223..8cc9dc308 100644
--- a/backend/src/process.py
+++ b/backend/src/process.py
@@ -452,15 +452,18 @@ def __init__(
 
         self._storage_dir = storage_dir
 
-    async def process(self, node_id: NodeId) -> NodeOutput | CollectorOutput:
+    async def process(
+        self, node_id: NodeId, perform_cache: bool = True
+    ) -> NodeOutput | CollectorOutput:
         # Return cached output value from an already-run node if that cached output exists
-        cached = self.node_cache.get(node_id)
-        if cached is not None:
-            return cached
+        if perform_cache:
+            cached = self.node_cache.get(node_id)
+            if cached is not None:
+                return cached
 
         node = self.chain.nodes[node_id]
         try:
-            return await self.__process(node)
+            return await self.__process(node, perform_cache)
         except Aborted:
             raise
         except NodeExecutionError:
@@ -478,14 +481,16 @@ async def process_regular_node(self, node: FunctionNode) -> RegularOutput:
         assert isinstance(result, RegularOutput)
         return result
 
-    async def process_generator_node(self, node: GeneratorNode) -> GeneratorOutput:
+    async def process_generator_node(
+        self, node: GeneratorNode, perform_cache: bool = True
+    ) -> GeneratorOutput:
         """
         Processes the given iterator node.
 
         This will **not** iterate the returned generator. Only `node-start` and
         `node-broadcast` events will be sent.
         """
-        result = await self.process(node.id)
+        result = await self.process(node.id, perform_cache)
         assert isinstance(result, GeneratorOutput)
         return result
 
@@ -599,7 +604,9 @@ def __get_node_context(self, node: Node) -> _ExecutorNodeContext:
 
         return context
 
-    async def __process(self, node: Node) -> NodeOutput | CollectorOutput:
+    async def __process(
+        self, node: Node, perform_cache: bool = True
+    ) -> NodeOutput | CollectorOutput:
         """
         Process a single node.
 
@@ -653,7 +660,7 @@ def get_lazy_evaluation_time():
         # TODO: execution time
 
         # Cache the output of the node
-        if not isinstance(output, CollectorOutput):
+        if perform_cache and not isinstance(output, CollectorOutput):
             self.node_cache.set(node.id, output, self.cache_strategy[node.id])
 
         await self.progress.suspend()
diff --git a/backend/src/server.py b/backend/src/server.py
index 541b02ecf..6c5148e11 100644
--- a/backend/src/server.py
+++ b/backend/src/server.py
@@ -207,6 +207,16 @@ async def run(request: Request):
         chain = parse_json(full_data["data"])
         optimize(chain)
 
+        # Remove all Generator values from the cache for each new run
+        # Otherwise, their state will cause them to resume from where they left off
+        schema_data = api.registry.nodes
+        for node in chain.nodes.values():
+            node_schema = schema_data.get(node.schema_id)
+            if node_schema:
+                node_data, _ = node_schema
+                if node_data.kind == "generator" and ctx.cache.get(node.id):
+                    ctx.cache.pop(node.id)
+
         logger.info("Running new executor...")
         executor = Executor(
             id=executor_id,
@@ -344,7 +354,8 @@ async def run_individual(request: Request):
                 raise ValueError(
                     f"Invalid node kind {node_data.kind} attempted to run individually"
                 )
-            ctx.cache[node_id] = output
+            if not isinstance(node, GeneratorNode):
+                ctx.cache[node_id] = output
         except Aborted:
             pass
         finally: