From 13b27e7b16a6a6b6619a69a34510053911ec784b Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Fri, 5 Jul 2024 16:17:41 +0200 Subject: [PATCH] Added global variables for working and output directories --- requirements.txt | 1 + swirlc/compiler/default.py | 58 +++++++++++++++++++++++--------------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9efbf5c..b17f4af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ antlr4-python3-runtime==4.13.1 black==24.4.2 importlib_resources==6.4.0 +Jinja2==3.1.4 jsonschema==4.22.0 referencing==0.35.1 ruamel.yaml==0.18.6 diff --git a/swirlc/compiler/default.py b/swirlc/compiler/default.py index 21ee380..7c23475 100644 --- a/swirlc/compiler/default.py +++ b/swirlc/compiler/default.py @@ -3,10 +3,11 @@ import os import stat import sys -from pathlib import Path -from typing import MutableMapping, MutableSequence, TextIO from black import WriteBack +from jinja2 import Template +from pathlib import Path +from typing import MutableMapping, MutableSequence, TextIO from swirlc.core.compiler import BaseCompiler from swirlc.core.entity import Location, Step, Port, Workflow, DistributedWorkflow, Data @@ -45,6 +46,8 @@ global_vars = """ BUF_SIZE = 8192 +OUT_DIR = str(Path("{{location_out_dir}}").expanduser().absolute()) +SCRATCH_DIR = str(Path("{{location_scratch_dir}}").expanduser().absolute()) available_port_data = {} condition: Condition = Condition() @@ -81,12 +84,12 @@ sock.close() """ -exec_function = """def _exec(step_name: str, step_display_name: str, input_port_names: MutableSequence[str], output_port_name: str, data_type: str, glob_regex: str | None, cmd: str, args: MutableSequence[str], args_from: MutableSequence[tuple[str, str]], workdir: str): +exec_function = """def _exec(step_name: str, step_display_name: str, input_port_names: MutableSequence[str], output_port_name: str, data_type: str, glob_regex: str | None, cmd: str, args: MutableSequence[str], args_from: MutableSequence[tuple[str, str]]): # Wait all the data for port_name in input_port_names: available_port_data[port_name].wait() # Prepare working directory - workdir = os.path.join(workdir, f"exec_{step_name}_{uuid.uuid4()}") + workdir = os.path.join(SCRATCH_DIR, f"exec_{step_name}_{uuid.uuid4()}") os.mkdir(workdir) for port_name in input_port_names: os.symlink(os.path.abspath(ports[port_name]), os.path.join(workdir, os.path.basename(ports[port_name]))) @@ -175,7 +178,7 @@ sock.close() """ -recv_function = """def _recv(port: str, workdir: str, data_type: str, src: str) -> Any: +recv_function = """def _recv(port: str, data_type: str, src: str) -> Any: buf = BytesIO() with condition: while connections.setdefault(src, {}).get(port) is None: @@ -196,7 +199,7 @@ elif data_type == "file": filename = connections[src][port].recv(1024).decode() connections[src][port].send("ack".encode("utf-8")) - filepath = os.path.join(workdir, f"rcv_{port}_{uuid.uuid4()}", filename) + filepath = os.path.join(SCRATCH_DIR, f"rcv_{port}_{uuid.uuid4()}", filename) os.mkdir(os.path.dirname(filepath)) fd = open(filepath, "wb") while True: @@ -226,19 +229,22 @@ t.join() """ -preamble = "\n".join( - [ - python_header, - imports, - global_vars, - accept_function, - exec_function, - init_dataset_function, - send_function, - recv_function, - thread_function, - wait_function, - ] + +preamble = Template( + "\n".join( + [ + python_header, + imports, + global_vars, + accept_function, + exec_function, + init_dataset_function, + send_function, + recv_function, + thread_function, + wait_function, + ] + ) ) @@ -299,10 +305,16 @@ def begin_location(self, location: Location) -> None: self.programs[self.current_location.name] = open( f"{self.current_location.name}.py", "w" ) - self.programs[self.current_location.name].write(preamble) + self.programs[self.current_location.name].write( + preamble.render( + location_scratch_dir=self.current_location.workdir, + location_out_dir=self.current_location.outdir, + ) + ) location = self.workflow.locations[self.current_location.name] self.programs[self.current_location.name].write( - f"""def main(): + f""" +def main(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(locations["{location.name}"]) sock.settimeout(3) @@ -475,7 +487,7 @@ def exec( {self._get_indentation()}input_port_names = {[port_name for port_name, _ in flow[0]]} {self._get_indentation()}for port_name in input_port_names: {self._get_indentation()} available_port_data.setdefault(port_name, Event()) - {self._get_indentation()}_exec("{step.name}", "{step.display_name}", input_port_names, "{output_port_name}", "{step.processors[output_port_name].type if output_port_name else ""}", "{step.processors[output_port_name].glob if output_port_name else ""}", "{step.command}", {arguments}, {arguments_from_port}, str(Path("{self.current_location.workdir}").expanduser().absolute()))""" + {self._get_indentation()}_exec("{step.name}", "{step.display_name}", input_port_names, "{output_port_name}", "{step.processors[output_port_name].type if output_port_name else ""}", "{step.processors[output_port_name].glob if output_port_name else ""}", "{step.command}", {arguments}, {arguments_from_port})""" ) def par(self) -> None: @@ -500,7 +512,7 @@ def f{self.function_counter}():""" def recv(self, port: str, data_type: str, src: str, dst: str): self.programs[self.current_location.name].write( f""" - {self._get_indentation()}{self._get_thread(self.current_location.name)} = _thread(_recv, "{port}", str(Path("{self.current_location.workdir}").expanduser().absolute()), "{data_type}", "{src}")""" + {self._get_indentation()}{self._get_thread(self.current_location.name)} = _thread(_recv, "{port}", "{data_type}", "{src}")""" ) def send(self, data: str, port: str, data_type: str, src: str, dst: str):