-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcli.py
59 lines (46 loc) · 1.46 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import shlex
import subprocess
import sys
from typing import List
import hydra
from flatten_dict import flatten
from omegaconf import DictConfig
def run_subprocess(command: str) -> int:
    """Run *command* as a child process, echoing its output line by line.

    The command string is tokenised with ``shlex.split`` and executed
    without a shell. The child's stderr is merged into its stdout, and every
    line is printed as soon as it has been read.

    Args:
        command: Shell-style command line to execute.

    Returns:
        The exit code of the child process.
    """
    print(f"Running command: \n\n{command}\n")
    # The default buffer size is used on purpose: in binary mode a tiny
    # ``bufsize`` does not give line buffering, it only shrinks the read buffer.
    with subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as process:
        for raw_line in process.stdout:
            # Undecodable bytes are replaced instead of aborting the relay, and
            # only trailing whitespace is dropped so indentation is preserved.
            print(raw_line.decode("utf-8", errors="replace").rstrip())
    # Leaving the ``with`` block closes the pipe and waits for the child,
    # so ``returncode`` is populated here.
    return process.returncode
def get_all_params_overrides(cfg: DictConfig) -> List[str]:
    """Render every entry of *cfg* as a ``dotted.key:value`` string.

    The config is flattened with ``flatten_dict.flatten``; the parts of each
    flattened key are joined with ``.`` and the value is appended after a
    colon, converted with ``str``.

    Args:
        cfg: Configuration to flatten.

    Returns:
        One ``key:value`` string per flattened entry, in iteration order.
    """
    overrides = []
    for key_path, value in flatten(cfg).items():
        dotted_key = ".".join(key_path)
        overrides.append(f"{dotted_key}:{value!s}")
    return overrides
@hydra.main(config_path="conf", config_name="config", version_base=None)
def main(cfg: DictConfig):
    """Launch ``kedro run`` with parameters taken from the Hydra config.

    The pipeline name is read from ``cfg.pipeline`` when that key exists and
    falls back to ``"__default__"`` otherwise. All config entries are passed
    to Kedro through a single ``--params`` option.

    Args:
        cfg: Configuration composed by Hydra.

    Raises:
        RuntimeError: If the ``kedro run`` process exits with a non-zero code.
    """
    pipeline = cfg.pipeline if "pipeline" in cfg.keys() else "__default__"
    params_overrides = get_all_params_overrides(cfg)
    # Use the ``kedro`` launcher located next to the running interpreter.
    kedro_bin = os.path.join(os.path.dirname(sys.executable), "kedro")
    args = [
        kedro_bin,
        "run",
        f"--pipeline={pipeline}",
        f"--params={','.join(params_overrides)}",
    ]
    # run_subprocess() re-tokenises the string with shlex.split, so quote each
    # argument; paths or values containing spaces or quotes then survive the
    # round trip as single arguments.
    command = " ".join(shlex.quote(arg) for arg in args)
    returncode = run_subprocess(command)
    if returncode:
        raise RuntimeError(f"kedro run failed with exit code {returncode}")
# Invoke the Hydra-decorated entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()