Skip to content

Commit

Permalink
more tests fixes for trio
Browse files Browse the repository at this point in the history
  • Loading branch information
tshirtman committed Jan 19, 2022
1 parent 29154b9 commit eee1cda
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
run: pip install -U setuptools wheel
- name: install
run: pip install .[dev,ci]
- name: install async requirements
if: matrix.python != '2.7'
run: pip install trio curio
- name: test
run: python -m pytest --reruns 5 tests/ --cov oscpy/ --cov-branch
- name: coveralls
Expand Down
14 changes: 12 additions & 2 deletions oscpy/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from time import time
from functools import partial
import socket
from select import select

from oscpy import __version__
from oscpy.parser import read_packet, UNICODE
Expand Down Expand Up @@ -263,13 +264,22 @@ def close(self, sock=None):
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')

if sock == self.default_socket:
self.default_socket = None

if sock not in self.sockets:
return

self.sockets.remove(sock)
read = select([sock], [], [], 0)
if platform != 'win32' and sock.family == socket.AF_UNIX:
print(sock.getsockname())
os.unlink(sock.getsockname())
else:
sock.close()

if sock == self.default_socket:
self.default_socket = None
if sock in read:
sock.recvfrom(UDP_MAX_SIZE)

def getaddress(self, sock=None):
"""Wrap call to getsockname.
Expand Down
20 changes: 20 additions & 0 deletions oscpy/server/curio_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from typing import Awaitable
from sys import platform
import os

from curio import TaskGroup, socket
from oscpy.server import OSCBaseServer, UDP_MAX_SIZE
Expand Down Expand Up @@ -61,6 +63,24 @@ async def process(self):
for s in self.sockets:
await g.spawn(self._listen, s)

async def close(self, sock=None):
"""Close a socket opened by the server."""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')

if sock not in self.sockets:
logger.warning("Ignoring requested to close an unknown socket %s" % sock)

if sock == self.default_socket:
self.default_socket = None

if platform != 'win32' and sock.family == socket.AF_UNIX:
os.unlink(sock.getsockname())
else:
await sock.close()

async def stop_all(self):
await self.tasks_group.cancel_remaining()

Expand Down
8 changes: 1 addition & 7 deletions oscpy/server/thread_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,7 @@ def stop(self, s=None):
s = self.default_socket

if s in self.sockets:
read = select([s], [], [], 0)
s.close()
if s in read:
s.recvfrom(UDP_MAX_SIZE)
self.sockets.remove(s)
if s is self.default_socket:
self.default_socket = None
self.close(s)
else:
raise RuntimeError('{} is not one of my sockets!'.format(s))

Expand Down
85 changes: 67 additions & 18 deletions oscpy/server/trio_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import logging
from functools import partial
from sys import platform
from typing import Awaitable

from trio import socket, open_nursery
from trio import socket, open_nursery, move_on_after
from oscpy.server import OSCBaseServer, UDP_MAX_SIZE

logging.basicConfig()
Expand Down Expand Up @@ -33,24 +36,34 @@ async def listen(
"Unknown socket family, accepted values are 'unix' and 'inet'"
)

sock = await self.get_socket(family_, (address, port))
if family == 'unix':
addr = address
else:
addr = (address, port)
sock = await self.get_socket(family_, addr)
self.add_socket(sock, default)
return sock

async def _listen(self, sock):
async with open_nursery() as nursery:
self.nurseries[sock] = nursery
while True:
data, addr = await sock.recvfrom(UDP_MAX_SIZE)
nursery.start_soon(
partial(
self.handle_message,
data,
addr,
drop_late=False,
sender_socket=sock
try:
while True:
data, addr = await sock.recvfrom(UDP_MAX_SIZE)
nursery.start_soon(
partial(
self.handle_message,
data,
addr,
drop_late=False,
sender_socket=sock
)
)
)
finally:
with move_on_after(1) as cleanup_scope:
cleanup_scope.shield = True
logger.info("socket %s cancelled", sock)
await self.stop(sock)

async def handle_message(self, data, sender, drop_late, sender_socket):
for callbacks, values, address in self.callbacks(data, sender, sender_socket):
Expand All @@ -60,13 +73,17 @@ async def _execute_callbacks(self, callbacks_list, address, values):
for cb, get_address in callbacks_list:
try:
if get_address:
await cb(address, *values)
result = cb(address, *values)
else:
await cb(*values)
result = cb(*values)
if isinstance(result, Awaitable):
await result

except Exception:
if self.intercept_errors:
logger.error("Unhandled exception caught in oscpy server", exc_info=True)
logger.error("Ignoring unhandled exception caught in oscpy server", exc_info=True)
else:
logger.exception("Unhandled exception caught in oscpy server")
raise

async def process(self):
Expand All @@ -80,9 +97,41 @@ async def stop_all(self):
"""
self.nursery.cancel_scope.deadline = 0

async def stop(self, sock):
nursery = self.nurseries.pop(sock)
nursery.cancel_scope.deadline = 0
async def stop(self, sock=None):
if sock is None:
if self.default_socket:
sock = self.default_socket
else:
raise RuntimeError('no default socket yet and no socket provided')
if sock in self.sockets:
self.sockets.remove(sock)
else:
raise RuntimeError("Socket %s is not managed by this server" % sock)
sock.close()
if sock in self.nurseries:
nursery = self.nurseries.pop(sock)
nursery.cancel_scope.deadline = 0

if sock is self.default_socket:
self.default_socket = None

async def close(self, sock=None):
"""Close a socket opened by the server."""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')

if sock not in self.sockets:
logger.warning("Ignoring requested to close an unknown socket %s" % sock)

if sock == self.default_socket:
self.default_socket = None

if platform != 'win32' and sock.family == socket.AF_UNIX:
os.unlink(sock.getsockname())
else:
sock.close()

def getaddress(self, sock=None):
"""Wrap call to getsockname.
Expand Down
Empty file added tests/__init__.py
Empty file.
Loading

0 comments on commit eee1cda

Please sign in to comment.