-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathexec_worker.py
106 lines (83 loc) · 3.26 KB
/
exec_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
exec_worker.py - library used for controlling BAS fork server process
"""
import os
import subprocess
import struct
from typing import List, Tuple, Optional
###############################
# Execution worker library
###############################
class ExecWorkerException(Exception):
pass
class ExecWorker:
initialized: bool = False
def __init__(self, workerPath: str = "./worker"):
self.workerPath = workerPath
self.initialize()
def __del__(self):
self.destroy()
""" Read precisely `size` bytes from provided file descriptors
Used to handle partial reads than can occur when using os.read
"""
def _safe_read(self, fd: int, size: int) -> bytes:
data = b""
while size > 0:
part = os.read(fd, size)
size -= len(part)
data += part
return data
""" Writes precisely `size` bytes to the provided FD
Used for the same reason as _safe_read
"""
def _safe_write(self, fd: int, buf: bytes) -> None:
offset = 0
while offset < len(buf):
part = os.write(fd, buf[offset:])
offset += part
def initialize(self) -> None:
self.pRead, self.pChildWrite = os.pipe()
self.pChildRead, self.pWrite = os.pipe()
self.worker = subprocess.Popen([self.workerPath, str(self.pChildRead), str(self.pChildWrite)],
shell=False, pass_fds=(self.pChildRead, self.pChildWrite))
os.close(self.pChildRead)
os.close(self.pChildWrite)
self.initialized = True
worker_status = self.worker.poll()
if worker_status is not None:
self.destroy()
raise ExecWorkerException(f"Worker process has terminated unexpectedly with error code {worker_status}")
def destroy(self) -> None:
if not self.initialized:
return
os.close(self.pWrite)
os.close(self.pRead)
self.worker.kill()
self.initialized = False
def runCmd(self, cwd: str, cmd: str, args: List[str], input: Optional[str] = None) -> Tuple[str, str, int]:
worker_status = self.worker.poll()
if worker_status is not None:
self.destroy()
raise ExecWorkerException(f"Cannot run command - worker exited with error code {worker_status}")
if input is None:
input = ""
# Send command to worker
bCwd = cwd.encode()
bInput = input.encode()
dataToSend = struct.pack("III", len(bCwd), len(bInput), len(args) + 1)
dataToSend += bCwd
dataToSend += bInput
for arg in [cmd, *args]:
bArg = arg.encode()
dataToSend += struct.pack("I", len(bArg))
dataToSend += bArg
self._safe_write(self.pWrite, dataToSend)
# Hang on pipe read and retrieve output
header = self._safe_read(self.pRead, 14)
sizeOut, sizeErr, retCode, error = struct.unpack("IIiH", header)
dataOut = self._safe_read(self.pRead, sizeOut)
dataErr = self._safe_read(self.pRead, sizeErr)
if error:
self.destroy()
raise ExecWorkerException(f"Cannot run command - worker encountered an error {dataErr.decode()}")
return dataOut.decode(), dataErr.decode(), retCode