Implement most of Connection

This commit is contained in:
Joscha 2019-04-06 17:17:10 +00:00
parent 325af11fea
commit 23425090cc
3 changed files with 245 additions and 3 deletions

View file

@ -1,3 +1,14 @@
import asyncio
import logging
from typing import Any, Awaitable, Callable
import websockets
from .events import Events
from .exceptions import *
logger = logging.getLogger(__name__)
__all__ = ["Connection"] __all__ = ["Connection"]
class Connection: class Connection:
@ -19,7 +30,7 @@ class Connection:
2. call connect() 2. call connect()
3. send and receive packets, reconnecting automatically when connection is 3. send and receive packets, reconnecting automatically when connection is
lost lost
4. call disconnect(), return to 2. 4. call disconnect()
IN PHASE 1, parameters such as the url the Connection should connect to are IN PHASE 1, parameters such as the url the Connection should connect to are
@ -62,6 +73,197 @@ class Connection:
"on_part-event" and "on_ping". "on_part-event" and "on_ping".
""" """
def __init__(self, PING_TIMEOUT = 60 # seconds
url: str):
_NOT_RUNNING = "not running"
_CONNECTING = "connecting"
_RUNNING = "running"
_RECONNECTING = "reconnecting"
_DISCONNECTING = "disconnecting"
# Initialising
def __init__(self, url: str) -> None:
self._url = url self._url = url
self._events = Events()
# This is the current status of the connection. It can be set to one of
# _NOT_RUNNING, _CONNECTING, _RUNNING, _RECONNECTING, or
# _DISCONNECTING.
#
# Always be careful to set any state-dependent variables.
self._status = _NOT_RUNNING
self._connected_event = asyncio.Event()
self._disconnected_event = asyncio.Event()
self._event_loop = None
# These must always be (re)set together. If one of them is None, all
# must be None.
self._ws = None
self._awaiting_replies = None
self._ping_check = None
def register_event(self,
event: str,
callback: Callable[..., Awaitable[None]]
) -> None:
self._events.register(event, callback)
# Connecting and disconnecting
async def _disconnect(self) -> None:
"""
Disconnect _ws and clean up _ws, _awaiting_replies and _ping_check.
Important: The caller must ensure that this function is called in valid
circumstances and not called twice at the same time. _disconnect() does
not check or manipulate _state.
"""
if self._ws is None:
# This indicates that _ws, _awaiting_replies and _ping_check are
# cleaned up
return
await self._ws.close()
for tasks in self._awaiting_replies.values():
for task in tasks:
task.cancel()
self._ping_check.cancel()
self._ws = None
self._awaiting_replies = None
self._ping_check = None
async def _connect(self) -> bool:
"""
Attempts once to create a ws connection.
Important: The caller must ensure that this function is called in valid
circumstances and not called twice at the same time. _connect() does
not check or manipulate _state, nor does it perform cleanup on
_awaiting_replies or _ping_check.
"""
try:
ws = await websockets.connect(self._url)
self._ws = ws
self._awaiting_replies = {}
self._ping_check = asyncio.create_task(
self._disconnect_in(self.PING_TIMEOUT))
return True
# TODO list all of the ways that creating a connection can go wrong
except websockets.InvalidStatusCode:
return False
async def _disconnect_in(self, delay):
await asyncio.sleep(delay)
await self._disconnect()
async def connect(self) -> bool:
# Special exception message for _CONNECTING.
if self._state == self._CONNECTING:
raise IncorrectStateException(("connect() may not be called"
" multiple times."))
if self._state != self._NOT_RUNNING:
raise IncorrectStateException(("disconnect() must complete before"
" connect() may be called again."))
# Now we're sure we're in the _NOT_RUNNING state, we can set our state.
# Important: No await-ing has occurred between checking the state and
# setting it.
self._state = self._CONNECTING
if await self._connect():
self._event_loop = asyncio.create_task(self._run())
self._state = self._RUNNING
return True
else:
self._state = self._NOT_RUNNING
return False
async def _reconnect(self) -> bool:
"""
This function should only be called from the event loop while the
_state is _RUNNING.
"""
if self._state != self._RUNNING:
raise IncorrectStateException()
self._state = self._RECONNECTING
await self._disconnect()
success = await self._connect()
self._state = self._RUNNING
return success
async def disconnect(self) -> None:
# This function is kinda complex. The comments make it harder to read,
# but hopefully easier to understand.
# Possible states left: _NOT_RUNNING, _CONNECTING, _RUNNING,
# _RECONNECTING, _DISCONNECTING
# Waiting until the current connection attempt is finished.
if self._state in [self._CONNECTING, self._RECONNECTING]:
# After _CONNECTING, the state can either be _NOT_RUNNING or
# _RUNNING. After _RECONNECTING, the state must be _RUNNING.
await self._connected_event.wait()
# The state is now either _NOT_RUNNING or _RUNNING.
# Possible states left: _NOT_RUNNING, _RUNNING, _DISCONNECTING
if self._state == self._NOT_RUNNING:
# No need to do anything since we're already disconnected
return
# Possible states left: _RUNNING, _DISCONNECTING
if self._state == self._DISCONNECTING:
# Wait until the disconnecting currently going on is complete. This
# is to prevent the disconnect() function from ever returning
# without the disconnecting process being finished.
await self._disconnected_event.wait()
return
# Possible states left: _RUNNING
# By principle of exclusion, the only state left is _RUNNING. Doing an
# explicit check though, just to make sure.
if self._state != self._RUNNING:
raise IncorrectStateException("This should never happen.")
# Now, we can properly disconnect ^^
await self._disconnect()
await self._event_loop
self._event_loop = None
self._state = self._NOT_RUNNING
# Notify all other disconnect()s waiting
self._disconnected_event.set()
self._disconnected_event.clear()
# Running
async def _run(self) -> None:
"""
The main loop that runs during phase 3
"""
# TODO
async def send(self, packet: Any) -> Any:
pass # TODO

25
yaboli/events.py Normal file
View file

@ -0,0 +1,25 @@
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict, List
logger = logging.getLogger(__name__)
__all__ = ["Events"]
class Events:
def __init__(self) -> None:
self._callbacks: Dict[str, List[Callable[..., Awaitable[None]]]] = {}
def register(self,
event: str,
callback: Callable[..., Awaitable[None]]
) -> None:
callback_list = self._callbacks.get(event, [])
callback_list.append(callback)
self._callbacks[event] = callback_list
logger.debug(f"Registered callback for event {event!r}")
async def fire(self, event: str, *args: Any, **kwargs: Any) -> None:
logger.debug(f"Calling callbacks for event {event!r}")
for callback in self._callbacks.get(event, []):
asyncio.create_task(callback(*args, **kwargs))

View file

@ -5,6 +5,21 @@ __all__ = ["EuphException", "JoinException", "CouldNotConnectException",
class EuphException(Exception): class EuphException(Exception):
pass pass
# Connection stuff
class IncorrectStateException(EuphException):
"""
A Connection function was called while the Connection was in the incorrect
state.
"""
pass
class ConnectionClosedException(EuphException):
"""
The connection was closed unexpectedly.
"""
pass
# Joining a room # Joining a room
class JoinException(EuphException): class JoinException(EuphException):