Skip to content

Commit

Permalink
voctocore.lib.controlserver: better on_loop handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunsi committed May 19, 2024
1 parent a9bc3d4 commit 8447dd8
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions voctocore/lib/controlserver.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from queue import Queue
from gi.repository import GObject
from queue import Empty, Queue
from threading import Lock

from gi.repository import GObject
from lib.commands import ControlServerCommands
from lib.tcpmulticonnection import TCPMultiConnection
from lib.response import NotifyResponse
from lib.tcpmulticonnection import TCPMultiConnection

from vocto.port import Port

Expand All @@ -18,6 +19,9 @@ def __init__(self, pipeline):

self.command_queue = Queue()

self.on_loop_lock = Lock()
self.on_loop_active = False

self.commands = ControlServerCommands(pipeline)

def on_accepted(self, conn, addr):
Expand Down Expand Up @@ -59,8 +63,11 @@ def on_data(self, conn, _, leftovers, *args):
self.close_connection(conn)
return False

self.log.debug('re-starting on_loop scheduling')
GObject.idle_add(self.on_loop)
with self.on_loop_lock:
if not self.on_loop_active:
self.log.debug('re-starting on_loop scheduling')
GObject.idle_add(self.on_loop)
self.on_loop_active = True

self.command_queue.put((line, conn))

Expand All @@ -78,25 +85,23 @@ def on_loop(self):
'''Command handler. Processes commands in the command queue whenever
nothing else is happening (registered as GObject idle callback)'''

self.log.debug('on_loop called')

if self.command_queue.empty():
self.log.debug('command_queue is empty again, '
'stopping on_loop scheduling')
return False

line, requestor = self.command_queue.get()
with self.on_loop_lock:
try:
line, requestor = self.command_queue.get_nowait()
self.log.debug(f'on_loop {line=} {requestor=}')
except Empty:
self.log.debug('command_queue is empty again, stopping on_loop scheduling')
self.on_loop_active = False
return False

words = line.split()
if len(words) < 1:
self.log.debug('command_queue is empty again, '
'stopping on_loop scheduling')
self.log.debug(f'command_queue contained {line!r}, which is invalid, returning early')
return True

self.log.info("processing command '%s'", ' '.join(words))

command = words[0]
args = words[1:]
self.log.debug(f"on_loop {command=} {args=}")

response = None
try:
Expand All @@ -106,16 +111,15 @@ def on_loop(self):
raise KeyError()

command_function = self.commands.__class__.__dict__[command]

except KeyError as e:
self.log.info("Received unknown command %s", command)
response = "error unknown command %s\n" % command

else:
try:
responseObject = command_function(self.commands, *args)

except Exception as e:
self.log.error(f'{command}(*{args!r}) returned exception: {e!r}')
message = str(e) or "<no message>"
response = "error %s\n" % message

Expand All @@ -130,12 +134,12 @@ def on_loop(self):
self._schedule_write(conn, signal)
else:
response = "%s\n" % str(responseObject)

finally:
self.log.debug(f'on_loop {response=} {requestor=}')
if response is not None and requestor in self.currentConnections:
self._schedule_write(requestor, response)

return False
return True

def _schedule_write(self, conn, message):
queue = self.currentConnections[conn]
Expand All @@ -153,13 +157,13 @@ def on_write(self, conn, *args):
except KeyError:
return False

if queue.empty():
self.log.debug('write_queue[%u] is empty again, '
'stopping on_write scheduling',
conn.fileno())
try:
message = queue.get_nowait()
self.log.debug(f'on_write {message=}')
except Empty:
self.log.debug(f'write_queue[{conn.fileno()}] is empty again, stopping on_write scheduling')
return False

message = queue.get()
self.log.info("Responding message '%s'", message.strip())
try:
conn.send(message.encode())
Expand Down

0 comments on commit 8447dd8

Please sign in to comment.