forked from Bitmessage/PyBitmessage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueues.py
61 lines (50 loc) · 1.81 KB
/
queues.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""Most of the queues used by bitmessage threads are defined here."""
import threading
import time
from six.moves import queue
try:
from multiqueue import MultiQueue
except ImportError:
try:
from .multiqueue import MultiQueue
except ImportError:
import sys
if 'bitmessagemock' not in sys.modules:
raise
MultiQueue = queue.Queue
class ObjectProcessorQueue(queue.Queue):
    """Special queue class using lock for `.threads.objectProcessor`

    In addition to the normal FIFO behaviour this queue keeps a running
    total (`curSize`) of ``len(item[1])`` over all queued items and makes
    `put` wait while that total has reached `maxSize`.  Every item must
    therefore be an indexable object whose second element supports
    ``len()``.
    """

    #: Threshold for `curSize`, in bytes: `put` waits (or raises
    #: ``queue.Full``) while `curSize` is at or above this value.
    maxSize = 32000000

    def __init__(self):
        queue.Queue.__init__(self)
        #: guards every modification of `curSize`
        self.sizeLock = threading.Lock()
        #: in Bytes. We maintain this to prevent nodes from flooding us
        #: with objects which take up too much memory. If this gets
        #: too big we'll sleep before asking for further objects.
        self.curSize = 0

    def put(self, item, block=True, timeout=None):
        """Add *item* to the queue, accounting ``len(item[1])`` in `curSize`.

        While `curSize` is at or above `maxSize` the call waits, polling
        at most once per second.  *block* and *timeout* have the same
        meaning as for ``queue.Queue.put``:

        - ``block=True, timeout=None`` (default): wait as long as needed;
        - ``block=True, timeout=t``: wait at most *t* seconds, then raise
          ``queue.Full``;
        - ``block=False``: raise ``queue.Full`` immediately if there is
          no room (*timeout* is ignored).

        :raises queue.Full: the size limit is still reached and the caller
            asked not to wait (any longer).
        """
        # time.time() rather than time.monotonic(): the module is written
        # against six, so it has to keep working on Python 2.
        deadline = None if timeout is None else time.time() + timeout
        # The limit is checked without the lock on purpose: it is a soft
        # threshold and a stale read only delays or advances one put.
        while self.curSize >= self.maxSize:
            if not block:
                raise queue.Full
            if deadline is None:
                time.sleep(1)
                continue
            remaining = deadline - time.time()
            if remaining <= 0:
                raise queue.Full
            time.sleep(min(1, remaining))
        with self.sizeLock:
            self.curSize += len(item[1])
        # The underlying queue is created without a maxsize, so this call
        # does not wait; block/timeout are forwarded for completeness.
        queue.Queue.put(self, item, block, timeout)

    def get(self, block=True, timeout=None):
        """Remove and return an item, subtracting its size from `curSize`.

        *block* and *timeout* are passed to ``queue.Queue.get`` unchanged,
        so ``queue.Empty`` propagates when no item becomes available.
        """
        item = queue.Queue.get(self, block, timeout)
        with self.sizeLock:
            self.curSize -= len(item[1])
        return item
# Module-level queue instances; other modules are expected to import them
# from here rather than create their own.

#: NOTE(review): purpose not visible in this file; presumably consumed by
#: the worker thread -- confirm against its users.
workerQueue = queue.Queue()
#: NOTE(review): presumably carries signals/commands for the UI layer --
#: confirm against its users.
UISignalQueue = queue.Queue()
#: NOTE(review): presumably carries requests to the address generator
#: thread -- confirm against its users.
addressGeneratorQueue = queue.Queue()
#: `.network.ReceiveQueueThread` instances dump objects they hear
#: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue()
#: `MultiQueue` instance (a plain ``queue.Queue`` when the import fallback
#: for ``bitmessagemock`` at the top of this module is taken).
#: NOTE(review): presumably holds inventory announcements -- confirm.
invQueue = MultiQueue()
#: `MultiQueue` instance, same fallback as `invQueue`.
#: NOTE(review): presumably holds peer addresses to announce -- confirm.
addrQueue = MultiQueue()
#: NOTE(review): purpose not visible in this file -- confirm against users.
portCheckerQueue = queue.Queue()
#: NOTE(review): purpose not visible in this file -- confirm against users.
receiveDataQueue = queue.Queue()
#: The address generator thread uses this queue to get information back
#: to the API thread.
apiAddressGeneratorReturnQueue = queue.Queue()
#: for exceptions
excQueue = queue.Queue()