Add some event calls and event loop

This commit is contained in:
Joscha 2019-04-07 12:20:31 +00:00
parent a24e4aa18a
commit c60526a34d
2 changed files with 81 additions and 41 deletions

View file

@ -124,15 +124,18 @@ class Connection:
not check or manipulate _state.
"""
if self._ws is not None:
logger.debug("Closing ws connection")
await self._ws.close()
# Checking self._ws again since during the above await, another
# disconnect call could have finished cleaning up.
if self._ws is None:
# This indicates that _ws, _awaiting_replies and _ping_check are
# cleaned up
logger.debug("ws connection already cleaned up")
logger.debug("Ws connection already cleaned up")
return
logger.debug("Closing ws connection")
await self._ws.close()
logger.debug("Cancelling tasks awaiting replies")
for tasks in self._awaiting_replies.values():
for task in tasks:
@ -184,6 +187,32 @@ class Connection:
# while the _state is _RUNNING.
asyncio.create_task(self._disconnect())
async def _reconnect(self) -> bool:
"""
This function should only be called from the event loop while the
_state is _RUNNING.
"""
if self._state != self._RUNNING:
raise IncorrectStateException()
logger.debug("Reconnecting...")
self._events.fire("reconnecting")
self._state = self._RECONNECTING
await self._disconnect()
success = await self._connect()
self._state = self._RUNNING
self._events.fire("reconnected")
logger.debug("Sending connected notification")
async with self._connected_condition:
self._connected_condition.notify_all()
logger.debug("Reconnected" if success else "Reconnection failed")
return success
async def connect(self) -> bool:
# Special exception message for _CONNECTING.
if self._state == self._CONNECTING:
@ -194,7 +223,7 @@ class Connection:
raise IncorrectStateException(("disconnect() must complete before"
" connect() may be called again."))
logger.info("Connecting...")
logger.debug("Connecting...")
# Now we're sure we're in the _NOT_RUNNING state, we can set our state.
# Important: No await-ing has occurred between checking the state and
@ -207,6 +236,7 @@ class Connection:
logger.debug("Starting event loop")
self._event_loop = asyncio.create_task(self._run())
self._state = self._RUNNING
self._events.fire("connected")
else:
self._state = self._NOT_RUNNING
@ -217,47 +247,23 @@ class Connection:
logger.debug("Connected" if success else "Connection failed")
return success
async def _reconnect(self) -> bool:
"""
This function should only be called from the event loop while the
_state is _RUNNING.
"""
if self._state != self._RUNNING:
raise IncorrectStateException()
logger.info("Reconnecting...")
self._state = self._RECONNECTING
await self._disconnect()
success = await self._connect()
self._state = self._RUNNING
logger.debug("Sending connected notification")
async with self._connected_condition:
self._connected_condition.notify_all()
logger.debug("Reconnected" if success else "Reconnection failed")
return success
async def disconnect(self) -> None:
# Fun fact: This function consists of 24 lines of comments, 19 lines of
# Fun fact: This function consists of 25 lines of comments, 19 lines of
# code, 16 lines of whitespace and 7 lines of logging statements,
# making for a total of 66 lines. Its comments to code ratio is about
# 1.263.
# making for a total of 67 lines. Its comments to code ratio is about
# 1.316.
# Possible states left: _NOT_RUNNING, _CONNECTING, _RUNNING,
# _RECONNECTING, _DISCONNECTING
# Waiting until the current connection attempt is finished.
if self._state in [self._CONNECTING, self._RECONNECTING]:
# Waiting until the current connection attempt is finished. Using a
# while loop since the event loop might have started to reconnect again
# while the await is still waiting.
while self._state in [self._CONNECTING, self._RECONNECTING]:
# After _CONNECTING, the state can either be _NOT_RUNNING or
# _RUNNING. After _RECONNECTING, the state must be _RUNNING.
async with self._connected_condition:
await self._connected_condition.wait()
# The state is now either _NOT_RUNNING or _RUNNING.
# Possible states left: _NOT_RUNNING, _RUNNING, _DISCONNECTING
@ -286,7 +292,9 @@ class Connection:
if self._state != self._RUNNING:
raise IncorrectStateException("This should never happen.")
logger.info("Disconnecting...")
logger.debug("Disconnecting...")
self._events.fire("disconnecting")
# Now we're sure we're in the _RUNNING state, we can set our state.
# Important: No await-ing has occurred between checking the state and
@ -304,11 +312,11 @@ class Connection:
self._state = self._NOT_RUNNING
# Notify all other disconnect()s waiting
logger.debug("Send disconnected notification")
logger.debug("Sending disconnected notification")
async with self._disconnected_condition:
self._disconnected_condition.notify_all()
logger.info("Disconnected")
logger.debug("Disconnected")
# Running
@ -317,7 +325,39 @@ class Connection:
The main loop that runs during phase 3
"""
# TODO
while True:
if self._state != self._RUNNING:
logger.debug("Exiting event loop")
return
if self._ws is not None:
try:
logger.debug("Receiving ws packets")
async for packet in self._ws:
self._process_packet(packet)
except Exception as e: # TODO use proper exceptions
print(e)
logger.debug("Stopped receiving ws packets")
else:
logger.debug("No ws connection found")
if self._state != self._RUNNING:
logger.debug("Exiting event loop")
return
logger.debug("Attempting to reconnect")
while not await self._reconnect():
logger.debug("Reconnect attempt not successful")
if self._state != self._RUNNING:
logger.debug("Exiting event loop")
return
logger.debug(f"Sleeping for {self.RECONNECT_DELAY}s and retrying")
await asyncio.sleep(self._RECONNECT_DELAY)
def _process_packet(self, packet):
print(str(packet)[:50])
async def send(self, packet: Any) -> Any:
pass # TODO

View file

@ -19,7 +19,7 @@ class Events:
self._callbacks[event] = callback_list
logger.debug(f"Registered callback for event {event!r}")
async def fire(self, event: str, *args: Any, **kwargs: Any) -> None:
def fire(self, event: str, *args: Any, **kwargs: Any) -> None:
logger.debug(f"Calling callbacks for event {event!r}")
for callback in self._callbacks.get(event, []):
asyncio.create_task(callback(*args, **kwargs))