From 26d08b731283a2aa8914fd56f1b0a63775fd3436 Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 5 Feb 2018 15:43:48 +0000 Subject: [PATCH] Implement EuphConnection --- src/EuphApi/CloseableChan.hs | 1 + src/EuphApi/Threads.hs | 156 ++++++++++++++++++++++------------- 2 files changed, 101 insertions(+), 56 deletions(-) diff --git a/src/EuphApi/CloseableChan.hs b/src/EuphApi/CloseableChan.hs index 05cfccd..cc3ab4c 100644 --- a/src/EuphApi/CloseableChan.hs +++ b/src/EuphApi/CloseableChan.hs @@ -39,6 +39,7 @@ data CloseableChan a = CloseableChan , cChan :: TChan (Content a) } +-- TODO: Replace with Maybe? data Content a = Value a | End diff --git a/src/EuphApi/Threads.hs b/src/EuphApi/Threads.hs index 5fadd2d..9339833 100644 --- a/src/EuphApi/Threads.hs +++ b/src/EuphApi/Threads.hs @@ -1,4 +1,5 @@ {-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} @@ -24,13 +25,14 @@ -- f: RDisconnected -> RecvChan -- f: *stops* -- r: RecvChan -> RDisconnected +-- r: EDisconnected -> EventChan -- r: close SendChan -- s: *stops* -- r: wait for f and s to stop -- r: clean up SendChan -- r: clean up RecvChan -- r: clean up response list --- r: EventStopped -> EventChan +-- r: EStopped -> EventChan -- r: *stops* -- -> All MVars are dealt with -- @@ -58,15 +60,17 @@ -- @ module EuphApi.Threads ( - -- * Events and replies - EuphException(..) - , Event(..) - , EuphEvent(..) + -- * Connecting to euphoria + EuphConnection + , euphClient + , getEvents -- * API functions , pingReply , nick - -- * Connection to euphoria - , euphClient + , send + -- * Events and replies + , EuphException(..) + , EuphEvent(..) ) where import Control.Applicative @@ -74,6 +78,7 @@ import Control.Concurrent import Control.Exception import Control.Monad import Control.Monad.IO.Class +import Data.Maybe import Control.Concurrent.Async import Control.Monad.Trans.State @@ -95,6 +100,10 @@ type Reply = Either EuphException data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r)) -- | The ways in which getting a reply from the server can fail. +-- +-- An EuphException may be raised by any function in the API functions section. +-- +-- TODO: link to section if possible data EuphException = EuphClosed -- ^ Could not send message because connection was closed. | EuphDisconnected @@ -112,8 +121,8 @@ instance Show EuphException where instance Exception EuphException -sendPacket :: (ToJSON p, FromJSON r) => SendChan -> T.Text -> p -> IO r -sendPacket chan packetType packetData = do +sendPacket :: (ToJSON p, FromJSON r) => EuphConnection -> T.Text -> p -> IO r +sendPacket (EuphConnection chan _) packetType packetData = do var <- newEmptyMVar let packet = SReply packetType packetData (ReplyMVar var) done <- E.writeChan chan packet @@ -125,8 +134,8 @@ sendPacket chan packetType packetData = do Left f -> throw f Right r -> return r -sendPacketNoReply :: (ToJSON p) => SendChan -> T.Text -> p -> IO () -sendPacketNoReply chan packetType packetData = do +sendPacketNoReply :: (ToJSON p) => EuphConnection -> T.Text -> p -> IO () +sendPacketNoReply (EuphConnection chan _) packetType packetData = do let packet = SNoReply packetType packetData done <- E.writeChan chan packet case done of @@ -137,17 +146,23 @@ sendPacketNoReply chan packetType packetData = do - API functions -} -pingReply :: SendChan -> UTCTime -> IO () -pingReply chan time = do - let cmd = PingReplyCommand time - sendPacketNoReply chan "ping-reply" cmd +-- TODO: Add proper documentation +pingReply :: EuphConnection -> UTCTime -> IO () +pingReply econ pingReplyCommandTime = + sendPacketNoReply econ "ping-reply" PingReplyCommand{..} -nick :: SendChan -> T.Text -> IO (E.Nick, E.Nick) -nick chan name = do - let cmd = NickCommand name - NickReply{..} <- sendPacket chan "nick" cmd +-- TODO: Add proper documentation +nick :: EuphConnection -> T.Text -> IO (E.Nick, E.Nick) +nick econ nickCommandName = do + NickReply{..} <- sendPacket econ "nick" NickCommand{..} return (nickReplyFrom, nickReplyTo) +-- TODO: Add proper documentation +send :: EuphConnection -> Maybe E.Snowflake -> T.Text -> IO E.Message +send econ sendCommandParent sendCommandContent = do + SendReply{..} <- sendPacket econ "send" SendCommand{..} + return sendReplyMessage + {- - Commands and replies -} @@ -195,7 +210,7 @@ instance FromJSON NickReply where data SendCommand = SendCommand { sendCommandContent :: T.Text - , sendCommandParent :: Maybe PacketID + , sendCommandParent :: Maybe E.Snowflake } deriving (Show) instance ToJSON SendCommand where @@ -328,7 +343,7 @@ instance FromJSON EuphEvent where - Channels -} -type RecvChan = Chan Recv +type RecvChan = E.CloseableChan Recv data Recv = RDisconnected | RPacket BS.ByteString | RReply PacketID ReplyMVar @@ -338,11 +353,8 @@ data Send = SDisconnect | forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents | forall p . (ToJSON p) => SReply T.Text p ReplyMVar -type EventChan e = Chan (Event e) -data Event e = EDisconnected - | EStopped - | EEuphEvent EuphEvent - | ECustomEvent e +type EventChan = Chan Event +type Event = Maybe EuphEvent {- - Fetch thread @@ -351,18 +363,16 @@ data Event e = EDisconnected fetchThread :: RecvChan -> WS.Connection -> IO () fetchThread cRecv con = handle handleException $ forever $ do message <- WS.receiveData con - void $ writeChan cRecv (RPacket message) -- will never be closed while thread running + void $ E.writeChan cRecv (RPacket message) -- will never be closed while thread running where - handleException (WS.CloseRequest _ _) = void $ writeChan cRecv RDisconnected - handleException WS.ConnectionClosed = void $ writeChan cRecv RDisconnected + handleException (WS.CloseRequest _ _) = void $ E.writeChan cRecv RDisconnected + handleException WS.ConnectionClosed = void $ E.writeChan cRecv RDisconnected handleException _ = fetchThread cRecv con {- - Send thread -} -type SendState = StateT Integer IO - -- Prepare a single packet for sending preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID) preparePacket packetType packetData = do @@ -395,7 +405,7 @@ sendThread cSend cRecv con = do Just (SReply packetType packetData reply) -> do (packet, packetID) <- preparePacket packetType packetData - liftIO $ writeChan cRecv $ RReply packetID reply + void $ liftIO $ E.writeChan cRecv (RReply packetID reply) continue <- liftIO $ sendSafely packet when continue $ sendThread cSend cRecv con @@ -420,57 +430,91 @@ instance FromJSON PacketInfo where infoServerError <- o .:? "error" return PacketInfo{..} --- Possibly unnecessary -- TODO: Swap for HashMap? -newtype Awaiting = Awaiting (M.Map T.Text ReplyMVar) +type Awaiting = M.Map T.Text ReplyMVar -emptyReplies :: Awaiting -emptyReplies = Awaiting M.empty - -processRecv :: RecvChan -> EventChan e -> Awaiting -> IO Awaiting -processRecv cRecv cEvent a@(Awaiting replies) = do - recv <- readChan cRecv +processRecv :: RecvChan -> EventChan -> Awaiting -> IO Awaiting +processRecv cRecv cEvent replies = do + recv <- E.readChan cRecv case recv of - RDisconnected -> - return a - - RReply packetID replyMVar -> do + Just (RReply packetID replyMVar) -> do let newReplies = M.insert packetID replyMVar replies - processRecv cRecv cEvent (Awaiting newReplies) + processRecv cRecv cEvent newReplies - RPacket bs -> - undefined -- TODO + Just (RPacket bs) -> do + newReplies <- processPacket cEvent bs replies + processRecv cRecv cEvent newReplies + + _ -> return replies + +processPacket :: EventChan -> BS.ByteString -> Awaiting -> IO Awaiting +processPacket cEvent bs replies = do + -- First, deal with event channel events. + case A.decode bs of + Nothing -> return () + Just event -> writeChan cEvent (Just event) + -- Then, deal with replies. + fromMaybe (return replies) $ do + PacketInfo{..} <- A.decode bs + replyID <- infoPacketID + (ReplyMVar var) <- M.lookup replyID replies + let newReplies = M.delete replyID replies + case infoServerError of + Nothing -> do + reply <- A.decode bs + return $ newReplies <$ putMVar var (Right reply) + Just e -> + return $ newReplies <$ putMVar var (Left (EuphServerError e)) cleanupWaiting :: Awaiting -> IO () -cleanupWaiting (Awaiting replies) = +cleanupWaiting replies = forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected) cleanupSend :: SendChan -> IO () -cleanupSend cSend = undefined +cleanupSend cSend = do + sends <- E.emptyChan cSend + forM_ sends $ \case + SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) + _ -> return () cleanupRecv :: RecvChan -> IO () -cleanupRecv cRecv = undefined +cleanupRecv cRecv = do + recvs <- E.emptyChan cRecv + forM_ recvs $ \case + RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) + _ -> return () -recvThread :: SendChan -> RecvChan -> EventChan e -> WS.Connection -> IO () +recvThread :: SendChan -> RecvChan -> EventChan -> WS.Connection -> IO () recvThread cSend cRecv cEvent con = do tFetch <- async $ fetchThread cRecv con tSend <- async $ evalStateT (sendThread cSend cRecv con) 0 - waitingReplies <- processRecv cRecv cEvent emptyReplies + waitingReplies <- processRecv cRecv cEvent M.empty E.closeChan cSend wait tFetch wait tSend cleanupWaiting waitingReplies cleanupSend cSend cleanupRecv cRecv + writeChan cEvent Nothing {- - Startup -} -euphClient :: WS.ClientApp (SendChan, EventChan e) +-- TODO: Add proper documentation +data EuphConnection = EuphConnection SendChan EventChan + +-- TODO: Add proper documentation +getEvents :: EuphConnection -> IO [EuphEvent] +getEvents (EuphConnection _ cEvent) = do + events <- getChanContents cEvent + return $ catMaybes $ takeWhile isJust events + +-- TODO: Add proper documentation +euphClient :: WS.ClientApp EuphConnection euphClient con = do sendChan <- E.newOpenChan - recvChan <- newChan + recvChan <- E.newOpenChan eventChan <- newChan - forkIO $ recvThread sendChan recvChan eventChan con - return (sendChan, eventChan) + void $ forkIO $ recvThread sendChan recvChan eventChan con + return $ EuphConnection sendChan eventChan