From 3203ecb59133a102724dfe053432d9751884f127 Mon Sep 17 00:00:00 2001 From: Joscha Date: Tue, 6 Feb 2018 15:30:17 +0000 Subject: [PATCH] Clean up Threads.hs --- src/EuphApi/Threads.hs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/EuphApi/Threads.hs b/src/EuphApi/Threads.hs index 65614f4..54cb679 100644 --- a/src/EuphApi/Threads.hs +++ b/src/EuphApi/Threads.hs @@ -29,7 +29,7 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Trans.State -import Data.Aeson as A +import Data.Aeson import qualified Data.ByteString.Lazy as BS import qualified Data.HashMap.Strict as HM import qualified Data.Map as M @@ -60,7 +60,7 @@ data Connection = Connection LockedFlag SendQueue EventQueue -- After that, any further calls of @getEvent@ on the same @Connection@ -- will block indefinitely. getEvent :: Connection -> IO (Maybe Event) -getEvent (Connection locked _ qEvent) = undefined locked qEvent +getEvent (Connection _ _ qEvent) = atomically $ readTBQueue qEvent -- | A 'WS.ClientApp' creating a 'Connection'. euphApp :: WS.ClientApp Connection @@ -78,9 +78,9 @@ euphApp con = do -} fetchThread :: RecvQueue -> WS.Connection -> IO () -fetchThread qRecv con = void $ handle handleException $ forever $ do +fetchThread qRecv con = handle handleException $ forever $ do message <- WS.receiveData con - void $ atomically $ writeTBQueue qRecv (RPacket message) + atomically $ writeTBQueue qRecv (RPacket message) where handleException (WS.CloseRequest _ _) = atomically $ writeTBQueue qRecv RDisconnected handleException WS.ConnectionClosed = atomically $ writeTBQueue qRecv RDisconnected @@ -95,11 +95,11 @@ fetchThread qRecv con = void $ handle handleException $ forever $ do preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID) preparePacket packetType packetData = do packetNr <- get - put $ packetNr + 1 + modify (+1) let packetID = T.pack $ show packetNr bytestr = encode . Object . HM.fromList $ - [ ("id", A.String packetID) - , ("type", A.String packetType) + [ ("id", String packetID) + , ("type", String packetType) , ("data", toJSON packetData) ] return (bytestr, packetID) @@ -130,7 +130,7 @@ sendThread euphCon qRecv con = do Just (SReply packetType packetData reply) -> do (packet, packetID) <- preparePacket packetType packetData - void $ liftIO $ atomically $ writeTBQueue qRecv (RReply packetID reply) + liftIO $ atomically $ writeTBQueue qRecv (RReply packetID reply) continue <- liftIO $ sendSafely packet when continue $ sendThread euphCon qRecv con @@ -164,14 +164,14 @@ whenJust m f = maybe (return ()) f m processPacket :: EventQueue -> BS.ByteString -> StateT Awaiting IO () processPacket qEvent bs = do -- First, deal with event channel events. - case A.decode bs of + case decode bs of Nothing -> return () Just event -> liftIO $ atomically $ writeTBQueue qEvent (Just event) -- Then, deal with replies. -- Find out whether there is actually any dealing with replies to do... replies <- get let result = do -- Maybe monad - PacketInfo{..} <- A.decode bs + PacketInfo{..} <- decode bs replyID <- infoPacketID replyMVar <- M.lookup replyID replies return (replyID, replyMVar, infoServerError) @@ -181,7 +181,7 @@ processPacket qEvent bs = do case serverError of Just e -> liftIO $ putMVar var (Left (EuphServerError e)) Nothing -> - case A.decode bs of + case decode bs of Nothing -> liftIO $ putMVar var (Left EuphParse) Just r -> liftIO $ putMVar var (Right r) @@ -384,6 +384,7 @@ data Event -- It also offers a snapshot of the room’s state and recent history. -- -- @SnapshotEvent version listing log (Maybe nick)@ + deriving (Show) -- LoginEvent -- not implemented -- LogoutEvent -- not implemented