Rewrite Connection to not depend on CloseableChan

This commit is contained in:
Joscha 2018-02-05 19:31:00 +00:00
parent 26d08b7312
commit 102010f442
2 changed files with 357 additions and 548 deletions

View file

@ -1,150 +0,0 @@
{-# LANGUAGE RecordWildCards #-}
-- | Chans that can be closed and reopened.
--
-- While a 'CloseableChan' is closed, it can not be written to or read from.
-- Calls to 'writeChan' and 'readChan' are non-blocking while a chan is closed.
--
-- If a thread is attempting to read from a chan using 'readChan' and that chan is closed,
-- the call to 'readChan' resumes and @Nothing@ is returned.
module EuphApi.CloseableChan
( CloseableChan
-- * IO function versions
, newOpenChan
, newClosedChan
, writeChan
, readChan
, closeChan
, openChan
, emptyChan
-- * STM function versions
, newOpenChanSTM
, newClosedChanSTM
, writeChanSTM
, readChanSTM
, closeChanSTM
, openChanSTM
, emptyChanSTM
) where
import Control.Concurrent.STM
-- | A 'Chan' that can be closed and opened again.
--
-- Attempts to write to or read from a 'CloseableChan' while it is closed result
-- in a @Nothing@.
data CloseableChan a = CloseableChan
{ cClosed :: TVar Bool
, cChan :: TChan (Content a)
}
-- TODO: Replace with Maybe?
data Content a = Value a
| End
{-
- Functions as STM actions
-}
-- | See 'newOpenChan'.
newOpenChanSTM :: STM (CloseableChan a)
newOpenChanSTM = do
cClosed <- newTVar False
cChan <- newTChan
return CloseableChan{..}
-- | See 'newClosedChan'.
newClosedChanSTM :: STM (CloseableChan a)
newClosedChanSTM = do
cClosed <- newTVar True
cChan <- newTChan
return CloseableChan{..}
-- | See 'writeChan'.
writeChanSTM :: CloseableChan a -> a -> STM (Maybe ())
writeChanSTM CloseableChan{..} a = do
closed <- readTVar cClosed
if closed
then return Nothing
else Just <$> writeTChan cChan (Value a)
-- | See 'readChan'.
readChanSTM :: CloseableChan a -> STM (Maybe a)
readChanSTM CloseableChan{..} = do
closed <- readTVar cClosed
if closed
then return Nothing
else Just <$> readValue
where
readValue = do
val <- readTChan cChan
case val of
End -> readValue -- ignore End while reading normally
Value v -> return v
-- | See 'closeChan'.
closeChanSTM :: CloseableChan a -> STM ()
closeChanSTM CloseableChan{..} = writeTVar cClosed True
--writeTChan cChan End
-- | See 'openChan'.
openChanSTM :: CloseableChan a -> STM ()
openChanSTM CloseableChan{..} = writeTVar cClosed False
-- | See 'emptyChan'.
emptyChanSTM :: CloseableChan a -> STM [a]
emptyChanSTM CloseableChan{..} = do
writeTChan cChan End
extractValues
where
extractValues = do
val <- readTChan cChan
case val of
End -> return []
Value v -> (v :) <$> extractValues
{-
- Functions as IO actions
-}
-- | Create a new open 'CloseableChan'.
newOpenChan :: IO (CloseableChan a)
newOpenChan = atomically newOpenChanSTM
-- | Create a new closed 'CloseableChan'.
newClosedChan :: IO (CloseableChan a)
newClosedChan = atomically newClosedChanSTM
-- | Attempt to write a value into the 'CloseableChan'.
--
-- If the chan is open, succeeds with a @Just ()@.
-- If the chan is closed, fails with a @Nothing@.
writeChan :: CloseableChan a -> a -> IO (Maybe ())
writeChan chan a = atomically $ writeChanSTM chan a
-- | Attempt to read a value @v@ from the 'CloseableChan'.
--
-- If the chan is open, succeeds with a @Just v@.
-- If the chan is closed, fails with a @Nothing@.
readChan :: CloseableChan a -> IO (Maybe a)
readChan = atomically . readChanSTM
-- | Close a 'CloseableChan'.
-- Does nothing if chan is already closed.
--
-- Performing this action un-blocks all calls to 'readChan'.
closeChan :: CloseableChan a -> IO ()
closeChan = atomically . closeChanSTM
-- | Open a 'CloseableChan'.
-- Does nothing if chan is already open.
openChan :: CloseableChan a -> IO ()
openChan = atomically . openChanSTM
-- | Remove all items currently in the 'CloseableChan' and returns them in a list.
--
-- This function also works while the chan is closed.
-- It is meant as a way to clean up the remaining values in a chan after it was closed.
emptyChan :: CloseableChan a -> IO [a]
emptyChan = atomically . emptyChanSTM

View file

@ -5,82 +5,28 @@
-- | Setup consisting of a few threads to send and receive packets to and from
-- the euphoria api using a websocket connection.
--
-- @
-- m: main thread
-- r: recvThread
-- f: fetchThread
-- s: sendThread
--
-- On creation:
-- m: Create WS connection (or do this in r?)
-- m: Create channels
-- m: Start recvThread with all necessary info
-- r: Start fetchThread and sendThread using async
-- m: Return SendChan and EventChan
--
-- On disconnect:
-- s: close connection (optional)
-- f: detect exception
-- f: RDisconnected -> RecvChan
-- f: *stops*
-- r: RecvChan -> RDisconnected
-- r: EDisconnected -> EventChan
-- r: close SendChan
-- s: *stops*
-- r: wait for f and s to stop
-- r: clean up SendChan
-- r: clean up RecvChan
-- r: clean up response list
-- r: EStopped -> EventChan
-- r: *stops*
-- -> All MVars are dealt with
--
-- ↓
-- │
-- (SendChan)
-- │
-- ┌─────────────────────╴│╶──────┐
-- │ │ │
-- │ (WS.Connection) │ │
-- │ │ │ │
-- │ [fetchThread] [sendThread] │
-- │ │ │ │
-- │ └──────┬──────┘ │
-- │ │ │
-- │ (RecvChan) │
-- │ │ │
-- │ [recvThread] │
-- │ │ │
-- └──────────────╴│╶─────────────┘
-- │
-- (EventChan)
-- │
-- ↓
-- @
module EuphApi.Threads (
-- * Connecting to euphoria
EuphConnection
, euphClient
, getEvents
Connection
, euphApp
, getEvent
-- * API functions
, pingReply
, nick
, send
-- * Events and replies
-- * Exception
, EuphException(..)
, EuphEvent(..)
) where
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Trans.State
import Data.Aeson as A
import qualified Data.ByteString.Lazy as BS
@ -91,14 +37,251 @@ import Data.Time
import Data.Time.Clock.POSIX
import qualified Network.WebSockets as WS
import qualified EuphApi.CloseableChan as E
import qualified EuphApi.Types as E
-- Some useful type aliases
type PacketID = T.Text
type Reply = Either EuphException
data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r))
type SendQueue = TBQueue Send
type RecvQueue = TBQueue Recv
type EventQueue = TBQueue (Maybe Event) -- 'Nothing' ends the event stream
type LockedFlag = TVar Bool
data Connection = Connection LockedFlag SendQueue EventQueue
-- | Read one 'Event' from the 'Connection'.
--
-- Returns 'Nothing' once when the 'Connection' stops.
-- After that, any further calls of getEvent on the same connection
-- will block indefinitely.
getEvent :: Connection -> IO (Maybe Event)
getEvent (Connection locked _ qEvent) = undefined locked qEvent
-- | A @Network.Websockets.'WS.ClientApp'@ creating a 'Connection'.
euphApp :: WS.ClientApp Connection
euphApp con = do
sendQueue <- atomically $ newTBQueue 10
recvQueue <- atomically $ newTBQueue 10
eventQueue <- atomically $ newTBQueue 10
locked <- atomically $ newTVar False
let euphCon = Connection locked sendQueue eventQueue
void $ forkIO $ recvThread euphCon recvQueue con
return euphCon
{-
- Fetch thread
-}
fetchThread :: RecvQueue -> WS.Connection -> IO ()
fetchThread qRecv con = void $ handle handleException $ forever $ do
message <- WS.receiveData con
void $ atomically $ writeTBQueue qRecv (RPacket message)
where
handleException (WS.CloseRequest _ _) = atomically $ writeTBQueue qRecv RDisconnected
handleException WS.ConnectionClosed = atomically $ writeTBQueue qRecv RDisconnected
handleException _ = fetchThread qRecv con
{-
- Send thread
-}
-- Prepare a single packet for sending.
-- Doesn't actually do any IO. The IO monad part is left in for ease of use.
preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID)
preparePacket packetType packetData = do
packetNr <- get
put $ packetNr + 1
let packetID = T.pack $ show packetNr
bytestr = encode . Object . HM.fromList $
[ ("id", A.String packetID)
, ("type", A.String packetType)
, ("data", toJSON packetData)
]
return (bytestr, packetID)
readWhileOpen :: Connection -> STM (Maybe Send)
readWhileOpen (Connection locked qSend _) = do
isLocked <- readTVar locked
if isLocked
then return Nothing
else Just <$> readTBQueue qSend
sendThread :: Connection -> RecvQueue -> WS.Connection -> StateT Integer IO ()
sendThread euphCon qRecv con = do
item <- liftIO $ atomically $ readWhileOpen euphCon
case item of
Nothing ->
return ()
Just SDisconnect ->
liftIO $ WS.sendClose con ("Bye." :: T.Text)
Just (SNoReply packetType packetData) -> do
(packet, _) <- preparePacket packetType packetData
liftIO $ WS.sendTextData con packet
continue <- liftIO $ sendSafely packet
when continue $
sendThread euphCon qRecv con
Just (SReply packetType packetData reply) -> do
(packet, packetID) <- preparePacket packetType packetData
void $ liftIO $ atomically $ writeTBQueue qRecv (RReply packetID reply)
continue <- liftIO $ sendSafely packet
when continue $
sendThread euphCon qRecv con
where
sendSafely packet = (WS.sendTextData con packet >> return True) `catch` handleException
handleException (WS.CloseRequest _ _) = return False
handleException WS.ConnectionClosed = return False
handleException _ = return True
{-
- Receive thread
-}
data PacketInfo = PacketInfo
{ infoPacketID :: Maybe PacketID
, infoServerError :: Maybe T.Text
} deriving (Show)
instance FromJSON PacketInfo where
parseJSON = withObject "packet" $ \o -> do
infoPacketID <- o .:? "id"
infoServerError <- o .:? "error"
return PacketInfo{..}
-- TODO: Swap for HashMap?
type Awaiting = M.Map T.Text ReplyMVar
whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m ()
whenJust m f = maybe (return ()) f m
processPacket :: EventQueue -> BS.ByteString -> StateT Awaiting IO ()
processPacket qEvent bs = do
-- First, deal with event channel events.
case A.decode bs of
Nothing -> return ()
Just event -> liftIO $ atomically $ writeTBQueue qEvent (Just event)
-- Then, deal with replies.
-- Find out whether there is actually any dealing with replies to do...
replies <- get
let result = do -- Maybe monad
PacketInfo{..} <- A.decode bs
replyID <- infoPacketID
replyMVar <- M.lookup replyID replies
return (replyID, replyMVar, infoServerError)
-- ... and then write the appropriate result into the MVar.
whenJust result $ \(replyID, ReplyMVar var, serverError) -> do
modify (M.delete replyID)
case serverError of
Just e -> liftIO $ putMVar var (Left (EuphServerError e))
Nothing ->
case A.decode bs of
Nothing -> liftIO $ putMVar var (Left EuphParse)
Just r -> liftIO $ putMVar var (Right r)
processRecv :: RecvQueue -> EventQueue -> StateT Awaiting IO ()
processRecv qRecv qEvent = do
recv <- liftIO $ atomically $ readTBQueue qRecv
case recv of
RReply packetID replyMVar -> do
modify (M.insert packetID replyMVar)
processRecv qRecv qEvent
RPacket bs -> do
processPacket qEvent bs
processRecv qRecv qEvent
RDisconnected -> return ()
cleanupWaiting :: Awaiting -> IO ()
cleanupWaiting replies =
forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected)
emptyTBQueue :: TBQueue a -> STM [a]
emptyTBQueue q = do
isEmpty <- isEmptyTBQueue q
if isEmpty
then return []
else do
item <- readTBQueue q
rest <- emptyTBQueue q
return $ item : rest
cleanupSend :: SendQueue -> IO ()
cleanupSend qSend = do
sends <- atomically $ emptyTBQueue qSend
forM_ sends $ \case
SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected)
_ -> return ()
cleanupRecv :: RecvQueue -> IO ()
cleanupRecv qRecv = do
recvs <- atomically $ emptyTBQueue qRecv
forM_ recvs $ \case
RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected)
_ -> return ()
recvThread :: Connection -> RecvQueue -> WS.Connection -> IO ()
recvThread euphCon@(Connection locked qSend qEvent) qRecv con = do
tFetch <- async $ fetchThread qRecv con
tSend <- async $ evalStateT (sendThread euphCon qRecv con) 0
waitingReplies <- execStateT (processRecv qRecv qEvent) M.empty
atomically $ writeTVar locked True
wait tFetch
wait tSend
cleanupWaiting waitingReplies
cleanupSend qSend
cleanupRecv qRecv
atomically $ writeTBQueue qEvent Nothing
{-
- API functions
-}
sendPacket :: (ToJSON p, FromJSON r) => Connection -> T.Text -> p -> IO r
sendPacket (Connection locked qSend _) packetType packetData = do
var <- newEmptyMVar
let packet = SReply packetType packetData (ReplyMVar var)
atomically $ do
isLocked <- readTVar locked
if isLocked
then throwSTM EuphClosed
else writeTBQueue qSend packet
result <- readMVar var
case result of
Left f -> throw f
Right r -> return r
sendPacketNoReply :: (ToJSON p) => Connection -> T.Text -> p -> IO ()
sendPacketNoReply (Connection locked qSend _) packetType packetData = atomically $ do
let packet = SNoReply packetType packetData
isLocked <- readTVar locked
if isLocked
then throwSTM EuphClosed
else writeTBQueue qSend packet
pingReply :: Connection -> UTCTime -> IO ()
pingReply euphCon pingReplyCommandTime =
sendPacketNoReply euphCon "ping-reply" PingReplyCommand{..}
nick :: Connection -> T.Text -> IO (E.Nick, E.Nick)
nick euphCon nickCommandName = do
NickReply{..} <- sendPacket euphCon "nick" NickCommand{..}
return (nickReplyFrom, nickReplyTo)
send :: Connection -> Maybe E.Snowflake -> T.Text -> IO E.Message
send euphCon sendCommandParent sendCommandContent = do
SendReply{..} <- sendPacket euphCon "send" SendCommand{..}
return sendReplyMessage
{-
- Some types
-}
-- | The ways in which getting a reply from the server can fail.
--
-- An EuphException may be raised by any function in the API functions section.
@ -121,51 +304,122 @@ instance Show EuphException where
instance Exception EuphException
sendPacket :: (ToJSON p, FromJSON r) => EuphConnection -> T.Text -> p -> IO r
sendPacket (EuphConnection chan _) packetType packetData = do
var <- newEmptyMVar
let packet = SReply packetType packetData (ReplyMVar var)
done <- E.writeChan chan packet
case done of
Nothing -> throw EuphClosed
Just () -> do
result <- readMVar var
case result of
Left f -> throw f
Right r -> return r
data Recv = RDisconnected
| RPacket BS.ByteString
| RReply PacketID ReplyMVar
sendPacketNoReply :: (ToJSON p) => EuphConnection -> T.Text -> p -> IO ()
sendPacketNoReply (EuphConnection chan _) packetType packetData = do
let packet = SNoReply packetType packetData
done <- E.writeChan chan packet
case done of
Nothing -> throw EuphClosed
Just () -> return ()
data Send = SDisconnect
| forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents
| forall p . (ToJSON p) => SReply T.Text p ReplyMVar
{-
- API functions
-}
data Event
= BounceEvent (Maybe T.Text) (Maybe [T.Text])
-- ^ A 'BounceEvent' indicates that access to a room is denied.
--
-- @'BounceEvent' (Maybe reason) (Maybe [authOption])@
| DisconnectEvent T.Text
-- ^ A 'DisconnectEvent' indicates that the session is being closed.
-- The client will subsequently be disconnected.
--
-- If the disconnect reason is "authentication changed",
-- the client should immediately reconnect.
--
-- @'DisconnectEvent' reason@
| HelloEvent E.SessionView Bool T.Text
-- ^ A 'HelloEvent' is sent by the server to the client
-- when a session is started.
-- It includes information about the client's authentication
-- and associated identity.
--
-- @'HelloEvent' session roomIsPrivate version@
| JoinEvent E.SessionView
-- ^ A 'JoinEvent' indicates a session just joined the room.
--
-- @'JoinEvent' session@
| NetworkEvent T.Text T.Text
-- ^ A 'NetworkEvent' indicates some server-side event
-- that impacts the presence of sessions in a room.
--
-- If the network event type is "partition",
-- then this should be treated as a 'PartEvent' for all sessions
-- connected to the same server id/era combo.
--
-- @'NetworkEvent' server_id server_era@
| NickEvent E.Nick E.Nick
-- ^ A 'NickEvent' announces a nick change by another session in the room.
--
-- @'NickEvent' from to@
| EditMessageEvent E.Message
-- ^ An 'EditMessageEvent' indicates that a message in the room
-- has been modified or deleted.
-- If the client offers a user interface and the indicated message
-- is currently displayed, it should update its display accordingly.
--
-- The event packet includes a snapshot of the message post-edit.
--
-- @'EditMessageEvent' editedMessage@
| PartEvent E.SessionView
-- ^ A 'PartEvent' indicates a session just disconnected from the room.
--
-- @'PartEvent' session@
| PingEvent UTCTime UTCTime
-- ^ A 'PingEvent' represents a server-to-client ping.
-- The client should send back a 'pingReply' with the same value
-- for the time field as soon as possible (or risk disconnection).
--
-- @'PingEvent' time next@
| SendEvent E.Message
-- ^ A 'SendEvent' indicates a message received by the room
-- from another session.
--
-- @'SendEvent' message@
| SnapshotEvent T.Text [E.SessionView] [E.Message] (Maybe E.Nick)
-- ^ A 'SnapshotEvent' indicates that a session has
-- successfully joined a room.
-- It also offers a snapshot of the rooms state and recent history.
--
-- @'SnapshotEvent' version listing log (Maybe nick)@
-- TODO: Add proper documentation
pingReply :: EuphConnection -> UTCTime -> IO ()
pingReply econ pingReplyCommandTime =
sendPacketNoReply econ "ping-reply" PingReplyCommand{..}
-- LoginEvent -- not implemented
-- LogoutEvent -- not implemented
-- PMInitiateEvent -- not implemented
-- TODO: Add proper documentation
nick :: EuphConnection -> T.Text -> IO (E.Nick, E.Nick)
nick econ nickCommandName = do
NickReply{..} <- sendPacket econ "nick" NickCommand{..}
return (nickReplyFrom, nickReplyTo)
-- TODO: Add proper documentation
send :: EuphConnection -> Maybe E.Snowflake -> T.Text -> IO E.Message
send econ sendCommandParent sendCommandContent = do
SendReply{..} <- sendPacket econ "send" SendCommand{..}
return sendReplyMessage
{-
- Commands and replies
-}
instance FromJSON Event where
parseJSON = withObject "Event" $ \o -> do
tp <- o .: "type"
dt <- o .: "data"
empty
<|> (tp `is` "bounce-event" >> pBounceEvent dt)
<|> (tp `is` "disconnect-event" >> pDisconnectEvent dt)
<|> (tp `is` "hello-event" >> pHelloEvent dt)
<|> (tp `is` "join-event" >> pJoinEvent dt)
<|> (tp `is` "network-event" >> pNetworkEvent dt)
<|> (tp `is` "nick-event" >> pNickEvent dt)
<|> (tp `is` "edit-message-event" >> pEditMessageEvent dt)
<|> (tp `is` "part-event" >> pPartEvent dt)
<|> (tp `is` "ping-event" >> pPingEvent dt)
<|> (tp `is` "send-event" >> pSendEvent dt)
<|> (tp `is` "snapshot-event" >> pSnapshotEvent dt)
where
a `is` b = guard ((a :: T.Text) == b)
pBounceEvent = withObject "BounceEvent" $ \o ->
BounceEvent <$> o .:? "reason" <*> o .:? "auth_options"
pDisconnectEvent = withObject "DisconnectEvent" $ \o ->
DisconnectEvent <$> o .: "reason"
pHelloEvent = withObject "HelloEvent" $ \o ->
HelloEvent <$> o .: "session" <*> o .: "room_is_private" <*> o .: "version"
pJoinEvent v = JoinEvent <$> parseJSON v
pNetworkEvent = withObject "NetworkEvent" $ \o ->
NetworkEvent <$> o .: "server_id" <*> o .: "server_era"
pNickEvent = withObject "NickEvent" $ \o ->
NickEvent <$> o .: "from" <*> o .: "to"
pEditMessageEvent v = EditMessageEvent <$> parseJSON v
pPartEvent v = PartEvent <$> parseJSON v
pPingEvent = withObject "PingEvent" $ \o ->
PingEvent <$> o .: "time" <*> o .: "next"
pSendEvent v = SendEvent <$> parseJSON v
pSnapshotEvent = withObject "SnapshotEvent" $ \o ->
SnapshotEvent <$> o .: "version" <*> o .: "listing" <*> o .: "log" <*> o .:? "nick"
(.?=) :: (ToJSON v, KeyValue kv) => T.Text -> Maybe v -> [kv]
k .?= (Just v) = [k .= v]
@ -223,298 +477,3 @@ newtype SendReply = SendReply
instance FromJSON SendReply where
parseJSON v = SendReply <$> parseJSON v
{-
- Events
-}
-- | Represents <http://api.euphoria.io/#asynchronous-events>.
--
-- These events may be sent from the server to the client at any time.
data EuphEvent = BounceEvent (Maybe T.Text) (Maybe [T.Text])
-- ^ A 'BounceEvent' indicates that access to a room is denied.
--
-- @'BounceEvent' (Maybe reason) (Maybe [authOption])@
| DisconnectEvent T.Text
-- ^ A 'DisconnectEvent' indicates that the session is being closed.
-- The client will subsequently be disconnected.
--
-- If the disconnect reason is "authentication changed",
-- the client should immediately reconnect.
--
-- @'DisconnectEvent' reason@
| HelloEvent E.SessionView Bool T.Text
-- ^ A 'HelloEvent' is sent by the server to the client
-- when a session is started.
-- It includes information about the client's authentication
-- and associated identity.
--
-- @'HelloEvent' session roomIsPrivate version@
| JoinEvent E.SessionView
-- ^ A 'JoinEvent' indicates a session just joined the room.
--
-- @'JoinEvent' session@
| NetworkEvent T.Text T.Text
-- ^ A 'NetworkEvent' indicates some server-side event
-- that impacts the presence of sessions in a room.
--
-- If the network event type is "partition",
-- then this should be treated as a 'PartEvent' for all sessions
-- connected to the same server id/era combo.
--
-- @'NetworkEvent' server_id server_era@
| NickEvent E.Nick E.Nick
-- ^ A 'NickEvent' announces a nick change by another session in the room.
--
-- @'NickEvent' from to@
| EditMessageEvent E.Message
-- ^ An 'EditMessageEvent' indicates that a message in the room
-- has been modified or deleted.
-- If the client offers a user interface and the indicated message
-- is currently displayed, it should update its display accordingly.
--
-- The event packet includes a snapshot of the message post-edit.
--
-- @'EditMessageEvent' editedMessage@
| PartEvent E.SessionView
-- ^ A 'PartEvent' indicates a session just disconnected from the room.
--
-- @'PartEvent' session@
| PingEvent UTCTime UTCTime
-- ^ A 'PingEvent' represents a server-to-client ping.
-- The client should send back a 'pingReply' with the same value
-- for the time field as soon as possible (or risk disconnection).
--
-- @'PingEvent' time next@
| SendEvent E.Message
-- ^ A 'SendEvent' indicates a message received by the room
-- from another session.
--
-- @'SendEvent' message@
| SnapshotEvent T.Text [E.SessionView] [E.Message] (Maybe E.Nick)
-- ^ A 'SnapshotEvent' indicates that a session has
-- successfully joined a room.
-- It also offers a snapshot of the rooms state and recent history.
--
-- @'SnapshotEvent' version listing log (Maybe nick)@
-- LoginEvent -- not implemented
-- LogoutEvent -- not implemented
-- PMInitiateEvent -- not implemented
instance FromJSON EuphEvent where
parseJSON = withObject "Event" $ \o -> do
tp <- o .: "type"
dt <- o .: "data"
empty
<|> (tp `is` "bounce-event" >> pBounceEvent dt)
<|> (tp `is` "disconnect-event" >> pDisconnectEvent dt)
<|> (tp `is` "hello-event" >> pHelloEvent dt)
<|> (tp `is` "join-event" >> pJoinEvent dt)
<|> (tp `is` "network-event" >> pNetworkEvent dt)
<|> (tp `is` "nick-event" >> pNickEvent dt)
<|> (tp `is` "edit-message-event" >> pEditMessageEvent dt)
<|> (tp `is` "part-event" >> pPartEvent dt)
<|> (tp `is` "ping-event" >> pPingEvent dt)
<|> (tp `is` "send-event" >> pSendEvent dt)
<|> (tp `is` "snapshot-event" >> pSnapshotEvent dt)
where
a `is` b = guard ((a :: T.Text) == b)
pBounceEvent = withObject "BounceEvent" $ \o ->
BounceEvent <$> o .:? "reason" <*> o .:? "auth_options"
pDisconnectEvent = withObject "DisconnectEvent" $ \o ->
DisconnectEvent <$> o .: "reason"
pHelloEvent = withObject "HelloEvent" $ \o ->
HelloEvent <$> o .: "session" <*> o .: "room_is_private" <*> o .: "version"
pJoinEvent v = JoinEvent <$> parseJSON v
pNetworkEvent = withObject "NetworkEvent" $ \o ->
NetworkEvent <$> o .: "server_id" <*> o .: "server_era"
pNickEvent = withObject "NickEvent" $ \o ->
NickEvent <$> o .: "from" <*> o .: "to"
pEditMessageEvent v = EditMessageEvent <$> parseJSON v
pPartEvent v = PartEvent <$> parseJSON v
pPingEvent = withObject "PingEvent" $ \o ->
PingEvent <$> o .: "time" <*> o .: "next"
pSendEvent v = SendEvent <$> parseJSON v
pSnapshotEvent = withObject "SnapshotEvent" $ \o ->
SnapshotEvent <$> o .: "version" <*> o .: "listing" <*> o .: "log" <*> o .:? "nick"
{-
- Channels
-}
type RecvChan = E.CloseableChan Recv
data Recv = RDisconnected
| RPacket BS.ByteString
| RReply PacketID ReplyMVar
type SendChan = E.CloseableChan Send
data Send = SDisconnect
| forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents
| forall p . (ToJSON p) => SReply T.Text p ReplyMVar
type EventChan = Chan Event
type Event = Maybe EuphEvent
{-
- Fetch thread
-}
fetchThread :: RecvChan -> WS.Connection -> IO ()
fetchThread cRecv con = handle handleException $ forever $ do
message <- WS.receiveData con
void $ E.writeChan cRecv (RPacket message) -- will never be closed while thread running
where
handleException (WS.CloseRequest _ _) = void $ E.writeChan cRecv RDisconnected
handleException WS.ConnectionClosed = void $ E.writeChan cRecv RDisconnected
handleException _ = fetchThread cRecv con
{-
- Send thread
-}
-- Prepare a single packet for sending
preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID)
preparePacket packetType packetData = do
packetNr <- get
put $ packetNr + 1
let packetID = T.pack $ show packetNr
bytestr = encode . Object . HM.fromList $
[ ("id", A.String packetID)
, ("type", A.String packetType)
, ("data", toJSON packetData)
]
return (bytestr, packetID)
sendThread :: SendChan -> RecvChan -> WS.Connection -> StateT Integer IO ()
sendThread cSend cRecv con = do
item <- liftIO $ E.readChan cSend
case item of
Nothing ->
return ()
Just SDisconnect ->
liftIO $ WS.sendClose con ("Bye." :: T.Text)
Just (SNoReply packetType packetData) -> do
(packet, _) <- preparePacket packetType packetData
liftIO $ WS.sendTextData con packet
continue <- liftIO $ sendSafely packet
when continue $
sendThread cSend cRecv con
Just (SReply packetType packetData reply) -> do
(packet, packetID) <- preparePacket packetType packetData
void $ liftIO $ E.writeChan cRecv (RReply packetID reply)
continue <- liftIO $ sendSafely packet
when continue $
sendThread cSend cRecv con
where
sendSafely packet = (WS.sendTextData con packet >> return True) `catch` handleException
handleException (WS.CloseRequest _ _) = return False
handleException WS.ConnectionClosed = return False
handleException _ = return True
{-
- RecvThread
-}
data PacketInfo = PacketInfo
{ infoPacketID :: Maybe PacketID
, infoServerError :: Maybe T.Text
} deriving (Show)
instance FromJSON PacketInfo where
parseJSON = withObject "packet" $ \o -> do
infoPacketID <- o .:? "id"
infoServerError <- o .:? "error"
return PacketInfo{..}
-- TODO: Swap for HashMap?
type Awaiting = M.Map T.Text ReplyMVar
processRecv :: RecvChan -> EventChan -> Awaiting -> IO Awaiting
processRecv cRecv cEvent replies = do
recv <- E.readChan cRecv
case recv of
Just (RReply packetID replyMVar) -> do
let newReplies = M.insert packetID replyMVar replies
processRecv cRecv cEvent newReplies
Just (RPacket bs) -> do
newReplies <- processPacket cEvent bs replies
processRecv cRecv cEvent newReplies
_ -> return replies
processPacket :: EventChan -> BS.ByteString -> Awaiting -> IO Awaiting
processPacket cEvent bs replies = do
-- First, deal with event channel events.
case A.decode bs of
Nothing -> return ()
Just event -> writeChan cEvent (Just event)
-- Then, deal with replies.
fromMaybe (return replies) $ do
PacketInfo{..} <- A.decode bs
replyID <- infoPacketID
(ReplyMVar var) <- M.lookup replyID replies
let newReplies = M.delete replyID replies
case infoServerError of
Nothing -> do
reply <- A.decode bs
return $ newReplies <$ putMVar var (Right reply)
Just e ->
return $ newReplies <$ putMVar var (Left (EuphServerError e))
cleanupWaiting :: Awaiting -> IO ()
cleanupWaiting replies =
forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected)
cleanupSend :: SendChan -> IO ()
cleanupSend cSend = do
sends <- E.emptyChan cSend
forM_ sends $ \case
SReply _ _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected)
_ -> return ()
cleanupRecv :: RecvChan -> IO ()
cleanupRecv cRecv = do
recvs <- E.emptyChan cRecv
forM_ recvs $ \case
RReply _ (ReplyMVar var) -> putMVar var (Left EuphDisconnected)
_ -> return ()
recvThread :: SendChan -> RecvChan -> EventChan -> WS.Connection -> IO ()
recvThread cSend cRecv cEvent con = do
tFetch <- async $ fetchThread cRecv con
tSend <- async $ evalStateT (sendThread cSend cRecv con) 0
waitingReplies <- processRecv cRecv cEvent M.empty
E.closeChan cSend
wait tFetch
wait tSend
cleanupWaiting waitingReplies
cleanupSend cSend
cleanupRecv cRecv
writeChan cEvent Nothing
{-
- Startup
-}
-- TODO: Add proper documentation
data EuphConnection = EuphConnection SendChan EventChan
-- TODO: Add proper documentation
getEvents :: EuphConnection -> IO [EuphEvent]
getEvents (EuphConnection _ cEvent) = do
events <- getChanContents cEvent
return $ catMaybes $ takeWhile isJust events
-- TODO: Add proper documentation
euphClient :: WS.ClientApp EuphConnection
euphClient con = do
sendChan <- E.newOpenChan
recvChan <- E.newOpenChan
eventChan <- newChan
void $ forkIO $ recvThread sendChan recvChan eventChan con
return $ EuphConnection sendChan eventChan