Clean up implementation
This commit is contained in:
parent
1908a050c9
commit
b7892bd139
2 changed files with 67 additions and 62 deletions
|
|
@ -21,7 +21,6 @@ description: Please see the README on GitHub at <https://github.com/Garm
|
||||||
dependencies:
|
dependencies:
|
||||||
- base >= 4.7 && < 5
|
- base >= 4.7 && < 5
|
||||||
- aeson
|
- aeson
|
||||||
- bytestring
|
|
||||||
- containers
|
- containers
|
||||||
- network
|
- network
|
||||||
- stm
|
- stm
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,7 @@ module Haboli.Euphoria.Client
|
||||||
, runClient
|
, runClient
|
||||||
, ConnectionDetails(..)
|
, ConnectionDetails(..)
|
||||||
, defaultDetails
|
, defaultDetails
|
||||||
-- ** Getters
|
, getConnectionDetails
|
||||||
, getHost
|
|
||||||
, getPort
|
|
||||||
, getPath
|
|
||||||
-- ** Event handling
|
-- ** Event handling
|
||||||
, Event(..)
|
, Event(..)
|
||||||
, nextEvent
|
, nextEvent
|
||||||
|
|
@ -48,7 +45,6 @@ import Control.Monad.Trans.Except
|
||||||
import Control.Monad.Trans.Reader
|
import Control.Monad.Trans.Reader
|
||||||
import Data.Aeson
|
import Data.Aeson
|
||||||
import Data.Aeson.Types
|
import Data.Aeson.Types
|
||||||
import qualified Data.ByteString.Lazy as BS
|
|
||||||
import Data.Foldable
|
import Data.Foldable
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
|
|
@ -87,68 +83,86 @@ newtype Client e a = Client (ExceptT (ClientException e)
|
||||||
|
|
||||||
{- The websocket listening thread -}
|
{- The websocket listening thread -}
|
||||||
|
|
||||||
--TODO: This could close the ws connection and stop the client instead
|
-- | Close a 'WS.Connection', catching and ignoring any
|
||||||
-- | An exception handler that ignores messages that could not be decoded
|
-- 'WS.ConnectionException's in the process.
|
||||||
-- properly. It only prints the exceptions via 'putStrLn'.
|
safelyCloseConnection :: WS.Connection -> IO ()
|
||||||
ignoringInvalidMessages :: WS.ConnectionException -> IO ()
|
safelyCloseConnection connection =
|
||||||
ignoringInvalidMessages (WS.ParseException message) = putStrLn $ "ParseException: " ++ message
|
Control.Exception.handle ignoreAllExceptions $
|
||||||
ignoringInvalidMessages (WS.UnicodeException message) = putStrLn $ "UnicodeException: " ++ message
|
WS.sendClose connection $ T.pack "Goodbye :D"
|
||||||
ignoringInvalidMessages e = throwIO e
|
where
|
||||||
|
ignoreAllExceptions :: WS.ConnectionException -> IO ()
|
||||||
|
ignoreAllExceptions _ = pure ()
|
||||||
|
|
||||||
|
-- | An exception handler that closes the 'WS.Connection' when it receives an
|
||||||
|
-- invalidly formatted message from the server.
|
||||||
|
closeConnectionOnInvalidMessage :: WS.Connection -> WS.ConnectionException -> IO ()
|
||||||
|
closeConnectionOnInvalidMessage connection (WS.ParseException _) =
|
||||||
|
safelyCloseConnection connection
|
||||||
|
closeConnectionOnInvalidMessage connection (WS.UnicodeException _) =
|
||||||
|
safelyCloseConnection connection
|
||||||
|
closeConnectionOnInvalidMessage _ e = throwIO e
|
||||||
|
|
||||||
-- | An exception handler that stops the client if any sort of
|
-- | An exception handler that stops the client if any sort of
|
||||||
-- 'WS.ConnectionException' occurs. It does this by setting 'ciStopped' to True
|
-- 'WS.ConnectionException' occurs. It does this by setting 'ciStopped' to True
|
||||||
-- and cancelling all 'AwaitingReply'-s in 'ciAwaiting'.
|
-- and cancelling all 'AwaitingReply'-s in 'ciAwaiting'.
|
||||||
cancellingAllReplies :: ClientInfo e -> WS.ConnectionException -> IO ()
|
cancelAllReplies :: ClientInfo e -> WS.ConnectionException -> IO ()
|
||||||
cancellingAllReplies info _ = atomically $ do
|
cancelAllReplies info _ = atomically $ do
|
||||||
writeTVar (ciStopped info) True
|
writeTVar (ciStopped info) True
|
||||||
-- Cancel all replies
|
-- Cancel all replies
|
||||||
replyMap <- readTVar (ciAwaiting info)
|
replyMap <- readTVar (ciAwaiting info)
|
||||||
for_ replyMap $ \(AwaitingReply v) ->
|
for_ replyMap $ \(AwaitingReply v) ->
|
||||||
putTMVar v $ emptyReply $ Left StoppedException
|
putTMVar v $ emptyReply $ Left StoppedException
|
||||||
|
|
||||||
parseAndSendEvent :: BS.ByteString -> TChan Event -> IO ()
|
parseAndSendEvent :: Value -> TChan Event -> IO ()
|
||||||
parseAndSendEvent msg eventChan =
|
parseAndSendEvent v eventChan =
|
||||||
for_ (decode msg) $ \event ->
|
for_ (fromJSON v) $ \event ->
|
||||||
atomically $ writeTChan eventChan event
|
atomically $ writeTChan eventChan event
|
||||||
|
|
||||||
parseAndSendReply :: BS.ByteString -> TVar (AwaitingReplies e) -> IO ()
|
parseAndSendReply :: Value -> TVar (AwaitingReplies e) -> IO ()
|
||||||
parseAndSendReply msg awaiting = do
|
parseAndSendReply v awaiting = do
|
||||||
let maybePacketId = parseMaybe parsePacketId =<< decode msg
|
-- Since the client is stopped when the websocket thread finishes, and this
|
||||||
|
-- function is called inside the websocket thread, from the point of view of
|
||||||
|
-- this function, the client is never stopped. Because of that, we don't have
|
||||||
|
-- to check 'ciStopped' because we know the client isn't stopped.
|
||||||
|
let maybePacketId = parseMaybe (parseJSON >=> (.: "id")) v
|
||||||
for_ maybePacketId $ \packetId -> atomically $ do
|
for_ maybePacketId $ \packetId -> atomically $ do
|
||||||
awaitingMap <- readTVar awaiting
|
awaitingMap <- readTVar awaiting
|
||||||
for_ (awaitingMap Map.!? packetId) $ \(AwaitingReply replyVar) -> do
|
for_ (awaitingMap Map.!? packetId) $ \(AwaitingReply replyVar) -> do
|
||||||
putTMVar replyVar $ fromMaybe invalidStructureException $ decode msg
|
putTMVar replyVar $ fromMaybe invalidStructureException $ parseMaybe parseJSON v
|
||||||
modifyTVar awaiting $ Map.delete packetId
|
modifyTVar awaiting $ Map.delete packetId
|
||||||
where
|
where
|
||||||
invalidStructureException :: Reply e r
|
invalidStructureException :: Reply e r
|
||||||
invalidStructureException =
|
invalidStructureException =
|
||||||
emptyReply $ Left $ DecodeException "invalid message json structure"
|
emptyReply $ Left $ DecodeException "invalid message json structure"
|
||||||
|
|
||||||
parsePacketId :: Value -> Parser T.Text
|
|
||||||
parsePacketId (Object o) = o .: "id"
|
|
||||||
parsePacketId v = typeMismatch "Object" v
|
|
||||||
|
|
||||||
--TODO: Decode to 'Value' only once. After that, just apply the parsers.
|
|
||||||
runWebsocketThread :: ClientInfo e -> IO ()
|
runWebsocketThread :: ClientInfo e -> IO ()
|
||||||
runWebsocketThread info
|
runWebsocketThread info =
|
||||||
= WS.withPingThread (ciConnection info) pingInterval (pure ())
|
WS.withPingThread connection pingInterval (pure ()) $
|
||||||
$ Control.Exception.handle (cancellingAllReplies info) $ forever
|
-- Stop the client and cancel all replies before this thread finishes
|
||||||
$ Control.Exception.handle ignoringInvalidMessages $ do
|
Control.Exception.handle (cancelAllReplies info) $
|
||||||
msg <- WS.receiveData (ciConnection info)
|
forever $
|
||||||
print msg
|
-- If the client receives an invalidly formatted message, be careful and just
|
||||||
parseAndSendEvent msg (ciEventChan info)
|
-- disconnect because something went really wrong
|
||||||
parseAndSendReply msg (ciAwaiting info)
|
Control.Exception.handle (closeConnectionOnInvalidMessage connection) $ do
|
||||||
|
msg <- WS.receiveData connection
|
||||||
|
case decode msg of
|
||||||
|
-- If the client receives invalid JSON, also disconnect for the same reason
|
||||||
|
-- as above
|
||||||
|
Nothing -> safelyCloseConnection connection
|
||||||
|
Just value -> do
|
||||||
|
parseAndSendEvent value (ciEventChan info)
|
||||||
|
parseAndSendReply value (ciAwaiting info)
|
||||||
where
|
where
|
||||||
|
connection = ciConnection info
|
||||||
pingInterval = cdPingInterval $ ciDetails info
|
pingInterval = cdPingInterval $ ciDetails info
|
||||||
|
|
||||||
{- Running the Client monad -}
|
{- Running the Client monad -}
|
||||||
|
|
||||||
data ConnectionDetails = ConnectionDetails
|
data ConnectionDetails = ConnectionDetails
|
||||||
{ cdHost :: HostName
|
{ cdHost :: HostName
|
||||||
, cdPort :: PortNumber
|
, cdPort :: PortNumber
|
||||||
, cdPath :: String
|
, cdPath :: String
|
||||||
, cdPingInterval :: Int
|
, cdPingInterval :: Int
|
||||||
, cdThrottleDelay :: Float -- in seconds
|
|
||||||
} deriving (Show)
|
} deriving (Show)
|
||||||
|
|
||||||
defaultDetails :: ConnectionDetails
|
defaultDetails :: ConnectionDetails
|
||||||
|
|
@ -157,14 +171,12 @@ defaultDetails = ConnectionDetails
|
||||||
, cdPort = 443
|
, cdPort = 443
|
||||||
, cdPath = "/room/test/ws"
|
, cdPath = "/room/test/ws"
|
||||||
, cdPingInterval = 10
|
, cdPingInterval = 10
|
||||||
, cdThrottleDelay = 1.0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runClient :: ConnectionDetails -> Client e a -> IO (Either (ClientException e) a)
|
runClient :: ConnectionDetails -> Client e a -> IO (Either (ClientException e) a)
|
||||||
runClient details (Client stack)
|
runClient details (Client stack) =
|
||||||
= withSocketsDo
|
withSocketsDo $
|
||||||
$ WSS.runSecureClient (cdHost details) (cdPort details) (cdPath details)
|
WSS.runSecureClient (cdHost details) (cdPort details) (cdPath details) $ \connection -> do
|
||||||
$ \connection -> do
|
|
||||||
awaiting <- newTVarIO Map.empty
|
awaiting <- newTVarIO Map.empty
|
||||||
eventChan <- newTChanIO
|
eventChan <- newTChanIO
|
||||||
packetId <- newTVarIO 0
|
packetId <- newTVarIO 0
|
||||||
|
|
@ -179,32 +191,22 @@ runClient details (Client stack)
|
||||||
}
|
}
|
||||||
-- Start the websocket thread, which will notify this thread when it stops
|
-- Start the websocket thread, which will notify this thread when it stops
|
||||||
wsThreadFinished <- newEmptyMVar
|
wsThreadFinished <- newEmptyMVar
|
||||||
void $ forkFinally (runWebsocketThread info) (\_ -> putMVar wsThreadFinished ())
|
void $
|
||||||
|
forkFinally (runWebsocketThread info) (\_ -> putMVar wsThreadFinished ())
|
||||||
-- Run the actual 'Client' in this thread
|
-- Run the actual 'Client' in this thread
|
||||||
result <- runReaderT (runExceptT stack) info
|
result <- runReaderT (runExceptT stack) info
|
||||||
-- Close the connection if it is not already closed, and wait until the
|
-- Close the connection and wait until the websocket thread stops
|
||||||
-- websocket thread stops
|
safelyCloseConnection connection
|
||||||
Control.Exception.handle ignoreAllExceptions
|
|
||||||
$ WS.sendClose connection $ T.pack "Goodbye :D"
|
|
||||||
takeMVar wsThreadFinished
|
takeMVar wsThreadFinished
|
||||||
pure result
|
pure result
|
||||||
where
|
|
||||||
ignoreAllExceptions :: WS.ConnectionException -> IO ()
|
|
||||||
ignoreAllExceptions _ = pure ()
|
|
||||||
|
|
||||||
{- Getters -}
|
{- Getters -}
|
||||||
|
|
||||||
getClientInfo :: Client e (ClientInfo e)
|
getClientInfo :: Client e (ClientInfo e)
|
||||||
getClientInfo = Client $ lift ask
|
getClientInfo = Client $ lift ask
|
||||||
|
|
||||||
getHost :: Client e HostName
|
getConnectionDetails :: Client e ConnectionDetails
|
||||||
getHost = cdHost . ciDetails <$> getClientInfo
|
getConnectionDetails = ciDetails <$> getClientInfo
|
||||||
|
|
||||||
getPort :: Client e PortNumber
|
|
||||||
getPort = cdPort . ciDetails <$> getClientInfo
|
|
||||||
|
|
||||||
getPath :: Client e String
|
|
||||||
getPath = cdPath . ciDetails <$> getClientInfo
|
|
||||||
|
|
||||||
{- Event handling -}
|
{- Event handling -}
|
||||||
|
|
||||||
|
|
@ -235,6 +237,8 @@ instance FromJSON Event where
|
||||||
, EventSnapshot <$> parseJSON v
|
, EventSnapshot <$> parseJSON v
|
||||||
]
|
]
|
||||||
|
|
||||||
|
--TODO: Check if this would block infinitely if the client is stopped while this
|
||||||
|
-- waits for an event
|
||||||
nextEvent :: Client e Event
|
nextEvent :: Client e Event
|
||||||
nextEvent = do
|
nextEvent = do
|
||||||
info <- getClientInfo
|
info <- getClientInfo
|
||||||
|
|
@ -277,6 +281,8 @@ throw :: e -> Client e a
|
||||||
throw = throwRaw . CustomException
|
throw = throwRaw . CustomException
|
||||||
|
|
||||||
catch :: Client e a -> (ClientException e -> Client e a) -> Client e a
|
catch :: Client e a -> (ClientException e -> Client e a) -> Client e a
|
||||||
|
-- The main reason why the 'ExceptT' is wrapped around the 'ReaderT' in the
|
||||||
|
-- 'Client' monad is that it makes this function easier to implement
|
||||||
catch c f = Client $ catchE (unclient c) (unclient . f)
|
catch c f = Client $ catchE (unclient c) (unclient . f)
|
||||||
where
|
where
|
||||||
unclient (Client m) = m
|
unclient (Client m) = m
|
||||||
|
|
@ -310,7 +316,7 @@ wait (Thread waitVar) = do
|
||||||
-- | A server's reply to a command.
|
-- | A server's reply to a command.
|
||||||
data Reply e r = Reply
|
data Reply e r = Reply
|
||||||
{ replyThrottled :: Maybe T.Text
|
{ replyThrottled :: Maybe T.Text
|
||||||
, replyResult :: Either (ClientException e) r
|
, replyResult :: Either (ClientException e) r
|
||||||
}
|
}
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue