-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMiniMonkey.py
148 lines (115 loc) · 3.43 KB
/
MiniMonkey.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import struct
from threading import Thread, Lock
from queue import Queue
import socket
protocol = '<BH'
max_size = 3 + 2**16
ERROR = 0x0
AUTH = 0x1
ENTER = 0x2
PUBLISH = 0x3
SUBSCRIBE = 0x4
ADD_ADMIN = 0x10
REVOKE_ADMIN = 0x11
ADD_PUBLISH = 0x12
REVOKE_PUBLISH = 0x13
ADD_SUBSCRIBE = 0x14
REVOKE_SUBSCRIBE = 0x15
ADD_LOGIN = 0x16
REVOKE_LOGIN = 0x17
LINK_ROOM = 0x30
UNLINK_ROOM = 0x31
class MiniMonkey(Thread):
def __init__(self, host='localhost'):
Thread.__init__(self)
self.host = host
self.sock = None
self.inbuffer = b''
self.incoming = Queue()
self.outgoing = Queue()
self.in_mutex = Lock()
self.out_mutex = Lock()
self.should_run = True
def auth(self, token):
self.send(AUTH, token)
def enter(self, room):
self.send(ENTER, room)
def publish(self, payload):
self.send(PUBLISH, payload)
def subscribe(self, tag):
self.send(SUBSCRIBE, tag)
def add_admin(self, token):
self.send(ADD_ADMIN, token)
def revoke_admin(self, token):
self.send(REVOKE_ADMIN, token)
def add_publish(self, token):
self.send(ADD_PUBLISH, token)
def revoke_publish(self, token):
self.send(REVOKE_PUBLISH, token)
def add_subscribe(self, token):
self.send(ADD_SUBSCRIBE, token)
def revoke_subscribe(self, token):
self.send(REVOKE_SUBSCRIBE, token)
def add_login(self, token):
self.send(ADD_LOGIN, token)
def revoke_login(self, token):
self.send(REVOKE_LOGIN, token)
def link_room(self, room):
self.send(LINK_ROOM, room)
def unlink_room(self, room):
self.send(UNLINK_ROOM, room)
def send(self, code, payload):
self.out_mutex.acquire()
try:
self.outgoing.put((code, payload))
finally:
self.out_mutex.release()
def recv(self):
code = None
payload = None
self.in_mutex.acquire()
try:
if not self.incoming.empty():
code, payload = self.incoming.get()
finally:
self.in_mutex.release()
return code, payload
def _send(self):
self.out_mutex.acquire()
try:
if self.outgoing.empty():
pass
else:
code, payload = self.outgoing.get()
header = struct.pack(protocol, code, len(payload))
self.sock.send(header)
self.sock.send(payload.encode())
finally:
self.out_mutex.release()
def _recv(self):
try:
new_data = self.sock.recv(max_size)
except socket.timeout:
new_data = b''
self.inbuffer += new_data
if len(self.inbuffer) < 3:
return
header = self.inbuffer[0:3]
payload = self.inbuffer[3:]
(code, size) = struct.unpack(protocol, header)
if size <= len(payload):
self.in_mutex.acquire()
try:
self.incoming.put((code, payload[0:size]))
self.inbuffer = payload[size:]
finally:
self.in_mutex.release()
def stop(self):
self.should_run = False
def run(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, 1773))
self.sock.settimeout(0.01)
while self.should_run:
self._send()
self._recv()