Rework spawning and task structure
Still not working: See TestBot.py
This commit is contained in:
parent
34e1ae4b8f
commit
1c3b9d0a20
5 changed files with 252 additions and 93 deletions
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import asyncio
|
||||
asyncio.get_event_loop().set_debug(True)
|
||||
|
||||
|
|
@ -17,37 +18,67 @@ class Connection:
|
|||
self.cookie = cookie
|
||||
self.packet_hook = packet_hook
|
||||
|
||||
self.stopped = False
|
||||
|
||||
self._ws = None
|
||||
self._pid = 0
|
||||
self._pid = 0 # successive packet ids
|
||||
self._spawned_tasks = set()
|
||||
self._pending_responses = {}
|
||||
#self._stopping = False
|
||||
self._runtask = None
|
||||
|
||||
async def run(self):
|
||||
self._ws = await websockets.connect(self.url, max_size=None)
|
||||
async def connect(self, max_tries=10, delay=60):
|
||||
"""
|
||||
success = await connect(max_tries=10, delay=60)
|
||||
|
||||
Attempt to connect to a room.
|
||||
Returns the task listening for packets, or None if the attempt failed.
|
||||
"""
|
||||
|
||||
await self.stop()
|
||||
|
||||
tries_left = max_tries
|
||||
while tries_left > 0:
|
||||
tries_left -= 1
|
||||
try:
|
||||
self._ws = await websockets.connect(self.url, max_size=None)
|
||||
except (websockets.InvalidURI, websockets.InvalidHandshake):
|
||||
self._ws = None
|
||||
if tries_left > 0:
|
||||
await asyncio.sleep(delay)
|
||||
else:
|
||||
self._runtask = asyncio.ensure_future(self._run())
|
||||
return self._runtask
|
||||
|
||||
async def _run(self):
|
||||
"""
|
||||
Listen for packets and deal with them accordingly.
|
||||
"""
|
||||
|
||||
try:
|
||||
while True:
|
||||
response = await self._ws.recv()
|
||||
asyncio.ensure_future(self._handle_json(response))
|
||||
await self._handle_next_message()
|
||||
except websockets.ConnectionClosed:
|
||||
pass
|
||||
finally:
|
||||
await self._ws.close() # just to make sure it's closed
|
||||
self._ws = None
|
||||
stopped = True
|
||||
self._clean_up_futures()
|
||||
self._clean_up_tasks()
|
||||
|
||||
for future in self._pending_responses:
|
||||
#future.set_error(ConnectionClosed)
|
||||
future.cancel()
|
||||
await self._ws.close() # just to make sure
|
||||
self._ws = None
|
||||
|
||||
async def stop(self):
|
||||
if not self.stopped and self._ws:
|
||||
"""
|
||||
Close websocket connection and wait for running task to stop.
|
||||
"""
|
||||
|
||||
if self._ws:
|
||||
await self._ws.close()
|
||||
|
||||
if self._runtask:
|
||||
await self._runtask
|
||||
|
||||
async def send(self, ptype, data=None, await_response=True):
|
||||
if self.stopped:
|
||||
raise ConnectionClosed
|
||||
if not self._ws:
|
||||
raise asyncio.CancelledError
|
||||
|
||||
pid = str(self._new_pid())
|
||||
packet = {
|
||||
|
|
@ -60,7 +91,8 @@ class Connection:
|
|||
if await_response:
|
||||
wait_for = self._wait_for_response(pid)
|
||||
|
||||
await self._ws.send(json.dumps(packet, separators=(',', ':')))
|
||||
logging.debug(f"Currently used websocket at self._ws: {self._ws}")
|
||||
await self._ws.send(json.dumps(packet, separators=(',', ':'))) # minimum size
|
||||
|
||||
if await_response:
|
||||
await wait_for
|
||||
|
|
@ -70,11 +102,32 @@ class Connection:
|
|||
self._pid += 1
|
||||
return self._pid
|
||||
|
||||
async def _handle_next_message(self):
|
||||
response = await self._ws.recv()
|
||||
task = asyncio.ensure_future(self._handle_json(response))
|
||||
self._track_task(task) # will be cancelled when the connection is closed
|
||||
|
||||
def _clean_up_futures(self):
|
||||
for pid, future in self._pending_responses.items():
|
||||
logger.debug(f"Cancelling future: {future}")
|
||||
future.cancel()
|
||||
self._pending_responses = {}
|
||||
|
||||
def _clean_up_tasks(self):
|
||||
for task in self._spawned_tasks:
|
||||
if not task.done():
|
||||
logger.debug(f"Cancelling task: {task}")
|
||||
task.cancel()
|
||||
else:
|
||||
logger.debug(f"Task already done: {task}")
|
||||
logger.debug(f"Exception: {task.exception()}")
|
||||
self._spawned_tasks = set()
|
||||
|
||||
async def _handle_json(self, text):
|
||||
packet = json.loads(text)
|
||||
|
||||
# Deal with pending responses
|
||||
pid = packet.get("id")
|
||||
pid = packet.get("id", None)
|
||||
future = self._pending_responses.pop(pid, None)
|
||||
if future:
|
||||
future.set_result(packet)
|
||||
|
|
@ -82,6 +135,20 @@ class Connection:
|
|||
# Pass packet onto room
|
||||
await self.packet_hook(packet)
|
||||
|
||||
def _track_task(self, task):
|
||||
self._spawned_tasks.add(task)
|
||||
|
||||
# only keep running tasks
|
||||
#tasks = set()
|
||||
#for task in self._spawned_tasks:
|
||||
#if not task.done():
|
||||
#logger.debug(f"Keeping task: {task}")
|
||||
#tasks.add(task)
|
||||
#else:
|
||||
#logger.debug(f"Deleting task: {task}")
|
||||
#self._spawned_tasks = tasks
|
||||
#self._spawned_tasks = {task for task in self._spawned_tasks if not task.done()} # TODO: Reenable
|
||||
|
||||
def _wait_for_response(self, pid):
|
||||
future = asyncio.Future()
|
||||
self._pending_responses[pid] = future
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue