Skip to content

Commit

Permalink
more tests fixes for trio
Browse files Browse the repository at this point in the history
  • Loading branch information
tshirtman committed Jan 17, 2022
1 parent 29154b9 commit 58802fa
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 62 deletions.
85 changes: 67 additions & 18 deletions oscpy/server/trio_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import logging
from functools import partial
from sys import platform
from typing import Awaitable

from trio import socket, open_nursery
from trio import socket, open_nursery, move_on_after
from oscpy.server import OSCBaseServer, UDP_MAX_SIZE

logging.basicConfig()
Expand Down Expand Up @@ -33,24 +36,34 @@ async def listen(
"Unknown socket family, accepted values are 'unix' and 'inet'"
)

sock = await self.get_socket(family_, (address, port))
if family == 'unix':
addr = address
else:
addr = (address, port)
sock = await self.get_socket(family_, addr)
self.add_socket(sock, default)
return sock

async def _listen(self, sock):
async with open_nursery() as nursery:
self.nurseries[sock] = nursery
while True:
data, addr = await sock.recvfrom(UDP_MAX_SIZE)
nursery.start_soon(
partial(
self.handle_message,
data,
addr,
drop_late=False,
sender_socket=sock
try:
while True:
data, addr = await sock.recvfrom(UDP_MAX_SIZE)
nursery.start_soon(
partial(
self.handle_message,
data,
addr,
drop_late=False,
sender_socket=sock
)
)
)
finally:
with move_on_after(1) as cleanup_scope:
cleanup_scope.shield = True
logger.info("socket %s cancelled", sock)
await self.stop(sock)

async def handle_message(self, data, sender, drop_late, sender_socket):
for callbacks, values, address in self.callbacks(data, sender, sender_socket):
Expand All @@ -60,13 +73,17 @@ async def _execute_callbacks(self, callbacks_list, address, values):
for cb, get_address in callbacks_list:
try:
if get_address:
await cb(address, *values)
result = cb(address, *values)
else:
await cb(*values)
result = cb(*values)
if isinstance(result, Awaitable):
await result

except Exception:
if self.intercept_errors:
logger.error("Unhandled exception caught in oscpy server", exc_info=True)
logger.error("Ignoring unhandled exception caught in oscpy server", exc_info=True)
else:
logger.exception("Unhandled exception caught in oscpy server")
raise

async def process(self):
Expand All @@ -80,9 +97,41 @@ async def stop_all(self):
"""
self.nursery.cancel_scope.deadline = 0

async def stop(self, sock):
nursery = self.nurseries.pop(sock)
nursery.cancel_scope.deadline = 0
async def stop(self, sock=None):
if sock is None:
if self.default_socket:
sock = self.default_socket
else:
raise RuntimeError('no default socket yet and no socket provided')
if sock in self.sockets:
self.sockets.remove(sock)
else:
raise RuntimeError("Socket %s is not managed by this server" % sock)
sock.close()
if sock in self.nurseries:
nursery = self.nurseries.pop(sock)
nursery.cancel_scope.deadline = 0

if sock is self.default_socket:
self.default_socket = None

async def close(self, sock=None):
"""Close a socket opened by the server."""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')

if sock not in self.sockets:
logger.warning("Ignoring requested to close an unknown socket %s" % sock)

if sock == self.default_socket:
self.default_socket = None

if platform != 'win32' and sock.family == socket.AF_UNIX:
os.unlink(sock.getsockname())
else:
sock.close()

def getaddress(self, sock=None):
"""Wrap call to getsockname.
Expand Down
Loading

0 comments on commit 58802fa

Please sign in to comment.