diff --git a/yaboli/connection.py b/yaboli/connection.py index a92d0b8..be0589c 100644 --- a/yaboli/connection.py +++ b/yaboli/connection.py @@ -1,3 +1,14 @@ +import asyncio +import logging +from typing import Any, Awaitable, Callable + +import websockets + +from .events import Events +from .exceptions import * + +logger = logging.getLogger(__name__) + __all__ = ["Connection"] class Connection: @@ -19,7 +30,7 @@ class Connection: 2. call connect() 3. send and receive packets, reconnecting automatically when connection is lost - 4. call disconnect(), return to 2. + 4. call disconnect() IN PHASE 1, parameters such as the url the Connection should connect to are @@ -62,6 +73,197 @@ class Connection: "on_part-event" and "on_ping". """ - def __init__(self, - url: str): + PING_TIMEOUT = 60 # seconds + + _NOT_RUNNING = "not running" + _CONNECTING = "connecting" + _RUNNING = "running" + _RECONNECTING = "reconnecting" + _DISCONNECTING = "disconnecting" + + # Initialising + + def __init__(self, url: str) -> None: self._url = url + + self._events = Events() + + # This is the current status of the connection. It can be set to one of + # _NOT_RUNNING, _CONNECTING, _RUNNING, _RECONNECTING, or + # _DISCONNECTING. + # + # Always be careful to set any state-dependent variables. + self._status = _NOT_RUNNING + self._connected_event = asyncio.Event() + self._disconnected_event = asyncio.Event() + + self._event_loop = None + + # These must always be (re)set together. If one of them is None, all + # must be None. + self._ws = None + self._awaiting_replies = None + self._ping_check = None + + def register_event(self, + event: str, + callback: Callable[..., Awaitable[None]] + ) -> None: + self._events.register(event, callback) + + # Connecting and disconnecting + + async def _disconnect(self) -> None: + """ + Disconnect _ws and clean up _ws, _awaiting_replies and _ping_check. + + Important: The caller must ensure that this function is called in valid + circumstances and not called twice at the same time. _disconnect() does + not check or manipulate _state. + """ + + if self._ws is None: + # This indicates that _ws, _awaiting_replies and _ping_check are + # cleaned up + return + + await self._ws.close() + + for tasks in self._awaiting_replies.values(): + for task in tasks: + task.cancel() + + self._ping_check.cancel() + + self._ws = None + self._awaiting_replies = None + self._ping_check = None + + async def _connect(self) -> bool: + """ + Attempts once to create a ws connection. + + Important: The caller must ensure that this function is called in valid + circumstances and not called twice at the same time. _connect() does + not check or manipulate _state, nor does it perform cleanup on + _awaiting_replies or _ping_check. + """ + + try: + ws = await websockets.connect(self._url) + + self._ws = ws + self._awaiting_replies = {} + self._ping_check = asyncio.create_task( + self._disconnect_in(self.PING_TIMEOUT)) + + return True + + # TODO list all of the ways that creating a connection can go wrong + except websockets.InvalidStatusCode: + return False + + async def _disconnect_in(self, delay): + await asyncio.sleep(delay) + await self._disconnect() + + async def connect(self) -> bool: + # Special exception message for _CONNECTING. + if self._state == self._CONNECTING: + raise IncorrectStateException(("connect() may not be called" + " multiple times.")) + + if self._state != self._NOT_RUNNING: + raise IncorrectStateException(("disconnect() must complete before" + " connect() may be called again.")) + + # Now we're sure we're in the _NOT_RUNNING state, we can set our state. + # Important: No await-ing has occurred between checking the state and + # setting it. + self._state = self._CONNECTING + + if await self._connect(): + self._event_loop = asyncio.create_task(self._run()) + self._state = self._RUNNING + return True + else: + self._state = self._NOT_RUNNING + return False + + async def _reconnect(self) -> bool: + """ + This function should only be called from the event loop while the + _state is _RUNNING. + """ + + if self._state != self._RUNNING: + raise IncorrectStateException() + + self._state = self._RECONNECTING + + await self._disconnect() + success = await self._connect() + + self._state = self._RUNNING + + return success + + async def disconnect(self) -> None: + # This function is kinda complex. The comments make it harder to read, + # but hopefully easier to understand. + + # Possible states left: _NOT_RUNNING, _CONNECTING, _RUNNING, + # _RECONNECTING, _DISCONNECTING + + # Waiting until the current connection attempt is finished. + if self._state in [self._CONNECTING, self._RECONNECTING]: + # After _CONNECTING, the state can either be _NOT_RUNNING or + # _RUNNING. After _RECONNECTING, the state must be _RUNNING. + await self._connected_event.wait() + # The state is now either _NOT_RUNNING or _RUNNING. + + # Possible states left: _NOT_RUNNING, _RUNNING, _DISCONNECTING + + if self._state == self._NOT_RUNNING: + # No need to do anything since we're already disconnected + return + + # Possible states left: _RUNNING, _DISCONNECTING + + if self._state == self._DISCONNECTING: + # Wait until the disconnecting currently going on is complete. This + # is to prevent the disconnect() function from ever returning + # without the disconnecting process being finished. + await self._disconnected_event.wait() + return + + # Possible states left: _RUNNING + + # By principle of exclusion, the only state left is _RUNNING. Doing an + # explicit check though, just to make sure. + if self._state != self._RUNNING: + raise IncorrectStateException("This should never happen.") + + # Now, we can properly disconnect ^^ + await self._disconnect() + + await self._event_loop + self._event_loop = None + + self._state = self._NOT_RUNNING + + # Notify all other disconnect()s waiting + self._disconnected_event.set() + self._disconnected_event.clear() + + # Running + + async def _run(self) -> None: + """ + The main loop that runs during phase 3 + """ + + # TODO + + async def send(self, packet: Any) -> Any: + pass # TODO diff --git a/yaboli/events.py b/yaboli/events.py new file mode 100644 index 0000000..4fce41f --- /dev/null +++ b/yaboli/events.py @@ -0,0 +1,25 @@ +import asyncio +import logging +from typing import Any, Awaitable, Callable, Dict, List + +logger = logging.getLogger(__name__) + +__all__ = ["Events"] + +class Events: + def __init__(self) -> None: + self._callbacks: Dict[str, List[Callable[..., Awaitable[None]]]] = {} + + def register(self, + event: str, + callback: Callable[..., Awaitable[None]] + ) -> None: + callback_list = self._callbacks.get(event, []) + callback_list.append(callback) + self._callbacks[event] = callback_list + logger.debug(f"Registered callback for event {event!r}") + + async def fire(self, event: str, *args: Any, **kwargs: Any) -> None: + logger.debug(f"Calling callbacks for event {event!r}") + for callback in self._callbacks.get(event, []): + asyncio.create_task(callback(*args, **kwargs)) diff --git a/yaboli/exceptions.py b/yaboli/exceptions.py index 2c951a0..63ffe77 100644 --- a/yaboli/exceptions.py +++ b/yaboli/exceptions.py @@ -5,6 +5,21 @@ __all__ = ["EuphException", "JoinException", "CouldNotConnectException", class EuphException(Exception): pass +# Connection stuff + +class IncorrectStateException(EuphException): + """ + A Connection function was called while the Connection was in the incorrect + state. + """ + pass + +class ConnectionClosedException(EuphException): + """ + The connection was closed unexpectedly. + """ + pass + # Joining a room class JoinException(EuphException):