From 102010f442230b492a730a88f668a52d5379dbe1 Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 5 Feb 2018 19:31:00 +0000 Subject: [PATCH] Rewrite Connection to not depend on CloseableChan --- src/EuphApi/CloseableChan.hs | 150 ------- src/EuphApi/Threads.hs | 755 +++++++++++++++++------------------ 2 files changed, 357 insertions(+), 548 deletions(-) delete mode 100644 src/EuphApi/CloseableChan.hs diff --git a/src/EuphApi/CloseableChan.hs b/src/EuphApi/CloseableChan.hs deleted file mode 100644 index cc3ab4c..0000000 --- a/src/EuphApi/CloseableChan.hs +++ /dev/null @@ -1,150 +0,0 @@ -{-# LANGUAGE RecordWildCards #-} - --- | Chans that can be closed and reopened. --- --- While a 'CloseableChan' is closed, it can not be written to or read from. --- Calls to 'writeChan' and 'readChan' are non-blocking while a chan is closed. --- --- If a thread is attempting to read from a chan using 'readChan' and that chan is closed, --- the call to 'readChan' resumes and @Nothing@ is returned. - -module EuphApi.CloseableChan - ( CloseableChan - -- * IO function versions - , newOpenChan - , newClosedChan - , writeChan - , readChan - , closeChan - , openChan - , emptyChan - -- * STM function versions - , newOpenChanSTM - , newClosedChanSTM - , writeChanSTM - , readChanSTM - , closeChanSTM - , openChanSTM - , emptyChanSTM - ) where - -import Control.Concurrent.STM - --- | A 'Chan' that can be closed and opened again. --- --- Attempts to write to or read from a 'CloseableChan' while it is closed result --- in a @Nothing@. -data CloseableChan a = CloseableChan - { cClosed :: TVar Bool - , cChan :: TChan (Content a) - } - --- TODO: Replace with Maybe? -data Content a = Value a - | End - -{- - - Functions as STM actions - -} - --- | See 'newOpenChan'. -newOpenChanSTM :: STM (CloseableChan a) -newOpenChanSTM = do - cClosed <- newTVar False - cChan <- newTChan - return CloseableChan{..} - --- | See 'newClosedChan'. -newClosedChanSTM :: STM (CloseableChan a) -newClosedChanSTM = do - cClosed <- newTVar True - cChan <- newTChan - return CloseableChan{..} - --- | See 'writeChan'. -writeChanSTM :: CloseableChan a -> a -> STM (Maybe ()) -writeChanSTM CloseableChan{..} a = do - closed <- readTVar cClosed - if closed - then return Nothing - else Just <$> writeTChan cChan (Value a) - --- | See 'readChan'. -readChanSTM :: CloseableChan a -> STM (Maybe a) -readChanSTM CloseableChan{..} = do - closed <- readTVar cClosed - if closed - then return Nothing - else Just <$> readValue - where - readValue = do - val <- readTChan cChan - case val of - End -> readValue -- ignore End while reading normally - Value v -> return v - --- | See 'closeChan'. -closeChanSTM :: CloseableChan a -> STM () -closeChanSTM CloseableChan{..} = writeTVar cClosed True - --writeTChan cChan End - --- | See 'openChan'. -openChanSTM :: CloseableChan a -> STM () -openChanSTM CloseableChan{..} = writeTVar cClosed False - --- | See 'emptyChan'. -emptyChanSTM :: CloseableChan a -> STM [a] -emptyChanSTM CloseableChan{..} = do - writeTChan cChan End - extractValues - where - extractValues = do - val <- readTChan cChan - case val of - End -> return [] - Value v -> (v :) <$> extractValues - -{- - - Functions as IO actions - -} - --- | Create a new open 'CloseableChan'. -newOpenChan :: IO (CloseableChan a) -newOpenChan = atomically newOpenChanSTM - --- | Create a new closed 'CloseableChan'. -newClosedChan :: IO (CloseableChan a) -newClosedChan = atomically newClosedChanSTM - --- | Attempt to write a value into the 'CloseableChan'. --- --- If the chan is open, succeeds with a @Just ()@. --- If the chan is closed, fails with a @Nothing@. -writeChan :: CloseableChan a -> a -> IO (Maybe ()) -writeChan chan a = atomically $ writeChanSTM chan a - --- | Attempt to read a value @v@ from the 'CloseableChan'. --- --- If the chan is open, succeeds with a @Just v@. --- If the chan is closed, fails with a @Nothing@. -readChan :: CloseableChan a -> IO (Maybe a) -readChan = atomically . readChanSTM - --- | Close a 'CloseableChan'. --- Does nothing if chan is already closed. --- --- Performing this action un-blocks all calls to 'readChan'. -closeChan :: CloseableChan a -> IO () -closeChan = atomically . closeChanSTM - --- | Open a 'CloseableChan'. --- Does nothing if chan is already open. -openChan :: CloseableChan a -> IO () -openChan = atomically . openChanSTM - --- | Remove all items currently in the 'CloseableChan' and returns them in a list. --- --- This function also works while the chan is closed. --- It is meant as a way to clean up the remaining values in a chan after it was closed. -emptyChan :: CloseableChan a -> IO [a] -emptyChan = atomically . emptyChanSTM diff --git a/src/EuphApi/Threads.hs b/src/EuphApi/Threads.hs index 9339833..7021b1f 100644 --- a/src/EuphApi/Threads.hs +++ b/src/EuphApi/Threads.hs @@ -5,82 +5,28 @@ -- | Setup consisting of a few threads to send and receive packets to and from -- the euphoria api using a websocket connection. --- --- @ --- m: main thread --- r: recvThread --- f: fetchThread --- s: sendThread --- --- On creation: --- m: Create WS connection (or do this in r?) --- m: Create channels --- m: Start recvThread with all necessary info --- r: Start fetchThread and sendThread using async --- m: Return SendChan and EventChan --- --- On disconnect: --- s: close connection (optional) --- f: detect exception --- f: RDisconnected -> RecvChan --- f: *stops* --- r: RecvChan -> RDisconnected --- r: EDisconnected -> EventChan --- r: close SendChan --- s: *stops* --- r: wait for f and s to stop --- r: clean up SendChan --- r: clean up RecvChan --- r: clean up response list --- r: EStopped -> EventChan --- r: *stops* --- -> All MVars are dealt with --- --- ↓ --- │ --- (SendChan) --- │ --- ┌─────────────────────╴│╶──────┐ --- │ │ │ --- │ (WS.Connection) │ │ --- │ │ │ │ --- │ [fetchThread] [sendThread] │ --- │ │ │ │ --- │ └──────┬──────┘ │ --- │ │ │ --- │ (RecvChan) │ --- │ │ │ --- │ [recvThread] │ --- │ │ │ --- └──────────────╴│╶─────────────┘ --- │ --- (EventChan) --- │ --- ↓ --- @ module EuphApi.Threads ( -- * Connecting to euphoria - EuphConnection - , euphClient - , getEvents + Connection + , euphApp + , getEvent -- * API functions , pingReply , nick , send - -- * Events and replies + -- * Exception , EuphException(..) - , EuphEvent(..) ) where import Control.Applicative -import Control.Concurrent import Control.Exception import Control.Monad import Control.Monad.IO.Class -import Data.Maybe +import Control.Concurrent import Control.Concurrent.Async +import Control.Concurrent.STM import Control.Monad.Trans.State import Data.Aeson as A import qualified Data.ByteString.Lazy as BS @@ -91,14 +37,251 @@ import Data.Time import Data.Time.Clock.POSIX import qualified Network.WebSockets as WS -import qualified EuphApi.CloseableChan as E import qualified EuphApi.Types as E --- Some useful type aliases type PacketID = T.Text type Reply = Either EuphException data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r)) +type SendQueue = TBQueue Send +type RecvQueue = TBQueue Recv +type EventQueue = TBQueue (Maybe Event) -- 'Nothing' ends the event stream +type LockedFlag = TVar Bool + +data Connection = Connection LockedFlag SendQueue EventQueue + +-- | Read one 'Event' from the 'Connection'. +-- +-- Returns 'Nothing' once when the 'Connection' stops. +-- After that, any further calls of getEvent on the same connection +-- will block indefinitely. +getEvent :: Connection -> IO (Maybe Event) +getEvent (Connection locked _ qEvent) = undefined locked qEvent + +-- | A @Network.Websockets.'WS.ClientApp'@ creating a 'Connection'. +euphApp :: WS.ClientApp Connection +euphApp con = do + sendQueue <- atomically $ newTBQueue 10 + recvQueue <- atomically $ newTBQueue 10 + eventQueue <- atomically $ newTBQueue 10 + locked <- atomically $ newTVar False + let euphCon = Connection locked sendQueue eventQueue + void $ forkIO $ recvThread euphCon recvQueue con + return euphCon + +{- + - Fetch thread + -} + +fetchThread :: RecvQueue -> WS.Connection -> IO () +fetchThread qRecv con = void $ handle handleException $ forever $ do + message <- WS.receiveData con + void $ atomically $ writeTBQueue qRecv (RPacket message) + where + handleException (WS.CloseRequest _ _) = atomically $ writeTBQueue qRecv RDisconnected + handleException WS.ConnectionClosed = atomically $ writeTBQueue qRecv RDisconnected + handleException _ = fetchThread qRecv con + +{- + - Send thread + -} + +-- Prepare a single packet for sending. +-- Doesn't actually do any IO. The IO monad part is left in for ease of use. +preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID) +preparePacket packetType packetData = do + packetNr <- get + put $ packetNr + 1 + let packetID = T.pack $ show packetNr + bytestr = encode . Object . HM.fromList $ + [ ("id", A.String packetID) + , ("type", A.String packetType) + , ("data", toJSON packetData) + ] + return (bytestr, packetID) + +readWhileOpen :: Connection -> STM (Maybe Send) +readWhileOpen (Connection locked qSend _) = do + isLocked <- readTVar locked + if isLocked + then return Nothing + else Just <$> readTBQueue qSend + +sendThread :: Connection -> RecvQueue -> WS.Connection -> StateT Integer IO () +sendThread euphCon qRecv con = do + item <- liftIO $ atomically $ readWhileOpen euphCon + case item of + Nothing -> + return () + + Just SDisconnect -> + liftIO $ WS.sendClose con ("Bye." :: T.Text) + + Just (SNoReply packetType packetData) -> do + (packet, _) <- preparePacket packetType packetData + liftIO $ WS.sendTextData con packet + continue <- liftIO $ sendSafely packet + when continue $ + sendThread euphCon qRecv con + + Just (SReply packetType packetData reply) -> do + (packet, packetID) <- preparePacket packetType packetData + void $ liftIO $ atomically $ writeTBQueue qRecv (RReply packetID reply) + continue <- liftIO $ sendSafely packet + when continue $ + sendThread euphCon qRecv con + where + sendSafely packet = (WS.sendTextData con packet >> return True) `catch` handleException + handleException (WS.CloseRequest _ _) = return False + handleException WS.ConnectionClosed = return False + handleException _ = return True + +{- + - Receive thread + -} + +data PacketInfo = PacketInfo + { infoPacketID :: Maybe PacketID + , infoServerError :: Maybe T.Text + } deriving (Show) + +instance FromJSON PacketInfo where + parseJSON = withObject "packet" $ \o -> do + infoPacketID <- o .:? "id" + infoServerError <- o .:? "error" + return PacketInfo{..} + +-- TODO: Swap for HashMap? +type Awaiting = M.Map T.Text ReplyMVar + +whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m () +whenJust m f = maybe (return ()) f m + +processPacket :: EventQueue -> BS.ByteString -> StateT Awaiting IO () +processPacket qEvent bs = do + -- First, deal with event channel events. + case A.decode bs of + Nothing -> return () + Just event -> liftIO $ atomically $ writeTBQueue qEvent (Just event) + -- Then, deal with replies. + -- Find out whether there is actually any dealing with replies to do... + replies <- get + let result = do -- Maybe monad + PacketInfo{..} <- A.decode bs + replyID <- infoPacketID + replyMVar <- M.lookup replyID replies + return (replyID, replyMVar, infoServerError) + -- ... and then write the appropriate result into the MVar. + whenJust result $ \(replyID, ReplyMVar var, serverError) -> do + modify (M.delete replyID) + case serverError of + Just e -> liftIO $ putMVar var (Left (EuphServerError e)) + Nothing -> + case A.decode bs of + Nothing -> liftIO $ putMVar var (Left EuphParse) + Just r -> liftIO $ putMVar var (Right r) + +processRecv :: RecvQueue -> EventQueue -> StateT Awaiting IO () +processRecv qRecv qEvent = do + recv <- liftIO $ atomically $ readTBQueue qRecv + case recv of + RReply packetID replyMVar -> do + modify (M.insert packetID replyMVar) + processRecv qRecv qEvent + + RPacket bs -> do + processPacket qEvent bs + processRecv qRecv qEvent + + RDisconnected -> return () + +cleanupWaiting :: Awaiting -> IO () +cleanupWaiting replies = + forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected) + +emptyTBQueue :: TBQueue a -> STM [a] +emptyTBQueue q = do + isEmpty <- isEmptyTBQueue q + if isEmpty + then return [] + else do + item <- readTBQueue q + rest <- emptyTBQueue q + return $ item : rest + +cleanupSend :: SendQueue -> IO () +cleanupSend qSend = do + sends <- atomically $ emptyTBQueue qSend + forM_ sends $ \case + SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) + _ -> return () + +cleanupRecv :: RecvQueue -> IO () +cleanupRecv qRecv = do + recvs <- atomically $ emptyTBQueue qRecv + forM_ recvs $ \case + RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) + _ -> return () + +recvThread :: Connection -> RecvQueue -> WS.Connection -> IO () +recvThread euphCon@(Connection locked qSend qEvent) qRecv con = do + tFetch <- async $ fetchThread qRecv con + tSend <- async $ evalStateT (sendThread euphCon qRecv con) 0 + waitingReplies <- execStateT (processRecv qRecv qEvent) M.empty + atomically $ writeTVar locked True + wait tFetch + wait tSend + cleanupWaiting waitingReplies + cleanupSend qSend + cleanupRecv qRecv + atomically $ writeTBQueue qEvent Nothing + +{- + - API functions + -} + +sendPacket :: (ToJSON p, FromJSON r) => Connection -> T.Text -> p -> IO r +sendPacket (Connection locked qSend _) packetType packetData = do + var <- newEmptyMVar + let packet = SReply packetType packetData (ReplyMVar var) + atomically $ do + isLocked <- readTVar locked + if isLocked + then throwSTM EuphClosed + else writeTBQueue qSend packet + result <- readMVar var + case result of + Left f -> throw f + Right r -> return r + +sendPacketNoReply :: (ToJSON p) => Connection -> T.Text -> p -> IO () +sendPacketNoReply (Connection locked qSend _) packetType packetData = atomically $ do + let packet = SNoReply packetType packetData + isLocked <- readTVar locked + if isLocked + then throwSTM EuphClosed + else writeTBQueue qSend packet + +pingReply :: Connection -> UTCTime -> IO () +pingReply euphCon pingReplyCommandTime = + sendPacketNoReply euphCon "ping-reply" PingReplyCommand{..} + +nick :: Connection -> T.Text -> IO (E.Nick, E.Nick) +nick euphCon nickCommandName = do + NickReply{..} <- sendPacket euphCon "nick" NickCommand{..} + return (nickReplyFrom, nickReplyTo) + +send :: Connection -> Maybe E.Snowflake -> T.Text -> IO E.Message +send euphCon sendCommandParent sendCommandContent = do + SendReply{..} <- sendPacket euphCon "send" SendCommand{..} + return sendReplyMessage + + + +{- + - Some types + -} + -- | The ways in which getting a reply from the server can fail. -- -- An EuphException may be raised by any function in the API functions section. @@ -121,51 +304,122 @@ instance Show EuphException where instance Exception EuphException -sendPacket :: (ToJSON p, FromJSON r) => EuphConnection -> T.Text -> p -> IO r -sendPacket (EuphConnection chan _) packetType packetData = do - var <- newEmptyMVar - let packet = SReply packetType packetData (ReplyMVar var) - done <- E.writeChan chan packet - case done of - Nothing -> throw EuphClosed - Just () -> do - result <- readMVar var - case result of - Left f -> throw f - Right r -> return r +data Recv = RDisconnected + | RPacket BS.ByteString + | RReply PacketID ReplyMVar -sendPacketNoReply :: (ToJSON p) => EuphConnection -> T.Text -> p -> IO () -sendPacketNoReply (EuphConnection chan _) packetType packetData = do - let packet = SNoReply packetType packetData - done <- E.writeChan chan packet - case done of - Nothing -> throw EuphClosed - Just () -> return () +data Send = SDisconnect + | forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents + | forall p . (ToJSON p) => SReply T.Text p ReplyMVar -{- - - API functions - -} +data Event + = BounceEvent (Maybe T.Text) (Maybe [T.Text]) + -- ^ A 'BounceEvent' indicates that access to a room is denied. + -- + -- @'BounceEvent' (Maybe reason) (Maybe [authOption])@ + | DisconnectEvent T.Text + -- ^ A 'DisconnectEvent' indicates that the session is being closed. + -- The client will subsequently be disconnected. + -- + -- If the disconnect reason is "authentication changed", + -- the client should immediately reconnect. + -- + -- @'DisconnectEvent' reason@ + | HelloEvent E.SessionView Bool T.Text + -- ^ A 'HelloEvent' is sent by the server to the client + -- when a session is started. + -- It includes information about the client's authentication + -- and associated identity. + -- + -- @'HelloEvent' session roomIsPrivate version@ + | JoinEvent E.SessionView + -- ^ A 'JoinEvent' indicates a session just joined the room. + -- + -- @'JoinEvent' session@ + | NetworkEvent T.Text T.Text + -- ^ A 'NetworkEvent' indicates some server-side event + -- that impacts the presence of sessions in a room. + -- + -- If the network event type is "partition", + -- then this should be treated as a 'PartEvent' for all sessions + -- connected to the same server id/era combo. + -- + -- @'NetworkEvent' server_id server_era@ + | NickEvent E.Nick E.Nick + -- ^ A 'NickEvent' announces a nick change by another session in the room. + -- + -- @'NickEvent' from to@ + | EditMessageEvent E.Message + -- ^ An 'EditMessageEvent' indicates that a message in the room + -- has been modified or deleted. + -- If the client offers a user interface and the indicated message + -- is currently displayed, it should update its display accordingly. + -- + -- The event packet includes a snapshot of the message post-edit. + -- + -- @'EditMessageEvent' editedMessage@ + | PartEvent E.SessionView + -- ^ A 'PartEvent' indicates a session just disconnected from the room. + -- + -- @'PartEvent' session@ + | PingEvent UTCTime UTCTime + -- ^ A 'PingEvent' represents a server-to-client ping. + -- The client should send back a 'pingReply' with the same value + -- for the time field as soon as possible (or risk disconnection). + -- + -- @'PingEvent' time next@ + | SendEvent E.Message + -- ^ A 'SendEvent' indicates a message received by the room + -- from another session. + -- + -- @'SendEvent' message@ + | SnapshotEvent T.Text [E.SessionView] [E.Message] (Maybe E.Nick) + -- ^ A 'SnapshotEvent' indicates that a session has + -- successfully joined a room. + -- It also offers a snapshot of the room’s state and recent history. + -- + -- @'SnapshotEvent' version listing log (Maybe nick)@ --- TODO: Add proper documentation -pingReply :: EuphConnection -> UTCTime -> IO () -pingReply econ pingReplyCommandTime = - sendPacketNoReply econ "ping-reply" PingReplyCommand{..} + -- LoginEvent -- not implemented + -- LogoutEvent -- not implemented + -- PMInitiateEvent -- not implemented --- TODO: Add proper documentation -nick :: EuphConnection -> T.Text -> IO (E.Nick, E.Nick) -nick econ nickCommandName = do - NickReply{..} <- sendPacket econ "nick" NickCommand{..} - return (nickReplyFrom, nickReplyTo) - --- TODO: Add proper documentation -send :: EuphConnection -> Maybe E.Snowflake -> T.Text -> IO E.Message -send econ sendCommandParent sendCommandContent = do - SendReply{..} <- sendPacket econ "send" SendCommand{..} - return sendReplyMessage - -{- - - Commands and replies - -} +instance FromJSON Event where + parseJSON = withObject "Event" $ \o -> do + tp <- o .: "type" + dt <- o .: "data" + empty + <|> (tp `is` "bounce-event" >> pBounceEvent dt) + <|> (tp `is` "disconnect-event" >> pDisconnectEvent dt) + <|> (tp `is` "hello-event" >> pHelloEvent dt) + <|> (tp `is` "join-event" >> pJoinEvent dt) + <|> (tp `is` "network-event" >> pNetworkEvent dt) + <|> (tp `is` "nick-event" >> pNickEvent dt) + <|> (tp `is` "edit-message-event" >> pEditMessageEvent dt) + <|> (tp `is` "part-event" >> pPartEvent dt) + <|> (tp `is` "ping-event" >> pPingEvent dt) + <|> (tp `is` "send-event" >> pSendEvent dt) + <|> (tp `is` "snapshot-event" >> pSnapshotEvent dt) + where + a `is` b = guard ((a :: T.Text) == b) + pBounceEvent = withObject "BounceEvent" $ \o -> + BounceEvent <$> o .:? "reason" <*> o .:? "auth_options" + pDisconnectEvent = withObject "DisconnectEvent" $ \o -> + DisconnectEvent <$> o .: "reason" + pHelloEvent = withObject "HelloEvent" $ \o -> + HelloEvent <$> o .: "session" <*> o .: "room_is_private" <*> o .: "version" + pJoinEvent v = JoinEvent <$> parseJSON v + pNetworkEvent = withObject "NetworkEvent" $ \o -> + NetworkEvent <$> o .: "server_id" <*> o .: "server_era" + pNickEvent = withObject "NickEvent" $ \o -> + NickEvent <$> o .: "from" <*> o .: "to" + pEditMessageEvent v = EditMessageEvent <$> parseJSON v + pPartEvent v = PartEvent <$> parseJSON v + pPingEvent = withObject "PingEvent" $ \o -> + PingEvent <$> o .: "time" <*> o .: "next" + pSendEvent v = SendEvent <$> parseJSON v + pSnapshotEvent = withObject "SnapshotEvent" $ \o -> + SnapshotEvent <$> o .: "version" <*> o .: "listing" <*> o .: "log" <*> o .:? "nick" (.?=) :: (ToJSON v, KeyValue kv) => T.Text -> Maybe v -> [kv] k .?= (Just v) = [k .= v] @@ -223,298 +477,3 @@ newtype SendReply = SendReply instance FromJSON SendReply where parseJSON v = SendReply <$> parseJSON v - -{- - - Events - -} - --- | Represents . --- --- These events may be sent from the server to the client at any time. -data EuphEvent = BounceEvent (Maybe T.Text) (Maybe [T.Text]) - -- ^ A 'BounceEvent' indicates that access to a room is denied. - -- - -- @'BounceEvent' (Maybe reason) (Maybe [authOption])@ - | DisconnectEvent T.Text - -- ^ A 'DisconnectEvent' indicates that the session is being closed. - -- The client will subsequently be disconnected. - -- - -- If the disconnect reason is "authentication changed", - -- the client should immediately reconnect. - -- - -- @'DisconnectEvent' reason@ - | HelloEvent E.SessionView Bool T.Text - -- ^ A 'HelloEvent' is sent by the server to the client - -- when a session is started. - -- It includes information about the client's authentication - -- and associated identity. - -- - -- @'HelloEvent' session roomIsPrivate version@ - | JoinEvent E.SessionView - -- ^ A 'JoinEvent' indicates a session just joined the room. - -- - -- @'JoinEvent' session@ - | NetworkEvent T.Text T.Text - -- ^ A 'NetworkEvent' indicates some server-side event - -- that impacts the presence of sessions in a room. - -- - -- If the network event type is "partition", - -- then this should be treated as a 'PartEvent' for all sessions - -- connected to the same server id/era combo. - -- - -- @'NetworkEvent' server_id server_era@ - | NickEvent E.Nick E.Nick - -- ^ A 'NickEvent' announces a nick change by another session in the room. - -- - -- @'NickEvent' from to@ - | EditMessageEvent E.Message - -- ^ An 'EditMessageEvent' indicates that a message in the room - -- has been modified or deleted. - -- If the client offers a user interface and the indicated message - -- is currently displayed, it should update its display accordingly. - -- - -- The event packet includes a snapshot of the message post-edit. - -- - -- @'EditMessageEvent' editedMessage@ - | PartEvent E.SessionView - -- ^ A 'PartEvent' indicates a session just disconnected from the room. - -- - -- @'PartEvent' session@ - | PingEvent UTCTime UTCTime - -- ^ A 'PingEvent' represents a server-to-client ping. - -- The client should send back a 'pingReply' with the same value - -- for the time field as soon as possible (or risk disconnection). - -- - -- @'PingEvent' time next@ - | SendEvent E.Message - -- ^ A 'SendEvent' indicates a message received by the room - -- from another session. - -- - -- @'SendEvent' message@ - | SnapshotEvent T.Text [E.SessionView] [E.Message] (Maybe E.Nick) - -- ^ A 'SnapshotEvent' indicates that a session has - -- successfully joined a room. - -- It also offers a snapshot of the room’s state and recent history. - -- - -- @'SnapshotEvent' version listing log (Maybe nick)@ - - -- LoginEvent -- not implemented - -- LogoutEvent -- not implemented - -- PMInitiateEvent -- not implemented - -instance FromJSON EuphEvent where - parseJSON = withObject "Event" $ \o -> do - tp <- o .: "type" - dt <- o .: "data" - empty - <|> (tp `is` "bounce-event" >> pBounceEvent dt) - <|> (tp `is` "disconnect-event" >> pDisconnectEvent dt) - <|> (tp `is` "hello-event" >> pHelloEvent dt) - <|> (tp `is` "join-event" >> pJoinEvent dt) - <|> (tp `is` "network-event" >> pNetworkEvent dt) - <|> (tp `is` "nick-event" >> pNickEvent dt) - <|> (tp `is` "edit-message-event" >> pEditMessageEvent dt) - <|> (tp `is` "part-event" >> pPartEvent dt) - <|> (tp `is` "ping-event" >> pPingEvent dt) - <|> (tp `is` "send-event" >> pSendEvent dt) - <|> (tp `is` "snapshot-event" >> pSnapshotEvent dt) - where - a `is` b = guard ((a :: T.Text) == b) - pBounceEvent = withObject "BounceEvent" $ \o -> - BounceEvent <$> o .:? "reason" <*> o .:? "auth_options" - pDisconnectEvent = withObject "DisconnectEvent" $ \o -> - DisconnectEvent <$> o .: "reason" - pHelloEvent = withObject "HelloEvent" $ \o -> - HelloEvent <$> o .: "session" <*> o .: "room_is_private" <*> o .: "version" - pJoinEvent v = JoinEvent <$> parseJSON v - pNetworkEvent = withObject "NetworkEvent" $ \o -> - NetworkEvent <$> o .: "server_id" <*> o .: "server_era" - pNickEvent = withObject "NickEvent" $ \o -> - NickEvent <$> o .: "from" <*> o .: "to" - pEditMessageEvent v = EditMessageEvent <$> parseJSON v - pPartEvent v = PartEvent <$> parseJSON v - pPingEvent = withObject "PingEvent" $ \o -> - PingEvent <$> o .: "time" <*> o .: "next" - pSendEvent v = SendEvent <$> parseJSON v - pSnapshotEvent = withObject "SnapshotEvent" $ \o -> - SnapshotEvent <$> o .: "version" <*> o .: "listing" <*> o .: "log" <*> o .:? "nick" - -{- - - Channels - -} - -type RecvChan = E.CloseableChan Recv -data Recv = RDisconnected - | RPacket BS.ByteString - | RReply PacketID ReplyMVar - -type SendChan = E.CloseableChan Send -data Send = SDisconnect - | forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents - | forall p . (ToJSON p) => SReply T.Text p ReplyMVar - -type EventChan = Chan Event -type Event = Maybe EuphEvent - -{- - - Fetch thread - -} - -fetchThread :: RecvChan -> WS.Connection -> IO () -fetchThread cRecv con = handle handleException $ forever $ do - message <- WS.receiveData con - void $ E.writeChan cRecv (RPacket message) -- will never be closed while thread running - where - handleException (WS.CloseRequest _ _) = void $ E.writeChan cRecv RDisconnected - handleException WS.ConnectionClosed = void $ E.writeChan cRecv RDisconnected - handleException _ = fetchThread cRecv con - -{- - - Send thread --} - --- Prepare a single packet for sending -preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID) -preparePacket packetType packetData = do - packetNr <- get - put $ packetNr + 1 - let packetID = T.pack $ show packetNr - bytestr = encode . Object . HM.fromList $ - [ ("id", A.String packetID) - , ("type", A.String packetType) - , ("data", toJSON packetData) - ] - return (bytestr, packetID) - -sendThread :: SendChan -> RecvChan -> WS.Connection -> StateT Integer IO () -sendThread cSend cRecv con = do - item <- liftIO $ E.readChan cSend - case item of - Nothing -> - return () - - Just SDisconnect -> - liftIO $ WS.sendClose con ("Bye." :: T.Text) - - Just (SNoReply packetType packetData) -> do - (packet, _) <- preparePacket packetType packetData - liftIO $ WS.sendTextData con packet - continue <- liftIO $ sendSafely packet - when continue $ - sendThread cSend cRecv con - - Just (SReply packetType packetData reply) -> do - (packet, packetID) <- preparePacket packetType packetData - void $ liftIO $ E.writeChan cRecv (RReply packetID reply) - continue <- liftIO $ sendSafely packet - when continue $ - sendThread cSend cRecv con - where - sendSafely packet = (WS.sendTextData con packet >> return True) `catch` handleException - handleException (WS.CloseRequest _ _) = return False - handleException WS.ConnectionClosed = return False - handleException _ = return True - -{- - - RecvThread - -} - -data PacketInfo = PacketInfo - { infoPacketID :: Maybe PacketID - , infoServerError :: Maybe T.Text - } deriving (Show) - -instance FromJSON PacketInfo where - parseJSON = withObject "packet" $ \o -> do - infoPacketID <- o .:? "id" - infoServerError <- o .:? "error" - return PacketInfo{..} - --- TODO: Swap for HashMap? -type Awaiting = M.Map T.Text ReplyMVar - -processRecv :: RecvChan -> EventChan -> Awaiting -> IO Awaiting -processRecv cRecv cEvent replies = do - recv <- E.readChan cRecv - case recv of - Just (RReply packetID replyMVar) -> do - let newReplies = M.insert packetID replyMVar replies - processRecv cRecv cEvent newReplies - - Just (RPacket bs) -> do - newReplies <- processPacket cEvent bs replies - processRecv cRecv cEvent newReplies - - _ -> return replies - -processPacket :: EventChan -> BS.ByteString -> Awaiting -> IO Awaiting -processPacket cEvent bs replies = do - -- First, deal with event channel events. - case A.decode bs of - Nothing -> return () - Just event -> writeChan cEvent (Just event) - -- Then, deal with replies. - fromMaybe (return replies) $ do - PacketInfo{..} <- A.decode bs - replyID <- infoPacketID - (ReplyMVar var) <- M.lookup replyID replies - let newReplies = M.delete replyID replies - case infoServerError of - Nothing -> do - reply <- A.decode bs - return $ newReplies <$ putMVar var (Right reply) - Just e -> - return $ newReplies <$ putMVar var (Left (EuphServerError e)) - -cleanupWaiting :: Awaiting -> IO () -cleanupWaiting replies = - forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected) - -cleanupSend :: SendChan -> IO () -cleanupSend cSend = do - sends <- E.emptyChan cSend - forM_ sends $ \case - SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) - _ -> return () - -cleanupRecv :: RecvChan -> IO () -cleanupRecv cRecv = do - recvs <- E.emptyChan cRecv - forM_ recvs $ \case - RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected) - _ -> return () - -recvThread :: SendChan -> RecvChan -> EventChan -> WS.Connection -> IO () -recvThread cSend cRecv cEvent con = do - tFetch <- async $ fetchThread cRecv con - tSend <- async $ evalStateT (sendThread cSend cRecv con) 0 - waitingReplies <- processRecv cRecv cEvent M.empty - E.closeChan cSend - wait tFetch - wait tSend - cleanupWaiting waitingReplies - cleanupSend cSend - cleanupRecv cRecv - writeChan cEvent Nothing - -{- - - Startup - -} - --- TODO: Add proper documentation -data EuphConnection = EuphConnection SendChan EventChan - --- TODO: Add proper documentation -getEvents :: EuphConnection -> IO [EuphEvent] -getEvents (EuphConnection _ cEvent) = do - events <- getChanContents cEvent - return $ catMaybes $ takeWhile isJust events - --- TODO: Add proper documentation -euphClient :: WS.ClientApp EuphConnection -euphClient con = do - sendChan <- E.newOpenChan - recvChan <- E.newOpenChan - eventChan <- newChan - void $ forkIO $ recvThread sendChan recvChan eventChan con - return $ EuphConnection sendChan eventChan