Send and receive packets
This commit is contained in:
parent
2de2cbf92c
commit
2a9cd03c47
1 changed files with 81 additions and 8 deletions
|
|
@ -95,6 +95,7 @@ class Connection:
|
|||
self._url = url
|
||||
|
||||
self._events = Events()
|
||||
self._packet_id = 0
|
||||
|
||||
# This is the current status of the connection. It can be set to one of
|
||||
# _NOT_RUNNING, _CONNECTING, _RUNNING, _RECONNECTING, or
|
||||
|
|
@ -110,8 +111,7 @@ class Connection:
|
|||
# These must always be (re)set together. If one of them is None, all
|
||||
# must be None.
|
||||
self._ws = None
|
||||
self._awaiting_replies: Optional[Dict[str, Callable[...,
|
||||
Awaitable[None]]]] = None
|
||||
self._awaiting_replies: Optional[Dict[str, asyncio.Future[Any]]] = None
|
||||
self._ping_check: Optional[asyncio.Task[None]] = None
|
||||
|
||||
def register_event(self,
|
||||
|
|
@ -149,10 +149,9 @@ class Connection:
|
|||
logger.debug("Ws connection already cleaned up")
|
||||
return
|
||||
|
||||
logger.debug("Cancelling tasks awaiting replies")
|
||||
for tasks in self._awaiting_replies.values():
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
logger.debug("Cancelling futures waiting for replies")
|
||||
for future in self._awaiting_replies.values():
|
||||
future.set_exception(ConnectionClosedException())
|
||||
|
||||
logger.debug("Cancelling ping check task")
|
||||
self._ping_check.cancel()
|
||||
|
|
@ -403,6 +402,7 @@ class Connection:
|
|||
try:
|
||||
logger.debug("Receiving ws packets")
|
||||
async for packet in self._ws:
|
||||
logger.debug(f"Received packet {packet}")
|
||||
packet_data = json.loads(packet)
|
||||
self._process_packet(packet_data)
|
||||
except websockets.ConnectionClosed:
|
||||
|
|
@ -426,7 +426,41 @@ class Connection:
|
|||
await asyncio.sleep(self.RECONNECT_DELAY)
|
||||
|
||||
def _process_packet(self, packet: Any) -> None:
|
||||
print(str(packet)[:100]) # TODO implement
|
||||
# This function assumes that the packet is formed correctly according
|
||||
# to http://api.euphoria.io/#packets.
|
||||
|
||||
# First, notify whoever's waiting for this packet
|
||||
packet_id = packet.get("id", None)
|
||||
if packet_id is not None and self._awaiting_replies is not None:
|
||||
future = self._awaiting_replies.get(packet_id, None)
|
||||
if future is not None:
|
||||
future.set_result(packet)
|
||||
|
||||
# Then, send the corresponding event
|
||||
packet_type = packet["type"]
|
||||
self._events.fire(f"on_{packet_type}", packet)
|
||||
|
||||
# Finally, if it's a ping command, reply as per
|
||||
# http://api.euphoria.io/#ping
|
||||
if packet_type == "ping-event":
|
||||
logger.debug("Pong!")
|
||||
asyncio.create_task(self._send_if_possible(
|
||||
"ping-reply",
|
||||
{"time": packet["data"]["time"]}
|
||||
))
|
||||
|
||||
async def _send_if_possible(self, packet_type: str, data: Any,) -> None:
|
||||
"""
|
||||
This function tries to send a packet without awaiting the reply.
|
||||
|
||||
It ignores IncorrectStateExceptions, meaning that if it is called while
|
||||
in the wrong state, nothing will happen.
|
||||
"""
|
||||
|
||||
try:
|
||||
await self.send(packet_type, data, await_reply=False)
|
||||
except IncorrectStateException:
|
||||
logger.debug("Could not send (disconnecting or already disconnected)")
|
||||
|
||||
async def send(self,
|
||||
packet_type: str,
|
||||
|
|
@ -451,4 +485,43 @@ class Connection:
|
|||
If the connection closes unexpectedly while sending the packet or
|
||||
waiting for the reply, a ConnectionClosedException will be thrown.
|
||||
"""
|
||||
pass # TODO
|
||||
|
||||
while self._state in [self._CONNECTING, self._RECONNECTING]:
|
||||
async with self._connected_condition:
|
||||
await self._connected_condition.wait()
|
||||
|
||||
if self._state != self._RUNNING:
|
||||
raise IncorrectStateException(("send() must be called while the"
|
||||
" Connection is running"))
|
||||
|
||||
# We're now definitely in the _RUNNING state
|
||||
|
||||
# Since we're in the _RUNNING state, _ws and _awaiting_replies are not
|
||||
# None. This check is to satisfy mypy.
|
||||
if self._ws is None or self._awaiting_replies is None:
|
||||
raise IncorrectStateException("This should never happen")
|
||||
|
||||
packet_id = str(self._packet_id)
|
||||
self._packet_id += 1
|
||||
|
||||
# Doing this before the await below since we know that
|
||||
# _awaiting_replies is not None while the _state is _RUNNING.
|
||||
if await_reply:
|
||||
response: asyncio.Future[Any] = asyncio.Future()
|
||||
self._awaiting_replies[packet_id] = response
|
||||
|
||||
text = json.dumps({"id": packet_id, "type": packet_type, "data": data})
|
||||
logger.debug(f"Sending packet {text}")
|
||||
try:
|
||||
await self._ws.send(text)
|
||||
except websockets.ConnectionClosed:
|
||||
raise ConnectionClosedException() # as promised in the docstring
|
||||
|
||||
if await_reply:
|
||||
await response
|
||||
# If the response Future was completed with a
|
||||
# ConnectionClosedException via set_exception(), response.result()
|
||||
# will re-raise that exception.
|
||||
return response.result()
|
||||
else:
|
||||
return None
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue