From 320dd1688924d6459ec819798d584043fc3451ee Mon Sep 17 00:00:00 2001 From: Joscha Date: Thu, 13 Apr 2017 18:10:37 +0000 Subject: [PATCH] Add server This is a first try at connecting multiple clients using a server. The commit includes a lot of debugging messages. I will hopefully clean up the server and some of the client code. --- chunks.py | 25 ++++++++++--- client.py | 82 ++++++++++++++++++++++++++++++++++------ clientchunkpool.py | 43 ++++++++++++++++++++- dbchunkpool.py | 2 +- maps.py | 17 ++++++--- server.py | 93 ++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 236 insertions(+), 26 deletions(-) diff --git a/chunks.py b/chunks.py index 719cb50..c9a9a18 100644 --- a/chunks.py +++ b/chunks.py @@ -2,6 +2,8 @@ import threading import time from utils import CHUNK_WIDTH, CHUNK_HEIGHT, Position +import sys + class ChunkDiff(): """ Represents differences between two chunks (changes to be made to a chunk). @@ -13,10 +15,16 @@ class ChunkDiff(): def __init__(self): self._chars = {} + def __str__(self): + return "cd" + str(self._chars) + + def __repr__(self): + return "cd" + repr(self._chars) + @classmethod def from_dict(cls, d): diff = cls() - diff._chars = d + diff._chars = {int(i): v for i, v in d.items()} return diff #self._chars = d.copy() @@ -99,6 +107,9 @@ class Chunk(): def get_changes(self): return self._modifications + def as_diff(self): + return self._content.combine(self._modifications) + def touch(self, now=None): self.last_modified = now or time.time() @@ -111,7 +122,7 @@ class Chunk(): #y += 1 def lines(self): - return self._content.combine(self._modifications).lines() + return self.as_diff().lines() def modified(self): return not self._modifications.empty() @@ -150,14 +161,17 @@ class ChunkPool(): def apply_changes(self, changes): for change in changes: - pos = Position(change[0][0], change[0][1]) + #pos = Position(change[0][0], change[0][1]) + pos = change[0] diff = change[1] chunk = self.get(pos) if not chunk: chunk = self.create(pos) + sys.stderr.write(f"Previous at {pos}: {chunk._content}\n") chunk.commit_diff(diff) + sys.stderr.write(f"Afterwrd at {pos}: {chunk._content}\n") def commit_changes(self): changes = [] @@ -177,7 +191,8 @@ class ChunkPool(): def load_list(self, coords): for pos in coords: - self.load(pos) + if pos not in self._chunks: + self.load(pos) def unload(self, pos): if pos in self._chunks: @@ -188,7 +203,7 @@ class ChunkPool(): self.unload(pos) def clean_up(self, except_for=[], condition=lambda pos, chunk: True): - # old list comprehension which became too long: + ## old list comprehension which became too long: #coords = [pos for pos, chunk in self._chunks.items() if not pos in except_for and condition(chunk)] #self.save_changes() # needs to be accounted for by the user diff --git a/client.py b/client.py index 6236cba..95ea1c6 100644 --- a/client.py +++ b/client.py @@ -1,10 +1,15 @@ import curses +import json import os import string import sys import threading +import websocket +from websocket import WebSocketException as WSException + from maps import Map, ChunkMap from chunks import ChunkDiff +from utils import Position from clientchunkpool import ClientChunkPool class Client(): @@ -12,7 +17,7 @@ class Client(): self.stopping = False self.chunkmap_active = False - self.address = address + self.address = f"ws://{address}/" self._drawevent = threading.Event() self.pool = ClientChunkPool(self) #self.map_ = Map(sizex, sizey, self.pool) @@ -21,10 +26,28 @@ class Client(): #self.sock = socket.Socket(...) def launch(self, stdscr): + # connect to server + try: + self._ws = websocket.create_connection( + self.address, + enable_multithread=True + ) + except ConnectionRefusedError: + sys.stderr.write(f"Could not connect to server: {self.address!r}\n") + return + + # create map etc. sizey, sizex = stdscr.getmaxyx() self.map_ = Map(sizex, sizey, self.pool, self) self.chunkmap = ChunkMap(self.map_) + # start connection thread + self.connectionthread = threading.Thread( + target=self.connection_thread, + name="connectionthread" + ) + self.connectionthread.start() + # start input thread self.inputthread = threading.Thread( target=self.input_thread, @@ -34,6 +57,7 @@ class Client(): ) self.inputthread.start() + # update screen until stopped while not self.stopping: self._drawevent.wait() self._drawevent.clear() @@ -94,26 +118,60 @@ class Client(): else: sys.stderr.write(repr(i) + "\n") - def stop(self): - self.stopping = True - self.redraw() + def connection_thread(self): + while True: + try: + j = self._ws.recv() + self.handle_json(json.loads(j)) + except (WSException, ConnectionResetError, OSError): + #self.stop() + return + def handle_json(self, message): + sys.stderr.write(f"message: {message}\n") + if message["type"] == "apply-changes": + changes = [] + for chunk in message["data"]: + pos = Position(chunk[0][0], chunk[0][1]) + change = ChunkDiff.from_dict(chunk[1]) + changes.append((pos, change)) + + sys.stderr.write(f"Changes to apply: {changes}\n") + self.map_.apply_changes(changes) + + def stop(self): + sys.stderr.write("Stopping!\n") + self.stopping = True + self._ws.close() + self.redraw() + def request_chunks(self, coords): - def execute(): - changes = [(pos, ChunkDiff()) for pos in coords] - with self.pool as pool: - pool.apply_changes(changes) + #sys.stderr.write(f"requested chunks: {coords}\n") + message = {"type": "request-chunks", "data": coords} + self._ws.send(json.dumps(message)) - tx = threading.Timer(1, execute) - tx.start() + #def execute(): + #changes = [(pos, ChunkDiff()) for pos in coords] + #with self.pool as pool: + #pool.apply_changes(changes) + + #tx = threading.Timer(1, execute) + #tx.start() + + def unload_chunks(self, coords): + #sys.stderr.write(f"unloading chunks: {coords}\n") + message = {"type": "unload-chunks", "data": coords} + self._ws.send(json.dumps(message)) def send_changes(self, changes): - pass + #sys.stderr.write(f"sending changes: {changes}\n") + message = {"type": "save-changes", "data": changes} + self._ws.send(json.dumps(message)) def main(argv): if len(argv) != 2: print("Usage:") - print(" {} address".format(argv[0])) + print(f" {argv[0]} address") return os.environ.setdefault('ESCDELAY', '25') # only a 25 millisecond delay diff --git a/clientchunkpool.py b/clientchunkpool.py index 40732e7..54f12ca 100644 --- a/clientchunkpool.py +++ b/clientchunkpool.py @@ -1,5 +1,8 @@ +import threading from chunks import ChunkPool +import sys + class ClientChunkPool(ChunkPool): """ A ChunkPool that requests/loads chunks from a client. @@ -9,6 +12,10 @@ class ClientChunkPool(ChunkPool): super().__init__() self._client = client + self._save_thread = None + + def set(self, pos, chunk): + super().set(pos, chunk) #def commit_changes(self): #changes = [] @@ -24,12 +31,44 @@ class ClientChunkPool(ChunkPool): self._client.redraw() + def save_changes_delayed(self): + sys.stderr.write("Pre-HEHEHE\n") + if not self._save_thread: + def threadf(): + sys.stderr.write("HEHEHE\n") + self.save_changes() + self._save_thread = None + self._save_thread = threading.Timer(.25, threadf) + self._save_thread.start() + def save_changes(self): changes = self.commit_changes() - self._client.send_changes(changes) + dchanges = [] + for pos, change in changes: + dchange = change.to_dict() + if dchange: + dchanges.append((pos, dchange)) + if dchanges: + self._client.send_changes(dchanges) def load(self, pos): raise Exception def load_list(self, coords): - self._client.request_chunks(coords) + coords = [pos for pos in coords if pos not in self._chunks] + if coords: + self._client.request_chunks(coords) + + #def unload(self, pos): + #raise Exception + + def unload_list(self, coords): + if coords: + #self.save_changes() + self._client.unload_chunks(coords) + super().unload_list(coords) + +# What needs to happen differently from the default implementation: +# loading -> only ask server when necessary +# unloading -> send message to server +# unloading -> commit changes when anything is actually unloaded diff --git a/dbchunkpool.py b/dbchunkpool.py index 056e3ed..e318f2a 100644 --- a/dbchunkpool.py +++ b/dbchunkpool.py @@ -1,4 +1,4 @@ -from .chunks.py import ChunkPool +from chunks import ChunkPool class ChunkDB(): """ diff --git a/maps.py b/maps.py index 1a642de..c30b09c 100644 --- a/maps.py +++ b/maps.py @@ -16,10 +16,10 @@ class Map(): self.chunkpreload = 0 # preload chunks in this radius (they will count as "visible") self.chunkunload = 5 # don't unload chunks within this radius self.cursorpadding = 2 - self.worldx = 0 - self.worldy = 0 - self.cursorx = self.cursorpadding - self.cursory = self.cursorpadding + self.worldx = -self.cursorpadding + self.worldy = -self.cursorpadding + self.cursorx = 0 + self.cursory = 0 self.lastcurx = self.cursorx self.lastcury = self.cursory @@ -137,6 +137,7 @@ class Map(): if chunk: chunk.set(inchunkx(self.cursorx), inchunky(self.cursory), char) + pool.save_changes_delayed() self.move_cursor(1, 0, False) @@ -145,8 +146,9 @@ class Map(): chunk = pool.get(Position(chunkx(self.cursorx-1), chunky(self.cursory))) if chunk: chunk.delete(inchunkx(self.cursorx-1), inchunky(self.cursory)) - - self.move_cursor(-1, 0, False) + pool.save_changes_delayed() + + self.move_cursor(-1, 0, False) def newline(self): self.set_cursor(self.lastcurx, self.lastcury+1) @@ -207,6 +209,9 @@ class Map(): #) #self.load_visible() + + def apply_changes(self, changes): + self.chunkpool.apply_changes(changes) ChunkStyle = namedtuple("ChunkStyle", "string color") diff --git a/server.py b/server.py index 2c24cf4..41c6fb2 100644 --- a/server.py +++ b/server.py @@ -1 +1,94 @@ # import from chunks, dbchunkpool +import json +from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket + +from utils import Position +from chunks import ChunkDiff +from dbchunkpool import DBChunkPool + +pool = DBChunkPool() +clients = set() + +class WotServer(WebSocket): + def handle_request_chunks(self, coords): + changes = [] + with pool: + for coor in coords: + pos = Position(coor[0], coor[1]) + change = pool.get(pos) or pool.create(pos) + dchange = change.as_diff().to_dict() + changes.append((pos, dchange)) + + self.loaded_chunks.add(pos) + + message = {"type": "apply-changes", "data": changes} + print(f"Message bong sent: {json.dumps(message)}") + self.sendMessage(json.dumps(message)) + + def handle_unload_chunks(self, coords): + for coor in coords: + pos = Position(coor) + self.loaded_chunks.remove(pos) + + def handle_save_changes(self, dchanges): + changes = [] + for chunk in dchanges: + print("CHUNK!", chunk) + pos = Position(chunk[0][0], chunk[0][1]) + change = ChunkDiff.from_dict(chunk[1]) + changes.append((pos, change)) + + with pool: + pool.apply_changes(changes) + + #with pool: + #for chunk in changes: + #print("changed content:", pool.get(chunk[0])._content) + + for client in clients: + client.send_changes(changes) + + def send_changes(self, changes): + print("NORMAL CHANGES:", changes) + dchanges = [] + for chunk in changes: + pos = chunk[0] + change = chunk[1] + if pos in self.loaded_chunks: + dchanges.append((pos, change.to_dict())) + print("LOADED CHANGES:", dchanges) + + if dchanges: + print("Changes!") + message = {"type": "apply-changes", "data": dchanges} + print("Changes?") + print(f"Message bang sent: {json.dumps(message)}") + self.sendMessage(json.dumps(message)) + + def handleMessage(self): + message = json.loads(self.data) + print(f"message arrived: {message}") + if message["type"] == "request-chunks": + self.handle_request_chunks(message["data"]) + elif message["type"] == "unload-chunks": + self.handle_unload_chunks(message["data"]) + elif message["type"] == "save-changes": + self.handle_save_changes(message["data"]) + + print("Message received and dealt with.") + #changes = [] + #for chunk in message["data"]: + #pass + #self.sendMessage(self.data) + + def handleConnected(self): + print(self.address, 'connected') + clients.add(self) + self.loaded_chunks = set() + + def handleClose(self): + print(self.address, 'closed') + clients.remove(self) + +server = SimpleWebSocketServer('', 8000, WotServer) +server.serveforever()