Start the rewrite (again)

This commit is contained in:
Joscha 2017-09-01 22:12:11 +00:00
parent 9b0195aa72
commit 97b98c29f7
5 changed files with 351 additions and 801 deletions

View file

@ -1,229 +1,96 @@
import logging
logging.basicConfig(level=logging.DEBUG)
import asyncio
asyncio.get_event_loop().set_debug(True)
import json
import time
import threading
import websocket
from websocket import WebSocketException as WSException
import websockets
from websockets import ConnectionClosed
from . import callbacks
class Connection():
"""
Stays connected to a room in its own thread.
Callback functions are called when a packet is received.
Callbacks:
- all the message types from api.euphoria.io
These pass the packet data as argument to the called functions.
The other callbacks don't pass any special arguments.
- "connect"
- "disconnect"
- "stop"
"""
ROOM_FORMAT = "wss://euphoria.io/room/{}/ws"
def __init__(self, room, url_format=None):
"""
room - name of the room to connect to
class Connection:
def __init__(self, url, packet_hook, cookie=None):
self.url = url
self.cookie = cookie
self.packet_hook = packet_hook
"""
self.room = room
if not url_format:
url_format = self.ROOM_FORMAT
self._url = url_format.format(self.room)
self._stopping = False
stopped = False
self._ws = None
self._thread = None
self._send_id = 0
self._callbacks = callbacks.Callbacks()
self._id_callbacks = callbacks.Callbacks()
self._pid = 0
self._pending_responses = {}
def _connect(self, tries=-1, delay=10):
"""
_connect(tries, delay) -> bool
async def run(self):
self._ws = await websockets.connect(self.url)
tries - maximum number of retries
-1 -> retry indefinitely
Returns True on success, False on failure.
Connect to the room.
"""
while tries != 0:
try:
self._ws = websocket.create_connection(
self._url,
enable_multithread=True
)
self._callbacks.call("connect")
return True
except WSException:
if tries > 0:
tries -= 1
if tries != 0:
time.sleep(delay)
return False
def disconnect(self):
"""
disconnect() -> None
Reconnect to the room.
WARNING: To completely disconnect, use stop().
"""
if self._ws:
self._ws.close()
try:
while True:
response = await self._ws.recv()
asyncio.ensure_future(self._handle_json(response))
except websockets.ConnectionClosed:
pass
finally:
await self._ws.close() # just to make sure it's closed
self._ws = None
self._callbacks.call("disconnect")
stopped = True
for futures in self._pending_responses:
for future in futures:
future.set_error(ConnectionClosed)
future.cancel()
def launch(self):
"""
launch() -> Thread
async def stop(self):
if not stopped and self._ws:
await self._ws.close()
async def send(ptype, data=None, await_response=True):
if stopped:
raise ConnectionClosed
Connect to the room and spawn a new thread running run.
"""
pid = self._new_pid()
packet["type"] = ptype
packet["data"] = data
packet["id"] = pid
if self._connect(tries=1):
self._thread = threading.Thread(target=self._run,
name="{}-{}".format(self.room, int(time.time())))
self._thread.start()
return self._thread
if await_response:
wait_for = self._wait_for_response(pid)
await self._ws.send(json.dumps(packet))
await wait_for
return wait_for.result()
else:
self.stop()
await self._ws.send(json.dumps(packet))
def _run(self):
"""
_run() -> None
Receive messages.
"""
while not self._stopping:
try:
self._handle_json(self._ws.recv())
except (WSException, ConnectionResetError):
if not self._stopping:
self.disconnect()
self._connect()
def _new_pid(self):
self._pid += 1
return self._pid
def stop(self):
"""
stop() -> None
async def _handle_json(text):
packet = json.loads(text)
Close the connection to the room.
Joins the thread launched by self.launch().
"""
# Deal with pending responses
pid = packet.get("id")
for future in self._pending_responses.pop(pid, []):
future.set_result(packet)
self._stopping = True
self.disconnect()
self._callbacks.call("stop")
if self._thread and self._thread != threading.current_thread():
self._thread.join()
# Pass packet onto room
await self.packet_hook(packet)
def next_id(self):
"""
next_id() -> id
def _wait_for_response(pid):
future = asyncio.Future()
Returns the id that will be used for the next package.
"""
if pid not in self._pending_responses:
self._pending_responses[pid] = []
self._pending_responses[pid].append(future)
return str(self._send_id)
return future
def do_nothing(*args, **kwargs):
pass
def run():
conn = Connection("wss://echo.websocket.org", do_nothing)
loop = asyncio.get_event_loop()
#loop.call_later(3, conn.stop)
def add_callback(self, ptype, callback, *args, **kwargs):
"""
add_callback(ptype, callback, *args, **kwargs) -> None
Add a function to be called when a packet of type ptype is received.
"""
self._callbacks.add(ptype, callback, *args, **kwargs)
def add_id_callback(self, pid, callback, *args, **kwargs):
"""
add_id_callback(pid, callback, *args, **kwargs) -> None
Add a function to be called when a packet with id pid is received.
"""
self._id_callbacks.add(pid, callback, *args, **kwargs)
def add_next_callback(self, callback, *args, **kwargs):
"""
add_next_callback(callback, *args, **kwargs) -> None
Add a function to be called when the answer to the next message sent is received.
"""
self._id_callbacks.add(self.next_id(), callback, *args, **kwargs)
def _handle_json(self, data):
"""
handle_json(data) -> None
Handle incoming 'raw' data.
"""
packet = json.loads(data)
self._handle_packet(packet)
def _handle_packet(self, packet):
"""
_handle_packet(ptype, data) -> None
Handle incoming packets
"""
if "data" in packet:
data = packet["data"]
else:
data = None
if "error" in packet:
error = packet["error"]
else:
error = None
self._callbacks.call(packet["type"], data, error)
if "id" in packet:
self._id_callbacks.call(packet["id"], data, error)
self._id_callbacks.remove(packet["id"])
def _send_json(self, data):
"""
_send_json(data) -> None
Send 'raw' json.
"""
if self._ws:
try:
self._ws.send(json.dumps(data))
except WSException:
self.disconnect()
def send_packet(self, ptype, **kwargs):
"""
send_packet(ptype, **kwargs) -> None
Send a formatted packet.
"""
packet = {
"type": ptype,
"data": kwargs or None,
"id": str(self._send_id)
}
self._send_id += 1
self._send_json(packet)
loop.run_until_complete(asyncio.ensure_future(conn.run()))