-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpython_runner.py
141 lines (116 loc) · 4.5 KB
/
python_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sys
import subprocess
import json
import pkg_resources
import asyncio
import os
import io
def install_package(package_name):
    """Install *package_name* with pip and describe the outcome.

    pip is run as ``<current interpreter> -m pip install <name> --no-input``
    so the package lands in the environment of the running interpreter and
    pip never blocks waiting for interactive input.

    Args:
        package_name: Value handed to ``pip install`` as a single argument.

    Returns:
        A human-readable status string: a success message, or an error
        message embedding the ``CalledProcessError`` when pip exits with a
        non-zero status.
    """
    pip_command = [sys.executable, '-m', 'pip', 'install', package_name, '--no-input']
    try:
        subprocess.check_call(pip_command)
    except subprocess.CalledProcessError as e:
        return f"Error installing {package_name}: {str(e)}"
    return f"Successfully installed {package_name}"
def is_package_installed(package_name):
    """Report whether a distribution called *package_name* is installed.

    The lookup uses the standard library's ``importlib.metadata`` when it is
    available.  Unlike ``pkg_resources.get_distribution`` it does not parse
    the argument as a requirement, so a malformed name or a version
    specifier is reported as "not installed" instead of raising a parse or
    version-conflict error out of this predicate.  ``pkg_resources`` is only
    used as a fallback on interpreters that lack ``importlib.metadata``.

    Args:
        package_name: Distribution name as used on the package index.

    Returns:
        True if metadata for the distribution can be found, False otherwise
        (including for an empty or whitespace-only name).
    """
    # An empty name can never identify a distribution; answer directly
    # rather than depending on how the metadata backend treats it.
    if not package_name or not package_name.strip():
        return False
    name = package_name.strip()

    try:
        # Function-scope import: stdlib since Python 3.8.
        from importlib import metadata as importlib_metadata
    except ImportError:
        importlib_metadata = None

    if importlib_metadata is not None:
        try:
            importlib_metadata.version(name)
        except (importlib_metadata.PackageNotFoundError, ValueError):
            return False
        return True

    # Fallback for interpreters without importlib.metadata.
    try:
        pkg_resources.get_distribution(name)
    except pkg_resources.DistributionNotFound:
        return False
    return True
class silence:
    """Context manager that diverts ``sys.stdout`` and ``sys.stderr`` to a buffer.

    Both streams are redirected to the same in-memory ``io.StringIO`` while
    the ``with`` block runs and are restored on exit.  ``str(obj)`` yields the
    captured text: the live buffer contents while the block is active, the
    final capture afterwards, and an empty string if the manager was never
    entered.

    Exceptions raised inside the block are not suppressed.
    """

    def __init__(self):
        # Defaults so that str() is safe before the manager is ever entered.
        self._buffer = None
        self._captured_value = ""
        self._active = False

    def __enter__(self):
        self._old_stdout = sys.stdout
        self._old_stderr = sys.stderr
        self._buffer = io.StringIO()
        # Both streams share one buffer, so output stays in emission order.
        sys.stdout = self._buffer
        sys.stderr = self._buffer
        self._active = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout = self._old_stdout
        sys.stderr = self._old_stderr
        self._captured_value = self._buffer.getvalue()
        self._active = False
        # False: let any exception from the block propagate to the caller.
        return False

    def __str__(self):
        if self._active:
            return self._buffer.getvalue()
        return self._captured_value
def run_python_code(code, context_json='{}'):
    """Execute *code* inside an async wrapper and return the outcome as JSON.

    The snippet becomes the body of a coroutine that is driven with
    ``asyncio.run``, so it may use ``await`` and ``return``; the value it
    returns is stored in the namespace under ``result``.  The namespace is
    seeded with the entries of ``context_json`` plus the helpers ``require``,
    ``silence`` and ``printit``; ``print`` is rebound to ``printit``.

    SECURITY: this function ``exec``s the supplied source (and any file it
    embeds) with full interpreter privileges, and ``require`` may invoke pip.
    Only call it with trusted input.

    Args:
        code: Python source to run as the coroutine body.
        context_json: JSON document whose top-level entries are copied into
            the execution namespace.  The ``templateDir`` entry, if present,
            is the base directory used by ``require`` for file paths.

    Returns:
        A JSON string.  On success it is the execution namespace (minus
        ``require``) in which every value that is not JSON-serializable has
        been replaced by ``str(value)``, plus ``__captured_stdout__`` and
        ``__captured_stderr__``.  If execution or serialization raises an
        ``Exception`` it is ``{"error": "<message>"}``.

    Raises:
        json.JSONDecodeError: If ``context_json`` is not valid JSON (it is
            parsed before the error-to-JSON boundary is entered).
    """
    context = json.loads(context_json)
    namespace = {}
    namespace.update(context)

    # Keep the real streams: printit writes to the real stdout even while
    # capture is active, and both are restored when the function finishes.
    original_stdout = sys.stdout
    original_stderr = sys.stderr

    def embed_file(filepath):
        """Execute the file at *filepath* in the shared namespace."""
        base_dir = namespace.get('templateDir', os.path.dirname(os.path.abspath(__file__)))
        abs_path = os.path.join(base_dir, filepath)
        if not os.path.exists(abs_path):
            raise FileNotFoundError(f"Cannot embed file: {abs_path} does not exist.")
        with open(abs_path, 'r', encoding='utf-8') as f:
            script_code = f.read()
        # NOTE(security): runs the file's contents unrestricted.
        exec(script_code, namespace)
        return f"Embedded {filepath}"

    def require(package_or_path):
        """Embed a script when given a path, otherwise ensure a package is installed."""
        # If argument looks like a file path, embed it; else treat as a package
        if package_or_path.endswith('.py') or package_or_path.startswith(('.', '/')):
            return embed_file(package_or_path)
        if not is_package_installed(package_or_path):
            return install_package(package_or_path)
        return f"{package_or_path} is ready to use."

    def printit(*args, sep=' ', end='\n', file=None, flush=False):
        """Stand-in for ``print`` that bypasses capture unless *file* is given.

        Accepts the same keywords as the builtin so snippets calling
        ``print(..., file=..., flush=...)`` behave as expected.
        """
        # The builtin treats None as "use the default" for sep and end.
        sep = ' ' if sep is None else sep
        end = '\n' if end is None else end
        message = sep.join(str(a) for a in args) + end
        if file is None:
            # Print directly to original_stdout, bypassing any capturing
            original_stdout.write(message)
            original_stdout.flush()
        else:
            file.write(message)
            if flush:
                file.flush()

    # Add everything to the namespace
    namespace['require'] = require
    namespace['silence'] = silence
    namespace['printit'] = printit
    namespace['print'] = printit  # Override print to use printit

    indented_code = "\n".join(" " + line for line in code.splitlines())
    # The trailing ``pass`` keeps the coroutine body non-empty when the
    # snippet is empty or comment-only; placing it after the snippet leaves
    # the line numbers of the user's code unchanged.
    async_wrapper = (
        "\n"
        "import asyncio\n"
        "async def __user_async_func():\n"
        f"{indented_code}\n"
        " pass\n"
        "result = asyncio.run(__user_async_func())\n"
    )

    # Hold direct references to the capture buffers so reading them back does
    # not depend on sys.stdout/sys.stderr still pointing at them afterwards.
    stdout_buffer = io.StringIO()
    stderr_buffer = io.StringIO()
    sys.stdout = stdout_buffer
    sys.stderr = stderr_buffer
    try:
        exec(async_wrapper, namespace)
        captured_stdout = stdout_buffer.getvalue()
        captured_stderr = stderr_buffer.getvalue()

        # 'silence', 'printit', and 'print' can remain if desired
        namespace.pop('require', None)

        # Instead of filtering, ensure everything is serializable.
        # NOTE(review): this also keeps entries that exec adds to the
        # namespace, as their string form - confirm callers want those.
        final_namespace = {}
        for key, value in namespace.items():
            try:
                json.dumps(value)
            except Exception:
                # Deliberate best-effort fallback: convert to string.
                final_namespace[key] = str(value)
            else:
                final_namespace[key] = value

        final_namespace['__captured_stdout__'] = captured_stdout
        final_namespace['__captured_stderr__'] = captured_stderr
        return json.dumps(final_namespace)
    except Exception as e:
        # Top-level boundary: report any failure to the caller as JSON.
        return json.dumps({"error": str(e)})
    finally:
        sys.stdout = original_stdout
        sys.stderr = original_stderr