From 2d2b1d25b1973b28d7d63f8d35a342599635dc28 Mon Sep 17 00:00:00 2001 From: Greg Albrecht Date: Tue, 26 Jul 2022 11:57:09 -0700 Subject: [PATCH] RXWorker will now actually read from the socket! --- pytak/classes.py | 27 +++++++++++++++++---------- setup.py | 2 +- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pytak/classes.py b/pytak/classes.py index d2665c6..7d08546 100644 --- a/pytak/classes.py +++ b/pytak/classes.py @@ -77,7 +77,7 @@ async def run(self, number_of_iterations=-1): """ Runs this Thread, reads Data from Queue & passes data to next Handler. """ - self._logger.info("Running %s", self.__class__) + self._logger.info("Run: %s", self.__class__) # We're instantiating the while loop this way, and using get_nowait(), # to allow unit testing of at least one call of this loop. while number_of_iterations != 0: @@ -110,7 +110,7 @@ async def handle_data(self, data: bytes) -> None: COT Event Handler, accepts COT Events from the COT Event Queue and processes them for writing. """ - self._logger.debug("Handling data='%s'", data) + self._logger.debug("TX: %s", data) await self.send_data(data) async def send_data(self, data: bytes) -> None: @@ -141,12 +141,19 @@ def __init__( super().__init__(queue, config) self.reader: asyncio.Protocol = reader + async def readcot(self): + return await self.reader.readuntil("".encode("UTF-8")) + async def run(self, number_of_iterations=-1) -> None: - self._logger.info("Running %s", self.__class__) + self._logger.info("Run: %s", self.__class__) while 1: - data: bytes = await self.queue.get() - self._logger.debug("data='%s'", data) + if self.reader: + data: bytes = await self.readcot() + self._logger.debug("RX: %s", data) + self.queue.put_nowait(data) + else: + await asyncio.sleep(0.01) class QueueWorker(Worker): # pylint: disable=too-few-public-methods @@ -164,7 +171,7 @@ class QueueWorker(Worker): # pylint: disable=too-few-public-methods def __init__(self, queue: asyncio.Queue, config: dict) -> None: super().__init__(queue, config) - self._logger.info("Using COT Dest.: %s", self.config.get("COT_URL")) + self._logger.info("COT Dest: %s", self.config.get("COT_URL")) async def put_queue(self, data: bytes) -> None: """Puts Data onto the Queue.""" @@ -215,7 +222,7 @@ async def hello_event(self): def add_task(self, task): """Adds the given task to our coroutine task list.""" - self._logger.debug("Adding Task: %s", task) + self._logger.debug("Add: %s", task) self.tasks.add(task) def add_tasks(self, tasks): @@ -225,7 +232,7 @@ def add_tasks(self, tasks): def run_task(self, task): """Runs the given coroutine task.""" - self._logger.debug("Running Task: %s", task) + self._logger.debug("Run: %s", task) self.running_tasks.add(asyncio.ensure_future(task.run())) def run_tasks(self, tasks=None): @@ -236,7 +243,7 @@ def run_tasks(self, tasks=None): async def run(self): """Runs this Thread and its associated coroutine tasks.""" - self._logger.info("Running %s", self.__class__) + self._logger.info("Run: %s", self.__class__) await self.hello_event() self.run_tasks() @@ -246,4 +253,4 @@ async def run(self): ) for task in done: - self._logger.info("Completed Task: %s", task) + self._logger.info("Complete: %s", task) diff --git a/setup.py b/setup.py index 5ad7bbb..2cd56b6 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ import setuptools __title__ = "pytak" -__version__ = "5.0.4" +__version__ = "5.1.0" __author__ = "Greg Albrecht W2GMD " __copyright__ = "Copyright 2022 Greg Albrecht" __license__ = "Apache License, Version 2.0"