diff --git a/yaboli/connection.py b/yaboli/connection.py index cf79283..eaf5c54 100644 --- a/yaboli/connection.py +++ b/yaboli/connection.py @@ -95,6 +95,7 @@ class Connection: self._url = url self._events = Events() + self._packet_id = 0 # This is the current status of the connection. It can be set to one of # _NOT_RUNNING, _CONNECTING, _RUNNING, _RECONNECTING, or @@ -110,8 +111,7 @@ class Connection: # These must always be (re)set together. If one of them is None, all # must be None. self._ws = None - self._awaiting_replies: Optional[Dict[str, Callable[..., - Awaitable[None]]]] = None + self._awaiting_replies: Optional[Dict[str, asyncio.Future[Any]]] = None self._ping_check: Optional[asyncio.Task[None]] = None def register_event(self, @@ -149,10 +149,9 @@ class Connection: logger.debug("Ws connection already cleaned up") return - logger.debug("Cancelling tasks awaiting replies") - for tasks in self._awaiting_replies.values(): - for task in tasks: - task.cancel() + logger.debug("Cancelling futures waiting for replies") + for future in self._awaiting_replies.values(): + future.set_exception(ConnectionClosedException()) logger.debug("Cancelling ping check task") self._ping_check.cancel() @@ -403,6 +402,7 @@ class Connection: try: logger.debug("Receiving ws packets") async for packet in self._ws: + logger.debug(f"Received packet {packet}") packet_data = json.loads(packet) self._process_packet(packet_data) except websockets.ConnectionClosed: @@ -426,7 +426,41 @@ class Connection: await asyncio.sleep(self.RECONNECT_DELAY) def _process_packet(self, packet: Any) -> None: - print(str(packet)[:100]) # TODO implement + # This function assumes that the packet is formed correctly according + # to http://api.euphoria.io/#packets. + + # First, notify whoever's waiting for this packet + packet_id = packet.get("id", None) + if packet_id is not None and self._awaiting_replies is not None: + future = self._awaiting_replies.get(packet_id, None) + if future is not None: + future.set_result(packet) + + # Then, send the corresponding event + packet_type = packet["type"] + self._events.fire(f"on_{packet_type}", packet) + + # Finally, if it's a ping command, reply as per + # http://api.euphoria.io/#ping + if packet_type == "ping-event": + logger.debug("Pong!") + asyncio.create_task(self._send_if_possible( + "ping-reply", + {"time": packet["data"]["time"]} + )) + + async def _send_if_possible(self, packet_type: str, data: Any,) -> None: + """ + This function tries to send a packet without awaiting the reply. + + It ignores IncorrectStateExceptions, meaning that if it is called while + in the wrong state, nothing will happen. + """ + + try: + await self.send(packet_type, data, await_reply=False) + except IncorrectStateException: + logger.debug("Could not send (disconnecting or already disconnected)") async def send(self, packet_type: str, @@ -451,4 +485,43 @@ class Connection: If the connection closes unexpectedly while sending the packet or waiting for the reply, a ConnectionClosedException will be thrown. """ - pass # TODO + + while self._state in [self._CONNECTING, self._RECONNECTING]: + async with self._connected_condition: + await self._connected_condition.wait() + + if self._state != self._RUNNING: + raise IncorrectStateException(("send() must be called while the" + " Connection is running")) + + # We're now definitely in the _RUNNING state + + # Since we're in the _RUNNING state, _ws and _awaiting_replies are not + # None. This check is to satisfy mypy. + if self._ws is None or self._awaiting_replies is None: + raise IncorrectStateException("This should never happen") + + packet_id = str(self._packet_id) + self._packet_id += 1 + + # Doing this before the await below since we know that + # _awaiting_replies is not None while the _state is _RUNNING. + if await_reply: + response: asyncio.Future[Any] = asyncio.Future() + self._awaiting_replies[packet_id] = response + + text = json.dumps({"id": packet_id, "type": packet_type, "data": data}) + logger.debug(f"Sending packet {text}") + try: + await self._ws.send(text) + except websockets.ConnectionClosed: + raise ConnectionClosedException() # as promised in the docstring + + if await_reply: + await response + # If the response Future was completed with a + # ConnectionClosedException via set_exception(), response.result() + # will re-raise that exception. + return response.result() + else: + return None