Treat connection loss correctly

- detect timeout using websocket pings
- catch more errors in reconnect attempts
This commit is contained in:
Joscha 2017-09-06 10:02:02 +00:00
parent a6d4a0779f
commit 85bcdad916

View file

@ -6,6 +6,7 @@ asyncio.get_event_loop().set_debug(True)
import json
import websockets
import socket
#from websockets import ConnectionClosed
__all__ = ["Connection"]
@ -13,17 +14,20 @@ __all__ = ["Connection"]
class Connection:
def __init__(self, url, packet_hook, cookie=None):
def __init__(self, url, packet_hook, cookie=None, ping_timeout=10, ping_delay=30):
self.url = url
self.cookie = cookie
self.packet_hook = packet_hook
self.cookie = cookie
self.ping_timeout = ping_timeout # how long to wait for websocket ping reply
self.ping_delay = ping_delay # how long to wait between pings
self._ws = None
self._pid = 0 # successive packet ids
self._spawned_tasks = set()
self._pending_responses = {}
#self._stopping = False
self._runtask = None
self._pingtask = None # pings
async def connect(self, max_tries=10, delay=60):
"""
@ -37,17 +41,22 @@ class Connection:
await self.stop()
logger.debug(f"Stopped previously running things.")
tries_left = max_tries
while tries_left > 0:
logger.info(f"Attempting to connect, {tries_left} tries left.")
tries_left -= 1
try:
self._ws = await websockets.connect(self.url, max_size=None)
except (websockets.InvalidURI, websockets.InvalidHandshake):
except (websockets.InvalidURI, websockets.InvalidHandshake, socket.gaierror):
self._ws = None
if tries_left > 0:
await asyncio.sleep(delay)
else:
self._runtask = asyncio.ensure_future(self._run())
self._pingtask = asyncio.ensure_future(self._ping())
logger.debug(f"return self._runtask")
return self._runtask
async def _run(self):
@ -65,8 +74,29 @@ class Connection:
self._clean_up_tasks()
await self._ws.close() # just to make sure
await self._pingtask # should stop now that the ws is closed
self._ws = None
async def _ping(self):
"""
Periodically ping the server to detect a timeout.
"""
while True:
try:
logger.debug("Pinging...")
wait_for_reply = await self._ws.ping()
await asyncio.wait_for(wait_for_reply, self.ping_timeout)
logger.debug("Pinged!")
except asyncio.TimeoutError:
logger.warning("Ping timed out.")
await self._ws.close2()
break
except websockets.ConnectionClosed:
break
else:
await asyncio.sleep(self.ping_delay)
async def stop(self):
"""
Close websocket connection and wait for running task to stop.