-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathomniproxy.py
executable file
·300 lines (227 loc) · 11.3 KB
/
omniproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
#!/usr/bin/env python
from twisted.internet import reactor
from twisted.internet import ssl as twistedssl
import forwarder
from logger_callbacks import SocketLogger
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec
from OpenSSL.crypto import load_certificate, dump_certificate, dump_privatekey, load_privatekey, FILETYPE_PEM
from OpenSSL.SSL import SSLv23_METHOD, Context
import ssl, argparse, os, re, sys
#Creating Custom SSLServerContext
class SNISSLServerContext(twistedssl.DefaultOpenSSLContextFactory):
def __init__(self, CA, default_server, default_port, privateKeyFileName, certificateFileName, sslmethod=SSLv23_METHOD):
self.default_version = sslmethod
self.default_server = default_server
self.default_port = default_port
self.CA = CA
return super().__init__(privateKeyFileName, certificateFileName, sslmethod)
def cacheContext(self):
return super().cacheContext()
def SNICallback(self, connection):
prev_context = connection.get_context()
#Stop Producing Data untill connection is made
server_object = prev_context._server_context
server_object.transport.pauseProducing()
#Get Host from SNI
host = connection.get_servername()
if host:
host = host.decode("utf-8")
else:
host = self.default_server
#Get Certificate from Server
ssl_certs = self.CA.clone_certificate({"server": host, "port": self.default_port })
#Update the SSL Context
new_context = Context(self.default_version)
new_context.new_host = (host, self.default_port)
new_context.use_privatekey(load_privatekey(FILETYPE_PEM, open(ssl_certs["keyfile"]).read()))
new_context.use_certificate(load_certificate(FILETYPE_PEM, open(ssl_certs["certfile"]).read()))
connection.set_context(new_context)
#Call Proxy Server to finish setting up the Connection
server_object.SNICallback(connection)
def getContext(self):
self._context.set_tlsext_servername_callback(self.SNICallback)
#self._context.set_keylog_callback(self.SNICallback)
return self._context
class CertificateAuthority(object):
"""docstring for CertificateAuthority"""
def __init__(self, ca_file, cache_dir="ssl_cache"):
#print("Initializing CertificateAuthority ca_file={} cache_dir={}".format(ca_file, cache_dir))
self.ca_file = ca_file
self.cache_dir = cache_dir
self.CERT_PREFIX = "fake_cert"
if not os.path.exists(cache_dir):
os.mkdir(cache_dir)
if not os.path.exists(ca_file):
raise Exception("No cert exists at {}".format(ca_file))
else:
self._read_ca(ca_file)
def clone_certificate(self, remote_server):
cert_string = self.get_certificate_from_server(remote_server)
#Get Certificate properties
cert_dict = self.parse_pem(cert_string)
#Check if exists in cache
cnp = os.path.sep.join([self.cache_dir, '{}-{}.pem'.format(self.CERT_PREFIX, cert_dict["subject"]["commonName"])])
if os.path.exists(cnp):
print("Cert already exists common_name={}".format(cert_dict["subject"]["commonName"]))
else:
print("Creating and signing cert common_name={}".format(cert_dict["subject"]["commonName"]))
#Generating the Correct Private Keytype
if cert_dict["public_key"] == "RSA":
private_key = rsa.generate_private_key(public_exponent=cert_dict["public_key_pub_exp"],
key_size=cert_dict["public_key_size"], backend=default_backend())
elif cert_dict["public_key"] == "DSA":
private_key = dsa.generate_private_key(key_size=cert_dict["public_key_size"], backend=default_backend())
elif cert_dict["public_key"] == "EllipticCurve":
private_key = ec.generate_private_key(cert_dict["public_key_curve"], default_backend())
#Generate Certificate Signing Request
csr_subject = cert_dict["subject_object"]
#csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(csr_subject))
#for extension in cert_dict["ext_object"]:
# print("{}, {}".format(extension.oid._name, extension.critical))
# if extension.oid._name not in ["signedCertificateTimestampList"]:
# csr = csr.add_extension(extension.value, critical=extension.critical)
#csr = csr.sign(private_key, cert_dict["signature_hash_algorithm"], default_backend())
#Generate Certificate
cert = x509.CertificateBuilder().subject_name(csr_subject)
cert = cert.issuer_name(self.cert.subject)
cert = cert.public_key(private_key.public_key())
cert = cert.serial_number(cert_dict["serial_number"])
cert = cert.not_valid_before(cert_dict["not_valid_before"])
cert = cert.not_valid_after(cert_dict["not_valid_after"])
for extension in cert_dict["ext_object"]:
#print("{}, {}".format(extension.oid._name, extension.critical))
if extension.oid._name not in ["signedCertificateTimestampList"]:
cert = cert.add_extension(extension.value, critical=extension.critical)
cert = cert.sign(self.key, cert_dict["signature_hash_algorithm"], default_backend())
# Write our certificate out to disk.
with open(cnp, 'wb+') as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
with open(cnp[:-3] + "key", 'wb+') as f:
f.write(private_key.private_bytes(encoding=serialization.Encoding.PEM, encryption_algorithm=serialization.NoEncryption(), format=serialization.PrivateFormat.TraditionalOpenSSL))
return {"certfile": cnp, "keyfile": cnp[:-3] + "key"}
def _read_ca(self, file):
#Load Certificate
self.cert = load_certificate(FILETYPE_PEM, open(file).read())
self.cert = x509.load_pem_x509_certificate(dump_certificate(FILETYPE_PEM, self.cert), backend=default_backend())
#Load Key File
self.key = load_privatekey(FILETYPE_PEM, open(file).read())
self.key = serialization.load_pem_private_key(dump_privatekey(FILETYPE_PEM, self.key), password=None, backend=default_backend())
def parse_pem(self, cert_data):
try:
cert = x509.load_pem_x509_certificate(str.encode(cert_data), default_backend())
cert_dict = {}
#Clone Serial Number
cert_dict["serial_number"] = cert.serial_number
#Keytype
if isinstance(cert.public_key(), rsa.RSAPublicKey):
#print("Keytype: RSA")
cert_dict["public_key"] = "RSA"
cert_dict["public_key_size"] = cert.public_key().key_size
cert_dict["public_key_pub_exp"] = cert.public_key().public_numbers().e
elif isinstance(cert.public_key(), dsa.DSAPublicKey):
#print("Keytype: DSA")
cert_dict["public_key"] = "DSA"
cert_dict["public_key_size"] = cert.public_key().key_size
elif isinstance(cert.public_key(), ec.EllipticCurvePublicKey):
#print("Keytype: EllipticCurve")
cert_dict["public_key"] = "EllipticCurve"
cert_dict["public_key_size"] = cert.public_key().key_size
cert_dict["public_key_curve"] = cert.public_key().curve
else:
raise Exception("Invalid Keytype")
#Copy Validity Start Date
cert_dict["not_valid_before"] = cert.not_valid_before
#Copy Validity End Date
cert_dict["not_valid_after"] = cert.not_valid_after
#Use Same Signature Algorithum
cert_dict["signature_hash_algorithm"] = cert.signature_hash_algorithm
#Copy Subject and other object data
cert_dict["subject"] = dict()
cert_dict["subject_object"] = cert.subject
for attribute in cert.subject:
cert_dict["subject"][attribute.oid._name] = attribute.value
#Do not copy the Origional Issuer Data
#cert_dict["issuer"] = dict()
#for attribute in cert.issuer:
#cert_dict["issuer"][attribute.oid._name] = attribute.value
#print("Issuer {}: {}".format(attribute.oid._name, attribute.value))
#Copy the Certificate Extentions
cert_dict["ext_object"] = cert.extensions
return cert_dict
except Exception as e:
print("Error decoding certificate: {}".format(e))
def get_certificate_from_server(self, remote_server):
print("Getting Server Certificate from {}:{}".format(remote_server["server"], remote_server["port"]))
try:
with ssl.create_connection((remote_server["server"], remote_server["port"])) as conn:
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
sock = context.wrap_socket(conn, server_hostname=remote_server["server"])
certificate = ssl.DER_cert_to_PEM_cert(sock.getpeercert(True))
return certificate
except Exception as e:
print("Network error: {}".format(e))
if __name__ == '__main__':
p = argparse.ArgumentParser(description="Modular Intercept Proxy")
p.add_argument("--local-port","-p",type=int, default=443,
metavar="<port>",
help="Local proxy port (default:443)")
p.add_argument("--destination","-d",default="",
metavar="<destination>",required=True,
help="Server Destination example www.google.com:80")
p.add_argument("--listen-address","-l", default="0.0.0.0",
metavar="<listen-address>",
help="Specify the listen address (default is 0.0.0.0)")
p.add_argument("--tcp", "-t", action='store_true')
p.add_argument("--udp", "-u", action='store_true')
p.add_argument("--cafile","-c", metavar="<certificate-file>")
p.add_argument("--log-folder", default="logs")
p.add_argument("--quiet", "-q", action='store_true')
args = p.parse_args()
server = args.destination.split(":")
# Fix Destination Server and modify defaults
if len(server) == 2:
remote_server = {"server": server[0] ,"port": int(server[1])}
else:
remote_server = {"server": server[0] ,"port": args.local_port}
proxy = args.listen_address.split(":")
# Destination Server
if len(proxy) == 2:
proxy_server = {"server": proxy[0] ,"port": int(proxy[1])}
elif proxy[0].isdigit():
proxy_server = {server: None, "port": proxy[0]}
#TODO set default SSL Context
else:
proxy_server = {"server": proxy[0] ,"port": 443}
#Set Logger Functions
logger = SocketLogger(args.log_folder, not args.quiet)
forwarder.setClientReceiveCallback(logger.on_server2client_done_read)
forwarder.setServerReceiveCallback(logger.on_client2server_done_read)
forwarder.setClientStartCallback(logger.on_client2server_new_connection)
forwarder.setServerStartCallback(logger.on_server2client_new_connection)
forwarder.setClientCloseCallback(logger.on_client2server_close_connection)
forwarder.setServerCloseCallback(logger.on_server2client_close_connection)
#Switch on the specific TCP, UDP, SSL Server
if args.tcp:
#Start TCP Proxy Server
forwarder.tcpToTcp(args.listen_address, args.local_port, remote_server["server"], remote_server["port"])
print("TCP {}:{} -> {}:{}".format(args.listen_address, args.local_port, remote_server["server"], remote_server["port"]))
elif args.udp:
#Start USP Proxy Server
forwarder.udpToUDP(args.listen_address, args.local_port, remote_server["server"], remote_server["port"])
print("UDP {}:{} -> {}:{}".format(args.listen_address, args.local_port, remote_server["server"], remote_server["port"]))
else:
# If using SSL Proxy a CA Certificate is Required
if not args.cafile:
raise Exception("CA File Required")
#Get CA File to sign new Certificates
CA = CertificateAuthority(args.cafile)
#Get get the default endpoint Certificate
ssl_certs = CA.clone_certificate(remote_server)
#Create a Custom SSL Context to clone Certificates and resign them
serverContextFactory = SNISSLServerContext(CA, remote_server["server"], remote_server["port"], ssl_certs["keyfile"], ssl_certs["certfile"])
forwarder.sslToSSL(args.listen_address, args.local_port, remote_server["server"], remote_server["port"], CA, serverContextFactory)
print("TCP[SSL] {}:{} -> {}:{}".format(args.listen_address, args.local_port, remote_server["server"], remote_server["port"]))
reactor.run()