Skip to content

Commit

Permalink
Remove asyncio.sleep(self.min_period) from Worker threads
Browse files Browse the repository at this point in the history
The 0.1s sleep was introduced in [#22](#22)
to reduce CPU usage due to the `RXWorker` busy looping. However, this
sleep constrains PyTAK from receiving more than 10 messages per seconds.
Decreasing the sleep period increases CPU usage again.

This commit removes the unneeded sleep from both `RXWorker.run()` and
`Worker.run().`
* `RXWorker.run()` was already sleeping in `self.reader.readuntil()` or
  `self.reader.recv()`.
* In `TXWorker.run()`, we replace `self.queue.get_nowait()` with
  `self.queue.get()`. We address the reason for using `get_nowait()`
  by introducing a new method for testing purposes,
  `TXWorker.run_once()`, which `TXWorker.run() calls in a loop.
  • Loading branch information
jakesprouse committed Nov 21, 2024
1 parent d57bd8a commit 0a1fa3d
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 51 deletions.
4 changes: 2 additions & 2 deletions examples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ async def handle_data(self, data):
event = data
await self.put_queue(event)

async def run(self, number_of_iterations=-1):
async def run(self):
"""Run the loop for processing or generating pre-CoT data."""
while 1:
while True:
data = tak_pong()
await self.handle_data(data)
await asyncio.sleep(20)
Expand Down
8 changes: 4 additions & 4 deletions examples/send_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ async def handle_data(self, data):
event = data
await self.put_queue(event)

async def run(self, number_of_iterations=-1):
async def run(self):
"""Run the loop for processing or generating pre-CoT data."""
while 1:
while True:
data = gen_cot()
self._logger.info("Sending:\n%s\n", data.decode())
await self.handle_data(data)
Expand All @@ -60,9 +60,9 @@ async def handle_data(self, data):
"""Handle data from the receive queue."""
self._logger.info("Received:\n%s\n", data.decode())

async def run(self): # pylint: disable=arguments-differ
async def run(self):
"""Read from the receive queue, put data onto handler."""
while 1:
while True:
data = (
await self.queue.get()
) # this is how we get the received CoT from rx_queue
Expand Down
1 change: 0 additions & 1 deletion pytak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
DEFAULT_TLS_PARAMS_REQ,
DEFAULT_HOST_ID,
BOOLEAN_TRUTH,
DEFAULT_MIN_ASYNC_SLEEP,
DEFAULT_XML_DECLARATION,
DEFAULT_IMPORT_OTHER_CONFIGS,
DEFAULT_TAK_PROTO,
Expand Down
59 changes: 23 additions & 36 deletions pytak/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def __init__(
for handler in self._logger.handlers:
handler.setLevel(logging.DEBUG)

self.min_period = pytak.DEFAULT_MIN_ASYNC_SLEEP

tak_proto_version = int(self.config.get("TAK_PROTO") or pytak.DEFAULT_TAK_PROTO)

if tak_proto_version > 0 and takproto is None:
Expand Down Expand Up @@ -104,31 +102,18 @@ async def handle_data(self, data: bytes) -> None:
"""Handle data (placeholder method, please override)."""
pass

async def run(self, number_of_iterations=-1):
"""Run this Thread, reads Data from Queue & passes data to next Handler."""
self._logger.info("Run: %s", self.__class__.__name__)

# We're instantiating the while loop this way, and using get_nowait(),
# to allow unit testing of at least one call of this loop.
while number_of_iterations != 0:
if self.queue.qsize() == 0:
await asyncio.sleep(self.min_period)
continue

# self._logger.debug("TX queue size=%s", self.queue.qsize())
data = None
try:
data = self.queue.get_nowait()
except (asyncio.QueueEmpty, _queue.Empty):
continue

if not data:
continue
async def run_once(self) -> None:
"""Reads Data from Queue & passes data to next Handler."""
data = await self.queue.get()
await self.handle_data(data)
await self.fts_compat()

await self.handle_data(data)
await self.fts_compat()

number_of_iterations -= 1
async def run(self) -> None:
"""Run this Thread - calls run_once() in a loop."""
self._logger.info("Run: %s", self.__class__.__name__)
while True:
await self.run_once()
await asyncio.sleep(0) # make sure other tasks have a chance to run


class TXWorker(Worker):
Expand Down Expand Up @@ -233,18 +218,20 @@ async def readcot(self):
except asyncio.IncompleteReadError:
return None

async def run(self, number_of_iterations=-1) -> None:
async def run_once(self) -> None:
"""Run this worker once."""
if self.reader:
data: bytes = await self.readcot()
if data:
self._logger.debug("RX: %s", data)
self.queue.put_nowait(data)

async def run(self) -> None:
"""Run this worker."""
self._logger.info("Run: %s", self.__class__.__name__)

while 1:
await asyncio.sleep(self.min_period)
if self.reader:
data: bytes = await self.readcot()
if data:
self._logger.debug("RX: %s", data)
self.queue.put_nowait(data)

while True:
await self.run_once()
await asyncio.sleep(0) # make sure other tasks have a chance to run

class QueueWorker(Worker):
"""Read non-CoT Messages from an async network client.
Expand Down
5 changes: 0 additions & 5 deletions pytak/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@
BOOLEAN_TRUTH: list = ["true", "yes", "y", "on", "1"]
DEFAULT_COT_VAL: str = "9999999.0"

# await asyncio.sleep(0) should allow co-routines to yield, but they end up
# eating 100% CPU. @PeterQFR found bumping this to 0.1 solved the high CPU
# issue. See: https://github.com/snstac/pytak/pull/22
DEFAULT_MIN_ASYNC_SLEEP: float = 0.1

# TAK Protocol to use for CoT output, one of: 0 (XML, default), 1 (Mesh/Stream).
# Doesn't always work with iTAK. Recommend sticking with 0 (XML).
DEFAULT_TAK_PROTO: str = "0"
Expand Down
4 changes: 2 additions & 2 deletions tests/test_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async def test_worker():
await event_queue.put("taco3")
worker: pytak.Worker = pytak.Worker(event_queue)
worker.handle_data = lambda data: event_queue.put(data)
await worker.run(1)
await worker.run_once()
event = await event_queue.get()
assert "taco2" == event

Expand All @@ -106,7 +106,7 @@ async def test_eventworker() -> None:

worker: pytak.Worker = pytak.TXWorker(event_queue, {}, writer)

await worker.run(1)
await worker.run_once()
remaining_event = await event_queue.get()
assert b"taco2" == remaining_event

Expand Down
2 changes: 1 addition & 1 deletion tests/test_pytak.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ async def test_EventWorker(my_queue, my_writer):
test_data = b"test test"
test_eventworker = pytak.TXWorker(my_queue, {}, my_writer)
await my_queue.put(test_data)
await test_eventworker.run(number_of_iterations=1)
await test_eventworker.run_once()

assert my_writer.events.pop() == test_data

0 comments on commit 0a1fa3d

Please sign in to comment.