From a4e49a57ba3137db9dcc59f958857a267683c10e Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 12 Feb 2018 12:26:02 +0000 Subject: [PATCH] Simplify how Connection works --- src/EuphApi/Connection.hs | 145 ++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 78 deletions(-) diff --git a/src/EuphApi/Connection.hs b/src/EuphApi/Connection.hs index 47c79cf..a99d070 100644 --- a/src/EuphApi/Connection.hs +++ b/src/EuphApi/Connection.hs @@ -80,7 +80,6 @@ type Reply = Either EuphException data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r)) type SendQueue = TBQueue Send -type RecvQueue = TBQueue Recv type EventQueue = TBQueue EventType type LockedFlag = TVar Bool @@ -103,7 +102,6 @@ getEvent (Connection _ _ qEvent) = atomically $ readTBQueue qEvent startEuphConnection :: String -> String -> IO Connection startEuphConnection host room = do sendQueue <- atomically $ newTBQueue 10 - recvQueue <- atomically $ newTBQueue 10 eventQueue <- atomically $ newTBQueue 10 locked <- atomically $ newTVar False let euphCon = Connection locked sendQueue eventQueue @@ -111,25 +109,12 @@ startEuphConnection host room = do $ forkIO $ handle (handleException eventQueue) $ WSS.runSecureClient host 443 ("/room/" ++ room ++ "/ws") - $ recvClient euphCon recvQueue + $ recvClient euphCon return euphCon where handleException :: EventQueue -> WS.HandshakeException -> IO () handleException qEvent _ = atomically $ writeTBQueue qEvent ConnectionFailed -{- - - Fetch thread - -} - -fetchThread :: RecvQueue -> WS.Connection -> IO () -fetchThread qRecv con = handle handleException $ forever $ do - message <- WS.receiveData con - atomically $ writeTBQueue qRecv (RPacket message) - where - handleException (WS.CloseRequest _ _) = atomically $ writeTBQueue qRecv RDisconnected - handleException WS.ConnectionClosed = atomically $ writeTBQueue qRecv RDisconnected - handleException _ = fetchThread qRecv con - {- - Send thread -} @@ -155,34 +140,34 @@ readWhileOpen (Connection locked qSend _) = do then return Nothing else Just <$> readTBQueue qSend -sendThread :: Connection -> RecvQueue -> WS.Connection -> StateT Integer IO () -sendThread euphCon qRecv con = do - item <- liftIO $ atomically $ readWhileOpen euphCon +sendThread :: RecvInfo -> StateT Integer IO () +sendThread info@RecvInfo{..} = do + item <- liftIO $ atomically $ readWhileOpen recvEuphCon case item of Nothing -> return () Just SDisconnect -> - liftIO $ WS.sendClose con ("Bye." :: T.Text) + liftIO $ WS.sendClose recvCon ("Bye. -EuphApi" :: T.Text) Just (SNoReply packetType packetData) -> do (packet, _) <- preparePacket packetType packetData - liftIO $ WS.sendTextData con packet continue <- liftIO $ sendSafely packet when continue $ - sendThread euphCon qRecv con + sendThread info Just (SReply packetType packetData reply) -> do (packet, packetID) <- preparePacket packetType packetData - liftIO $ atomically $ writeTBQueue qRecv (RReply packetID reply) + liftIO $ atomically $ modifyTVar recvWaiting (M.insert packetID reply) continue <- liftIO $ sendSafely packet when continue $ - sendThread euphCon qRecv con + sendThread info where - sendSafely packet = (WS.sendTextData con packet >> return True) `catch` handleException + sendSafely packet = (WS.sendTextData recvCon packet >> return True) `catch` handleException handleException (WS.CloseRequest _ _) = return False handleException WS.ConnectionClosed = return False handleException _ = return True + -- TODO: Think about whether this is safe (memory leak issues etc.) {- - Receive thread @@ -202,49 +187,61 @@ instance FromJSON PacketInfo where Just d -> return $ Right d return PacketInfo{..} +data RecvInfo = RecvInfo + { recvEuphCon :: Connection + , recvCon :: WS.Connection + , recvWaiting :: TVar Awaiting + } + -- TODO: Swap for HashMap? type Awaiting = M.Map T.Text ReplyMVar whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m () whenJust m f = maybe (return ()) f m -processPacket :: EventQueue -> BS.ByteString -> StateT Awaiting IO () -processPacket qEvent bs = do - -- First, deal with event channel events. +processEvent :: BS.ByteString -> RecvInfo -> IO () +processEvent bs RecvInfo{..} = do + let (Connection _ _ qEvent) = recvEuphCon case decode bs of Nothing -> return () - Just event -> liftIO $ atomically $ writeTBQueue qEvent (EuphEvent event) - -- Then, deal with replies. - -- Find out whether there is actually any dealing with replies to do... - replies <- get - let result = do -- Maybe monad + Just event -> atomically $ writeTBQueue qEvent (EuphEvent event) + +processReply :: BS.ByteString -> RecvInfo -> IO () +processReply bs RecvInfo{..} = do + -- Figure out whether this packet is actually a reply of some sort. + let maybeInfo = do PacketInfo{..} <- decode bs - replyID <- infoPacketID - replyMVar <- M.lookup replyID replies - return (replyID, replyMVar, infoData) - -- ... and then write the appropriate result into the MVar. - whenJust result $ \(replyID, ReplyMVar var, infoData) -> do - modify (M.delete replyID) - case infoData of - Left e -> liftIO $ putMVar var (Left (EuphServerError e)) - Right d -> - case parseEither parseJSON d of - Left e -> liftIO $ putMVar var (Left $ EuphParse e bs) - Right r -> liftIO $ putMVar var (Right r) + replyID <- infoPacketID + return (replyID, infoData) + whenJust maybeInfo $ \(replyID, infoData) -> do + -- Figure out whether we're waiting for that ID and find the correct MVar + -- and remove it from the TVar if we do find it. + maybeReplyMVar <- atomically $ do + waiting <- readTVar recvWaiting + case M.lookup replyID waiting of + Nothing -> return Nothing + Just var -> do + modifyTVar recvWaiting (M.delete replyID) + return (Just var) + whenJust maybeReplyMVar $ \(ReplyMVar var) -> + -- We now know that the packet is a reply, and we know the MVar to send + -- it to. Now we only need to send the correct reply through the MVar. + case infoData of + Left e -> putMVar var (Left $ EuphServerError e) + Right d -> + case parseEither parseJSON d of + Left e -> putMVar var (Left $ EuphParse e bs) + Right r -> putMVar var (Right r) -processRecv :: RecvQueue -> EventQueue -> StateT Awaiting IO () -processRecv qRecv qEvent = do - recv <- liftIO $ atomically $ readTBQueue qRecv - case recv of - RReply packetID replyMVar -> do - modify (M.insert packetID replyMVar) - processRecv qRecv qEvent - - RPacket bs -> do - processPacket qEvent bs - processRecv qRecv qEvent - - RDisconnected -> return () +processRecv :: RecvInfo -> IO () +processRecv info@RecvInfo{..} = handle handleException $ forever $ do + message <- WS.receiveData recvCon -- retrieve packet from ws connection + processEvent message info + processReply message info + where + handleException (WS.CloseRequest _ _) = return () + handleException WS.ConnectionClosed = return () + handleException _ = processRecv info -- continue running cleanupWaiting :: Awaiting -> IO () cleanupWaiting replies = @@ -267,24 +264,17 @@ cleanupSend qSend = do SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) _ -> return () -cleanupRecv :: RecvQueue -> IO () -cleanupRecv qRecv = do - recvs <- atomically $ emptyTBQueue qRecv - forM_ recvs $ \case - RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) - _ -> return () - -recvClient :: Connection -> RecvQueue -> WS.ClientApp () -recvClient euphCon@(Connection locked qSend qEvent) qRecv con = do - tFetch <- async $ fetchThread qRecv con - tSend <- async $ evalStateT (sendThread euphCon qRecv con) 0 - waitingReplies <- execStateT (processRecv qRecv qEvent) M.empty +recvClient :: Connection -> WS.ClientApp () +recvClient euphCon@(Connection locked qSend qEvent) con = do + waiting <- atomically $ newTVar M.empty + let info = RecvInfo{recvEuphCon=euphCon, recvCon=con, recvWaiting=waiting} + tSend <- async $ evalStateT (sendThread info) 0 + processRecv info + -- Stop and clean up stuff atomically $ writeTVar locked True - wait tFetch wait tSend - cleanupWaiting waitingReplies cleanupSend qSend - cleanupRecv qRecv + atomically (readTVar waiting) >>= cleanupWaiting atomically $ writeTBQueue qEvent Disconnected {- @@ -373,7 +363,10 @@ getMessage euphCon getMessageCommandID = do -- The command returns a list of 'E.Message's from the room’s message log. -- -- > (log, Maybe before) <- messageLog con amount (Maybe before) -messageLog :: Connection -> Integer -> Maybe E.Snowflake -> IO ([E.Message], Maybe E.Snowflake) +messageLog :: Connection + -> Integer + -> Maybe E.Snowflake + -> IO ([E.Message], Maybe E.Snowflake) messageLog euphCon logCommandN logCommandBefore = do LogReply{..} <- sendPacket euphCon "log" LogCommand{..} return (logReplyLog, logReplyBefore) @@ -451,10 +444,6 @@ instance Show EuphException where instance Exception EuphException -data Recv = RDisconnected - | RPacket BS.ByteString - | RReply PacketID ReplyMVar - data Send = SDisconnect | forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents | forall p . (ToJSON p) => SReply T.Text p ReplyMVar