-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathweb_proxy.py
82 lines (68 loc) · 2.71 KB
/
web_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import http.server
import socketserver
import urllib.request
import logging
from os.path import exists
from db import *
base_url = read()['base_url']
# Логирование
from icecream import ic
ic.disable() # Выключить отладку
class Proxy(http.server.SimpleHTTPRequestHandler):
# Отключение всех сообщений (чтобы не забивать вывод)
def log_message(self, format, *args):
pass
def do_GET(self):
ic(f"Request for: {self.path}")
# 'js-check.jet/favicon.ico' -> '127.0.0.1:8000/favicon.ico'
target = self.path
domain = target[7:]
domain = domain[:domain.find('/')]
if not exists(f'cached/{domain}'):
self.send_error(404, f"Site not found")
return 404
# Если статичный
if read(f'cached/{domain}/config.json')['type'] == 'static':
target = f'{base_url}/{target[7:]}' # http://127.0.0.1:8000 / js-check.jet
# Если динамический
elif read(f'cached/{domain}/config.json')['type'] == 'dynamic':
port = read(f'cached/{domain}/config.json')['port']
target = f'http://bore.del.pw:{port}/{target.replace(f"http://{domain}/", "")}'
ic(f"Modded request: {target}")
# Направление запроса по адресу
try:
with urllib.request.urlopen(target) as response:
self.send_response(response.getcode())
self.send_header("Content-type", response.headers.get_content_type())
self.end_headers()
self.wfile.write(response.read())
except Exception as e:
self.send_error(500, f"Error: {str(e)}")
def do_POST(self):
ic(f"Request for: {self.path}")
if not exists(f'cached/{domain}'):
self.send_error(404, f"Site not found")
return 404
if read(f'cached/{domain}/config.json')['type'] == 'static':
target = f'{base_url}/{target[7:]}' # http://127.0.0.1:8000 / js-check.jet
elif read(f'cached/{domain}/config.json')['type'] == 'dynamic':
port = read(f'cached/{domain}/config.json')['port']
target = f'http://bore.del.pw:{port}/{target.replace(f"http://{domain}/", "")}'
ic(f"Modded request: {target}")
# Направление запроса по адресу
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
req = urllib.request.Request(self.path, data=post_data, method='POST')
with urllib.request.urlopen(req) as response:
self.send_response(response.getcode())
self.send_header("Content-type", response.headers.get_content_type())
self.end_headers()
self.wfile.write(response.read())
except Exception as e:
self.send_error(500, f"Error: {str(e)}")
def web_proxy():
PORT = 8080
with socketserver.TCPServer(("", PORT), Proxy) as httpd:
print(f"HTTP proxy on port {PORT}")
httpd.serve_forever()