diff --git a/examples/send.py b/examples/send.py index 8ae486e..218ab8d 100644 --- a/examples/send.py +++ b/examples/send.py @@ -19,9 +19,9 @@ async def handle_data(self, data): event = data await self.put_queue(event) - async def run(self, number_of_iterations=-1): + async def run(self): """Run the loop for processing or generating pre-CoT data.""" - while 1: + while True: data = tak_pong() await self.handle_data(data) await asyncio.sleep(20) diff --git a/examples/send_receive.py b/examples/send_receive.py index 93b2d2b..3a697f8 100644 --- a/examples/send_receive.py +++ b/examples/send_receive.py @@ -44,9 +44,9 @@ async def handle_data(self, data): event = data await self.put_queue(event) - async def run(self, number_of_iterations=-1): + async def run(self): """Run the loop for processing or generating pre-CoT data.""" - while 1: + while True: data = gen_cot() self._logger.info("Sending:\n%s\n", data.decode()) await self.handle_data(data) @@ -60,9 +60,9 @@ async def handle_data(self, data): """Handle data from the receive queue.""" self._logger.info("Received:\n%s\n", data.decode()) - async def run(self): # pylint: disable=arguments-differ + async def run(self): """Read from the receive queue, put data onto handler.""" - while 1: + while True: data = ( await self.queue.get() ) # this is how we get the received CoT from rx_queue diff --git a/pytak/__init__.py b/pytak/__init__.py index eff8eb3..e7db029 100644 --- a/pytak/__init__.py +++ b/pytak/__init__.py @@ -36,7 +36,6 @@ DEFAULT_TLS_PARAMS_REQ, DEFAULT_HOST_ID, BOOLEAN_TRUTH, - DEFAULT_MIN_ASYNC_SLEEP, DEFAULT_XML_DECLARATION, DEFAULT_IMPORT_OTHER_CONFIGS, DEFAULT_TAK_PROTO, diff --git a/pytak/classes.py b/pytak/classes.py index de9c151..32742c8 100644 --- a/pytak/classes.py +++ b/pytak/classes.py @@ -71,8 +71,6 @@ def __init__( for handler in self._logger.handlers: handler.setLevel(logging.DEBUG) - self.min_period = pytak.DEFAULT_MIN_ASYNC_SLEEP - tak_proto_version = int(self.config.get("TAK_PROTO") or pytak.DEFAULT_TAK_PROTO) if tak_proto_version > 0 and takproto is None: @@ -104,31 +102,18 @@ async def handle_data(self, data: bytes) -> None: """Handle data (placeholder method, please override).""" pass - async def run(self, number_of_iterations=-1): - """Run this Thread, reads Data from Queue & passes data to next Handler.""" - self._logger.info("Run: %s", self.__class__.__name__) - - # We're instantiating the while loop this way, and using get_nowait(), - # to allow unit testing of at least one call of this loop. - while number_of_iterations != 0: - if self.queue.qsize() == 0: - await asyncio.sleep(self.min_period) - continue - - # self._logger.debug("TX queue size=%s", self.queue.qsize()) - data = None - try: - data = self.queue.get_nowait() - except (asyncio.QueueEmpty, _queue.Empty): - continue - - if not data: - continue + async def run_once(self) -> None: + """Reads Data from Queue & passes data to next Handler.""" + data = await self.queue.get() + await self.handle_data(data) + await self.fts_compat() - await self.handle_data(data) - await self.fts_compat() - - number_of_iterations -= 1 + async def run(self) -> None: + """Run this Thread - calls run_once() in a loop.""" + self._logger.info("Run: %s", self.__class__.__name__) + while True: + await self.run_once() + await asyncio.sleep(0) # make sure other tasks have a chance to run class TXWorker(Worker): @@ -233,18 +218,20 @@ async def readcot(self): except asyncio.IncompleteReadError: return None - async def run(self, number_of_iterations=-1) -> None: + async def run_once(self) -> None: + """Run this worker once.""" + if self.reader: + data: bytes = await self.readcot() + if data: + self._logger.debug("RX: %s", data) + self.queue.put_nowait(data) + + async def run(self) -> None: """Run this worker.""" self._logger.info("Run: %s", self.__class__.__name__) - - while 1: - await asyncio.sleep(self.min_period) - if self.reader: - data: bytes = await self.readcot() - if data: - self._logger.debug("RX: %s", data) - self.queue.put_nowait(data) - + while True: + await self.run_once() + await asyncio.sleep(0) # make sure other tasks have a chance to run class QueueWorker(Worker): """Read non-CoT Messages from an async network client. diff --git a/pytak/constants.py b/pytak/constants.py index 82de2a1..9134fbc 100644 --- a/pytak/constants.py +++ b/pytak/constants.py @@ -82,11 +82,6 @@ BOOLEAN_TRUTH: list = ["true", "yes", "y", "on", "1"] DEFAULT_COT_VAL: str = "9999999.0" -# await asyncio.sleep(0) should allow co-routines to yield, but they end up -# eating 100% CPU. @PeterQFR found bumping this to 0.1 solved the high CPU -# issue. See: https://github.com/snstac/pytak/pull/22 -DEFAULT_MIN_ASYNC_SLEEP: float = 0.1 - # TAK Protocol to use for CoT output, one of: 0 (XML, default), 1 (Mesh/Stream). # Doesn't always work with iTAK. Recommend sticking with 0 (XML). DEFAULT_TAK_PROTO: str = "0" diff --git a/tests/test_classes.py b/tests/test_classes.py index 246d90a..08cf57f 100644 --- a/tests/test_classes.py +++ b/tests/test_classes.py @@ -84,7 +84,7 @@ async def test_worker(): await event_queue.put("taco3") worker: pytak.Worker = pytak.Worker(event_queue) worker.handle_data = lambda data: event_queue.put(data) - await worker.run(1) + await worker.run_once() event = await event_queue.get() assert "taco2" == event @@ -106,7 +106,7 @@ async def test_eventworker() -> None: worker: pytak.Worker = pytak.TXWorker(event_queue, {}, writer) - await worker.run(1) + await worker.run_once() remaining_event = await event_queue.get() assert b"taco2" == remaining_event diff --git a/tests/test_pytak.py b/tests/test_pytak.py index 1a86b70..d8e388a 100644 --- a/tests/test_pytak.py +++ b/tests/test_pytak.py @@ -60,6 +60,6 @@ async def test_EventWorker(my_queue, my_writer): test_data = b"test test" test_eventworker = pytak.TXWorker(my_queue, {}, my_writer) await my_queue.put(test_data) - await test_eventworker.run(number_of_iterations=1) + await test_eventworker.run_once() assert my_writer.events.pop() == test_data