Skip to content

Commit

Permalink
remote: replace autoping with explicit polling
Browse files Browse the repository at this point in the history
This fixes autobahn timeouts during long-running tests.

The main problem is that the asyncio event loop of the remote client is
not running while executing test cases. That causes the autoping
requests from the crossbar to time out, which triggers a disconnect.

We can't easily use autoping only for the exporters, so this implements
explicit polling from the coordinator to each exporter. It uses a new
'version' procedure for that, which is also useful for debugging.

The long-term fix is probably to move the autobahn client event loop
into a python thread, but that is a much larger change with higher risk.

Signed-off-by: Jan Luebbe <[email protected]>
  • Loading branch information
jluebbe authored and Emantor committed Nov 19, 2017
1 parent 80c45c6 commit e13ba3d
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 13 deletions.
4 changes: 0 additions & 4 deletions .crossbar/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ workers:
ticket:
type: dynamic
authenticator: org.labgrid.authenticate
options:
auto_ping_interval: 1000
auto_ping_timeout: 2000
auto_ping_size: 4
components:
- type: class
classname: labgrid.remote.authenticator.AuthenticatorSession
Expand Down
65 changes: 61 additions & 4 deletions labgrid/remote/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""The coordinator module coordinates exported resources and clients accessing them."""
# pylint: disable=no-member
import asyncio
import traceback
from collections import defaultdict
from os import environ
from pprint import pprint
Expand All @@ -27,6 +28,7 @@ class RemoteSession:
coordinator = attr.ib()
session = attr.ib()
authid = attr.ib()
version = attr.ib(default="unknown", init=False)

@property
def key(self):
Expand Down Expand Up @@ -96,6 +98,7 @@ class CoordinatorComponent(ApplicationSession):
def onConnect(self):
self.sessions = {}
self.places = {}
self.poll_task = None

yield from self.load()

Expand All @@ -117,7 +120,7 @@ def onJoin(self, details):
options=RegisterOptions(details_arg='details')
)

# resources
# resources
yield from self.register(
self.set_resource,
'org.labgrid.coordinator.set_resource',
Expand Down Expand Up @@ -163,8 +166,59 @@ def onJoin(self, details):
yield from self.register(
self.get_places, 'org.labgrid.coordinator.get_places'
)

self.poll_task = asyncio.get_event_loop().create_task(self.poll())

print("Coordinator ready.")

@asyncio.coroutine
def onLeave(self, details):
    """Stop the polling task (if one was started), then delegate to the base class."""
    task = self.poll_task
    if task:
        task.cancel()
        # Wait for the cancelled task to finish before continuing.
        yield from asyncio.wait([task])
    super().onLeave(details)

@asyncio.coroutine
def onDisconnect(self):
    """Stop the polling task (if one was started) and pause briefly afterwards."""
    task = self.poll_task
    if task:
        task.cancel()
        # Wait for the cancelled task to finish before continuing.
        yield from asyncio.wait([task])
    # give others a chance to clean up
    yield from asyncio.sleep(0.5)

@asyncio.coroutine
def _poll_step(self):
    """Call the ``version`` procedure of every exporter session once.

    An exporter that does not answer within 5 seconds is removed through
    ``on_session_leave``. A successful answer is stored in the session's
    ``version`` attribute.
    """
    # Iterate over a snapshot: on_session_leave() pops entries from self.sessions.
    for exporter in list(self.sessions.values()):
        if not isinstance(exporter, ExporterSession):
            continue

        procedure = 'org.labgrid.exporter.{}.version'.format(exporter.name)
        finished, _unfinished = yield from asyncio.wait(
            [self.call(procedure)], timeout=5
        )
        if not finished:
            print('kicking exporter ({}/{})'.format(exporter.key, exporter.name))
            yield from self.on_session_leave(exporter.key)
            continue

        try:
            exporter.version = finished.pop().result()
        except wamp.exception.ApplicationError as err:
            # no_such_procedure: old client; canceled: disconnected.
            tolerated = ("wamp.error.no_such_procedure", "wamp.error.canceled")
            if err.error not in tolerated:
                raise

@asyncio.coroutine
def poll(self):
    """Run ``_poll_step()`` every 15 seconds until cancelled or the loop closes.

    Cancelling the task ends the loop. Any other ordinary error raised by
    a step is printed with its traceback, and polling continues with the
    next iteration.
    """
    loop = asyncio.get_event_loop()
    while not loop.is_closed():
        try:
            yield from asyncio.sleep(15.0)
            yield from self._poll_step()
        except asyncio.CancelledError:
            break
        except Exception:
            # Only ordinary errors are swallowed. A bare ``except`` would
            # also trap KeyboardInterrupt, SystemExit and GeneratorExit
            # and keep the loop running.
            traceback.print_exc()

@asyncio.coroutine
def save(self):
with open('resources.yaml', 'w') as f:
Expand Down Expand Up @@ -241,7 +295,7 @@ def on_session_join(self, session_details):

@asyncio.coroutine
def on_session_leave(self, session_id):
pprint(session_id)
print('leave ({})'.format(session_id))
try:
session = self.sessions.pop(session_id)
except KeyError:
Expand All @@ -263,13 +317,16 @@ def attach(self, name, details=None):

@asyncio.coroutine
def set_resource(self, groupname, resourcename, resource, details=None):
session = self.sessions.get(details.caller)
if session is None:
return
assert isinstance(session, ExporterSession)

groupname = str(groupname)
resourcename = str(resourcename)
# TODO check if acquired
print(details)
pprint(resource)
session = self.sessions[details.caller]
assert isinstance(session, ExporterSession)
action, resource_path = session.set_resource(groupname, resourcename, resource)
if action is Action.ADD:
self._add_default_place(groupname)
Expand Down
23 changes: 18 additions & 5 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
from .config import ResourceConfig
from .common import ResourceEntry, enable_tcp_nodelay

# Determine the installed labgrid version; fall back to "unknown" when
# pkg_resources is unavailable or the distribution cannot be found.
try:
    import pkg_resources
    __version__ = pkg_resources.get_distribution('labgrid').version
except Exception:
    # Not a bare ``except``: KeyboardInterrupt/SystemExit must propagate.
    __version__ = "unknown"

def get_free_port():
"""Helper function to always return an unused port."""
Expand Down Expand Up @@ -191,6 +196,7 @@ def onConnect(self):
self.name = self.config.extra['name']
self.authid = "exporter/{}".format(self.name)
self.address = self._transport.transport.get_extra_info('sockname')[0]
self.poll_task = None

self.groups = {}

Expand Down Expand Up @@ -232,22 +238,25 @@ def onJoin(self, details):
prefix = 'org.labgrid.exporter.{}'.format(self.name)
yield from self.register(self.acquire, '{}.acquire'.format(prefix))
yield from self.register(self.release, '{}.release'.format(prefix))
yield from self.register(self.version, '{}.version'.format(prefix))

@asyncio.coroutine
def onLeave(self, details):
    """Cleanup after leaving the coordinator connection"""
    # poll_task may still be None, so only cancel and await the task
    # when one exists.
    if self.poll_task:
        self.poll_task.cancel()
        yield from asyncio.wait([self.poll_task])
    super().onLeave(details)

@asyncio.coroutine
def onDisconnect(self):
    """Handle a lost connection: set the ``reexec`` flag and stop the loop."""
    print("connection lost")
    global reexec
    reexec = True
    # poll_task may still be None, so only cancel and await the task
    # when one exists.
    if self.poll_task:
        self.poll_task.cancel()
        yield from asyncio.wait([self.poll_task])
    yield from asyncio.sleep(0.5)  # give others a chance to clean up
    self.loop.stop()

@asyncio.coroutine
Expand All @@ -262,6 +271,10 @@ def release(self, group_name, resource_name):
#resource.release()
yield from self.update_resource(group_name, resource_name)

@asyncio.coroutine
def version(self):
    """Return this module's ``__version__`` string."""
    return __version__

@asyncio.coroutine
def _poll_step(self):
for group_name, group in self.groups.items():
Expand Down

0 comments on commit e13ba3d

Please sign in to comment.