Skip to content

Commit

Permalink
Don't cache individually run generators and delete any caches on new run
Browse files Browse the repository at this point in the history
  • Loading branch information
joeyballentine committed Jul 2, 2024
1 parent 104eb76 commit 6e9bf44
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
25 changes: 16 additions & 9 deletions backend/src/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,18 @@ def __init__(

self._storage_dir = storage_dir

async def process(self, node_id: NodeId) -> NodeOutput | CollectorOutput:
async def process(
self, node_id: NodeId, perform_cache: bool = True
) -> NodeOutput | CollectorOutput:
# Return cached output value from an already-run node if that cached output exists
cached = self.node_cache.get(node_id)
if cached is not None:
return cached
if perform_cache:
cached = self.node_cache.get(node_id)
if cached is not None:
return cached

node = self.chain.nodes[node_id]
try:
return await self.__process(node)
return await self.__process(node, perform_cache)
except Aborted:
raise
except NodeExecutionError:
Expand All @@ -478,14 +481,16 @@ async def process_regular_node(self, node: FunctionNode) -> RegularOutput:
assert isinstance(result, RegularOutput)
return result

async def process_generator_node(
    self, node: GeneratorNode, perform_cache: bool = True
) -> GeneratorOutput:
    """
    Process the given generator (iterator) node.

    This will **not** iterate the returned generator. Only `node-start` and
    `node-broadcast` events will be sent.

    Args:
        node: The generator node to process.
        perform_cache: Forwarded to ``process``; when ``False`` the node
            cache is neither read nor written for this node, so an
            individually-run generator cannot resume from stale cached
            state. Defaults to ``True`` to preserve prior behavior.

    Returns:
        The node's ``GeneratorOutput``.
    """
    # Delegate to the generic entry point, forwarding the caching flag;
    # the assert narrows the NodeOutput | CollectorOutput union.
    result = await self.process(node.id, perform_cache)
    assert isinstance(result, GeneratorOutput)
    return result

Expand Down Expand Up @@ -599,7 +604,9 @@ def __get_node_context(self, node: Node) -> _ExecutorNodeContext:

return context

async def __process(self, node: Node) -> NodeOutput | CollectorOutput:
async def __process(
self, node: Node, perform_cache: bool = True
) -> NodeOutput | CollectorOutput:
"""
Process a single node.
Expand Down Expand Up @@ -653,7 +660,7 @@ def get_lazy_evaluation_time():
# TODO: execution time

# Cache the output of the node
if not isinstance(output, CollectorOutput):
if perform_cache and not isinstance(output, CollectorOutput):
self.node_cache.set(node.id, output, self.cache_strategy[node.id])

await self.progress.suspend()
Expand Down
13 changes: 12 additions & 1 deletion backend/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ async def run(request: Request):
chain = parse_json(full_data["data"])
optimize(chain)

# Remove all Generator values from the cache for each new run
# Otherwise, their state will cause them to resume from where they left off
schema_data = api.registry.nodes
for node in chain.nodes.values():
node_schema = schema_data.get(node.schema_id)
if node_schema:
node_data, _ = node_schema
if node_data.kind == "generator" and ctx.cache.get(node.id):
ctx.cache.pop(node.id)

logger.info("Running new executor...")
executor = Executor(
id=executor_id,
Expand Down Expand Up @@ -344,7 +354,8 @@ async def run_individual(request: Request):
raise ValueError(
f"Invalid node kind {node_data.kind} attempted to run individually"
)
ctx.cache[node_id] = output
if not isinstance(node, GeneratorNode):
ctx.cache[node_id] = output
except Aborted:
pass
finally:
Expand Down

0 comments on commit 6e9bf44

Please sign in to comment.