diff --git a/yaboli/connection.py b/yaboli/connection.py index b0a108d..4a61f1b 100644 --- a/yaboli/connection.py +++ b/yaboli/connection.py @@ -6,6 +6,7 @@ asyncio.get_event_loop().set_debug(True) import json import websockets +import socket #from websockets import ConnectionClosed __all__ = ["Connection"] @@ -13,17 +14,20 @@ __all__ = ["Connection"] class Connection: - def __init__(self, url, packet_hook, cookie=None): + def __init__(self, url, packet_hook, cookie=None, ping_timeout=10, ping_delay=30): self.url = url - self.cookie = cookie self.packet_hook = packet_hook + self.cookie = cookie + self.ping_timeout = ping_timeout # how long to wait for websocket ping reply + self.ping_delay = ping_delay # how long to wait between pings self._ws = None self._pid = 0 # successive packet ids self._spawned_tasks = set() self._pending_responses = {} - #self._stopping = False + self._runtask = None + self._pingtask = None # pings async def connect(self, max_tries=10, delay=60): """ @@ -37,17 +41,22 @@ class Connection: await self.stop() + logger.debug(f"Stopped previously running things.") + tries_left = max_tries while tries_left > 0: + logger.info(f"Attempting to connect, {tries_left} tries left.") tries_left -= 1 try: self._ws = await websockets.connect(self.url, max_size=None) - except (websockets.InvalidURI, websockets.InvalidHandshake): + except (websockets.InvalidURI, websockets.InvalidHandshake, socket.gaierror): self._ws = None if tries_left > 0: await asyncio.sleep(delay) else: self._runtask = asyncio.ensure_future(self._run()) + self._pingtask = asyncio.ensure_future(self._ping()) + logger.debug(f"return self._runtask") return self._runtask async def _run(self): @@ -65,8 +74,29 @@ class Connection: self._clean_up_tasks() await self._ws.close() # just to make sure + await self._pingtask # should stop now that the ws is closed self._ws = None + async def _ping(self): + """ + Periodically ping the server to detect a timeout. + """ + + while True: + try: + logger.debug("Pinging...") + wait_for_reply = await self._ws.ping() + await asyncio.wait_for(wait_for_reply, self.ping_timeout) + logger.debug("Pinged!") + except asyncio.TimeoutError: + logger.warning("Ping timed out.") + await self._ws.close2() + break + except websockets.ConnectionClosed: + break + else: + await asyncio.sleep(self.ping_delay) + async def stop(self): """ Close websocket connection and wait for running task to stop.