Skip to content

Commit

Permalink
Merge pull request #85 from jakesprouse/main
Browse files Browse the repository at this point in the history
Remove asyncio sleep from Worker threads
  • Loading branch information
ampledata authored Jan 3, 2025
2 parents f7caf83 + 0a1fa3d commit acb4a39
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 51 deletions.
4 changes: 2 additions & 2 deletions examples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ async def handle_data(self, data):
event = data
await self.put_queue(event)

async def run(self, number_of_iterations=-1):
async def run(self):
"""Run the loop for processing or generating pre-CoT data."""
while 1:
while True:
data = tak_pong()
await self.handle_data(data)
await asyncio.sleep(20)
Expand Down
8 changes: 4 additions & 4 deletions examples/send_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ async def handle_data(self, data):
event = data
await self.put_queue(event)

async def run(self, number_of_iterations=-1):
async def run(self):
"""Run the loop for processing or generating pre-CoT data."""
while 1:
while True:
data = gen_cot()
self._logger.info("Sending:\n%s\n", data.decode())
await self.handle_data(data)
Expand All @@ -60,9 +60,9 @@ async def handle_data(self, data):
"""Handle data from the receive queue."""
self._logger.info("Received:\n%s\n", data.decode())

async def run(self): # pylint: disable=arguments-differ
async def run(self):
"""Read from the receive queue, put data onto handler."""
while 1:
while True:
data = (
await self.queue.get()
) # this is how we get the received CoT from rx_queue
Expand Down
1 change: 0 additions & 1 deletion pytak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
DEFAULT_TLS_PARAMS_REQ,
DEFAULT_HOST_ID,
BOOLEAN_TRUTH,
DEFAULT_MIN_ASYNC_SLEEP,
DEFAULT_XML_DECLARATION,
DEFAULT_IMPORT_OTHER_CONFIGS,
DEFAULT_TAK_PROTO,
Expand Down
59 changes: 23 additions & 36 deletions pytak/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def __init__(
for handler in self._logger.handlers:
handler.setLevel(logging.DEBUG)

self.min_period = pytak.DEFAULT_MIN_ASYNC_SLEEP

tak_proto_version = int(self.config.get("TAK_PROTO") or pytak.DEFAULT_TAK_PROTO)

if tak_proto_version > 0 and takproto is None:
Expand Down Expand Up @@ -104,31 +102,18 @@ async def handle_data(self, data: bytes) -> None:
"""Handle data (placeholder method, please override)."""
pass

async def run(self, number_of_iterations=-1):
"""Run this Thread, reads Data from Queue & passes data to next Handler."""
self._logger.info("Run: %s", self.__class__.__name__)

# We're instantiating the while loop this way, and using get_nowait(),
# to allow unit testing of at least one call of this loop.
while number_of_iterations != 0:
if self.queue.qsize() == 0:
await asyncio.sleep(self.min_period)
continue

# self._logger.debug("TX queue size=%s", self.queue.qsize())
data = None
try:
data = self.queue.get_nowait()
except (asyncio.QueueEmpty, _queue.Empty):
continue

if not data:
continue
async def run_once(self) -> None:
"""Reads Data from Queue & passes data to next Handler."""
data = await self.queue.get()
await self.handle_data(data)
await self.fts_compat()

await self.handle_data(data)
await self.fts_compat()

number_of_iterations -= 1
async def run(self) -> None:
"""Run this Thread - calls run_once() in a loop."""
self._logger.info("Run: %s", self.__class__.__name__)
while True:
await self.run_once()
await asyncio.sleep(0) # make sure other tasks have a chance to run


class TXWorker(Worker):
Expand Down Expand Up @@ -233,18 +218,20 @@ async def readcot(self):
except asyncio.IncompleteReadError:
return None

async def run(self, number_of_iterations=-1) -> None:
async def run_once(self) -> None:
"""Run this worker once."""
if self.reader:
data: bytes = await self.readcot()
if data:
self._logger.debug("RX: %s", data)
self.queue.put_nowait(data)

async def run(self) -> None:
"""Run this worker."""
self._logger.info("Run: %s", self.__class__.__name__)

while 1:
await asyncio.sleep(self.min_period)
if self.reader:
data: bytes = await self.readcot()
if data:
self._logger.debug("RX: %s", data)
self.queue.put_nowait(data)

while True:
await self.run_once()
await asyncio.sleep(0) # make sure other tasks have a chance to run

class QueueWorker(Worker):
"""Read non-CoT Messages from an async network client.
Expand Down
5 changes: 0 additions & 5 deletions pytak/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@
BOOLEAN_TRUTH: list = ["true", "yes", "y", "on", "1"]
DEFAULT_COT_VAL: str = "9999999.0"

# await asyncio.sleep(0) should allow co-routines to yield, but they end up
# eating 100% CPU. @PeterQFR found bumping this to 0.1 solved the high CPU
# issue. See: https://github.com/snstac/pytak/pull/22
DEFAULT_MIN_ASYNC_SLEEP: float = 0.1

# TAK Protocol to use for CoT output, one of: 0 (XML, default), 1 (Mesh/Stream).
# Doesn't always work with iTAK. Recommend sticking with 0 (XML).
DEFAULT_TAK_PROTO: str = "0"
Expand Down
4 changes: 2 additions & 2 deletions tests/test_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def test_worker():
await event_queue.put("taco3")
worker: pytak.Worker = pytak.Worker(event_queue)
worker.handle_data = lambda data: event_queue.put(data)
await worker.run(1)
await worker.run_once()
event = await event_queue.get()
assert "taco2" == event

Expand All @@ -106,7 +106,7 @@ async def test_eventworker() -> None:

worker: pytak.Worker = pytak.TXWorker(event_queue, {}, writer)

await worker.run(1)
await worker.run_once()
remaining_event = await event_queue.get()
assert b"taco2" == remaining_event

Expand Down
2 changes: 1 addition & 1 deletion tests/test_pytak.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ async def test_EventWorker(my_queue, my_writer):
test_data = b"test test"
test_eventworker = pytak.TXWorker(my_queue, {}, my_writer)
await my_queue.put(test_data)
await test_eventworker.run(number_of_iterations=1)
await test_eventworker.run_once()

assert my_writer.events.pop() == test_data

0 comments on commit acb4a39

Please sign in to comment.