Simplify how Connection works

This commit is contained in:
Joscha 2018-02-12 12:26:02 +00:00
parent 6aafaac2d9
commit a4e49a57ba

View file

@ -80,7 +80,6 @@ type Reply = Either EuphException
data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r)) data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r))
type SendQueue = TBQueue Send type SendQueue = TBQueue Send
type RecvQueue = TBQueue Recv
type EventQueue = TBQueue EventType type EventQueue = TBQueue EventType
type LockedFlag = TVar Bool type LockedFlag = TVar Bool
@ -103,7 +102,6 @@ getEvent (Connection _ _ qEvent) = atomically $ readTBQueue qEvent
startEuphConnection :: String -> String -> IO Connection startEuphConnection :: String -> String -> IO Connection
startEuphConnection host room = do startEuphConnection host room = do
sendQueue <- atomically $ newTBQueue 10 sendQueue <- atomically $ newTBQueue 10
recvQueue <- atomically $ newTBQueue 10
eventQueue <- atomically $ newTBQueue 10 eventQueue <- atomically $ newTBQueue 10
locked <- atomically $ newTVar False locked <- atomically $ newTVar False
let euphCon = Connection locked sendQueue eventQueue let euphCon = Connection locked sendQueue eventQueue
@ -111,25 +109,12 @@ startEuphConnection host room = do
$ forkIO $ forkIO
$ handle (handleException eventQueue) $ handle (handleException eventQueue)
$ WSS.runSecureClient host 443 ("/room/" ++ room ++ "/ws") $ WSS.runSecureClient host 443 ("/room/" ++ room ++ "/ws")
$ recvClient euphCon recvQueue $ recvClient euphCon
return euphCon return euphCon
where where
handleException :: EventQueue -> WS.HandshakeException -> IO () handleException :: EventQueue -> WS.HandshakeException -> IO ()
handleException qEvent _ = atomically $ writeTBQueue qEvent ConnectionFailed handleException qEvent _ = atomically $ writeTBQueue qEvent ConnectionFailed
{-
- Fetch thread
-}
fetchThread :: RecvQueue -> WS.Connection -> IO ()
fetchThread qRecv con = handle handleException $ forever $ do
message <- WS.receiveData con
atomically $ writeTBQueue qRecv (RPacket message)
where
handleException (WS.CloseRequest _ _) = atomically $ writeTBQueue qRecv RDisconnected
handleException WS.ConnectionClosed = atomically $ writeTBQueue qRecv RDisconnected
handleException _ = fetchThread qRecv con
{- {-
- Send thread - Send thread
-} -}
@ -155,34 +140,34 @@ readWhileOpen (Connection locked qSend _) = do
then return Nothing then return Nothing
else Just <$> readTBQueue qSend else Just <$> readTBQueue qSend
sendThread :: Connection -> RecvQueue -> WS.Connection -> StateT Integer IO () sendThread :: RecvInfo -> StateT Integer IO ()
sendThread euphCon qRecv con = do sendThread info@RecvInfo{..} = do
item <- liftIO $ atomically $ readWhileOpen euphCon item <- liftIO $ atomically $ readWhileOpen recvEuphCon
case item of case item of
Nothing -> Nothing ->
return () return ()
Just SDisconnect -> Just SDisconnect ->
liftIO $ WS.sendClose con ("Bye." :: T.Text) liftIO $ WS.sendClose recvCon ("Bye. -EuphApi" :: T.Text)
Just (SNoReply packetType packetData) -> do Just (SNoReply packetType packetData) -> do
(packet, _) <- preparePacket packetType packetData (packet, _) <- preparePacket packetType packetData
liftIO $ WS.sendTextData con packet
continue <- liftIO $ sendSafely packet continue <- liftIO $ sendSafely packet
when continue $ when continue $
sendThread euphCon qRecv con sendThread info
Just (SReply packetType packetData reply) -> do Just (SReply packetType packetData reply) -> do
(packet, packetID) <- preparePacket packetType packetData (packet, packetID) <- preparePacket packetType packetData
liftIO $ atomically $ writeTBQueue qRecv (RReply packetID reply) liftIO $ atomically $ modifyTVar recvWaiting (M.insert packetID reply)
continue <- liftIO $ sendSafely packet continue <- liftIO $ sendSafely packet
when continue $ when continue $
sendThread euphCon qRecv con sendThread info
where where
sendSafely packet = (WS.sendTextData con packet >> return True) `catch` handleException sendSafely packet = (WS.sendTextData recvCon packet >> return True) `catch` handleException
handleException (WS.CloseRequest _ _) = return False handleException (WS.CloseRequest _ _) = return False
handleException WS.ConnectionClosed = return False handleException WS.ConnectionClosed = return False
handleException _ = return True handleException _ = return True
-- TODO: Think about whether this is safe (memory leak issues etc.)
{- {-
- Receive thread - Receive thread
@ -202,49 +187,61 @@ instance FromJSON PacketInfo where
Just d -> return $ Right d Just d -> return $ Right d
return PacketInfo{..} return PacketInfo{..}
data RecvInfo = RecvInfo
{ recvEuphCon :: Connection
, recvCon :: WS.Connection
, recvWaiting :: TVar Awaiting
}
-- TODO: Swap for HashMap? -- TODO: Swap for HashMap?
type Awaiting = M.Map T.Text ReplyMVar type Awaiting = M.Map T.Text ReplyMVar
whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m () whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m ()
whenJust m f = maybe (return ()) f m whenJust m f = maybe (return ()) f m
processPacket :: EventQueue -> BS.ByteString -> StateT Awaiting IO () processEvent :: BS.ByteString -> RecvInfo -> IO ()
processPacket qEvent bs = do processEvent bs RecvInfo{..} = do
-- First, deal with event channel events. let (Connection _ _ qEvent) = recvEuphCon
case decode bs of case decode bs of
Nothing -> return () Nothing -> return ()
Just event -> liftIO $ atomically $ writeTBQueue qEvent (EuphEvent event) Just event -> atomically $ writeTBQueue qEvent (EuphEvent event)
-- Then, deal with replies.
-- Find out whether there is actually any dealing with replies to do... processReply :: BS.ByteString -> RecvInfo -> IO ()
replies <- get processReply bs RecvInfo{..} = do
let result = do -- Maybe monad -- Figure out whether this packet is actually a reply of some sort.
let maybeInfo = do
PacketInfo{..} <- decode bs PacketInfo{..} <- decode bs
replyID <- infoPacketID replyID <- infoPacketID
replyMVar <- M.lookup replyID replies return (replyID, infoData)
return (replyID, replyMVar, infoData) whenJust maybeInfo $ \(replyID, infoData) -> do
-- ... and then write the appropriate result into the MVar. -- Figure out whether we're waiting for that ID and find the correct MVar
whenJust result $ \(replyID, ReplyMVar var, infoData) -> do -- and remove it from the TVar if we do find it.
modify (M.delete replyID) maybeReplyMVar <- atomically $ do
case infoData of waiting <- readTVar recvWaiting
Left e -> liftIO $ putMVar var (Left (EuphServerError e)) case M.lookup replyID waiting of
Right d -> Nothing -> return Nothing
case parseEither parseJSON d of Just var -> do
Left e -> liftIO $ putMVar var (Left $ EuphParse e bs) modifyTVar recvWaiting (M.delete replyID)
Right r -> liftIO $ putMVar var (Right r) return (Just var)
whenJust maybeReplyMVar $ \(ReplyMVar var) ->
-- We now know that the packet is a reply, and we know the MVar to send
-- it to. Now we only need to send the correct reply through the MVar.
case infoData of
Left e -> putMVar var (Left $ EuphServerError e)
Right d ->
case parseEither parseJSON d of
Left e -> putMVar var (Left $ EuphParse e bs)
Right r -> putMVar var (Right r)
processRecv :: RecvQueue -> EventQueue -> StateT Awaiting IO () processRecv :: RecvInfo -> IO ()
processRecv qRecv qEvent = do processRecv info@RecvInfo{..} = handle handleException $ forever $ do
recv <- liftIO $ atomically $ readTBQueue qRecv message <- WS.receiveData recvCon -- retrieve packet from ws connection
case recv of processEvent message info
RReply packetID replyMVar -> do processReply message info
modify (M.insert packetID replyMVar) where
processRecv qRecv qEvent handleException (WS.CloseRequest _ _) = return ()
handleException WS.ConnectionClosed = return ()
RPacket bs -> do handleException _ = processRecv info -- continue running
processPacket qEvent bs
processRecv qRecv qEvent
RDisconnected -> return ()
cleanupWaiting :: Awaiting -> IO () cleanupWaiting :: Awaiting -> IO ()
cleanupWaiting replies = cleanupWaiting replies =
@ -267,24 +264,17 @@ cleanupSend qSend = do
SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected)
_ -> return () _ -> return ()
cleanupRecv :: RecvQueue -> IO () recvClient :: Connection -> WS.ClientApp ()
cleanupRecv qRecv = do recvClient euphCon@(Connection locked qSend qEvent) con = do
recvs <- atomically $ emptyTBQueue qRecv waiting <- atomically $ newTVar M.empty
forM_ recvs $ \case let info = RecvInfo{recvEuphCon=euphCon, recvCon=con, recvWaiting=waiting}
RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) tSend <- async $ evalStateT (sendThread info) 0
_ -> return () processRecv info
-- Stop and clean up stuff
recvClient :: Connection -> RecvQueue -> WS.ClientApp ()
recvClient euphCon@(Connection locked qSend qEvent) qRecv con = do
tFetch <- async $ fetchThread qRecv con
tSend <- async $ evalStateT (sendThread euphCon qRecv con) 0
waitingReplies <- execStateT (processRecv qRecv qEvent) M.empty
atomically $ writeTVar locked True atomically $ writeTVar locked True
wait tFetch
wait tSend wait tSend
cleanupWaiting waitingReplies
cleanupSend qSend cleanupSend qSend
cleanupRecv qRecv atomically (readTVar waiting) >>= cleanupWaiting
atomically $ writeTBQueue qEvent Disconnected atomically $ writeTBQueue qEvent Disconnected
{- {-
@ -373,7 +363,10 @@ getMessage euphCon getMessageCommandID = do
-- The command returns a list of 'E.Message's from the rooms message log. -- The command returns a list of 'E.Message's from the rooms message log.
-- --
-- > (log, Maybe before) <- messageLog con amount (Maybe before) -- > (log, Maybe before) <- messageLog con amount (Maybe before)
messageLog :: Connection -> Integer -> Maybe E.Snowflake -> IO ([E.Message], Maybe E.Snowflake) messageLog :: Connection
-> Integer
-> Maybe E.Snowflake
-> IO ([E.Message], Maybe E.Snowflake)
messageLog euphCon logCommandN logCommandBefore = do messageLog euphCon logCommandN logCommandBefore = do
LogReply{..} <- sendPacket euphCon "log" LogCommand{..} LogReply{..} <- sendPacket euphCon "log" LogCommand{..}
return (logReplyLog, logReplyBefore) return (logReplyLog, logReplyBefore)
@ -451,10 +444,6 @@ instance Show EuphException where
instance Exception EuphException instance Exception EuphException
data Recv = RDisconnected
| RPacket BS.ByteString
| RReply PacketID ReplyMVar
data Send = SDisconnect data Send = SDisconnect
| forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents | forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents
| forall p . (ToJSON p) => SReply T.Text p ReplyMVar | forall p . (ToJSON p) => SReply T.Text p ReplyMVar