Prepare delaying commands when client is throttled
This commit is contained in:
parent
b56ff35ec1
commit
e578d688ab
1 changed files with 40 additions and 21 deletions
|
|
@ -100,19 +100,34 @@ data ClientException e
|
||||||
| CustomException e
|
| CustomException e
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
instance FromJSON (ClientException e) where
|
-- | A server's reply to a command.
|
||||||
parseJSON (Object o) = ServerException
|
data Reply e r = Reply
|
||||||
<$> o .: "error"
|
{ replyThrottled :: Maybe T.Text
|
||||||
|
, replyResult :: Either (ClientException e) r
|
||||||
|
}
|
||||||
|
deriving (Show)
|
||||||
|
|
||||||
|
instance FromJSON r => FromJSON (Reply e r) where
|
||||||
|
parseJSON v@(Object o) = Reply
|
||||||
|
<$> throttledParser
|
||||||
|
<*> ((Left <$> errorParser) <|> (Right <$> parseJSON v))
|
||||||
|
where
|
||||||
|
throttledParser = do
|
||||||
|
throttled <- o .:? "throttled" .!= False
|
||||||
|
if throttled
|
||||||
|
then Just <$> o .:? "throttled_reason" .!= ""
|
||||||
|
else pure Nothing
|
||||||
|
errorParser = ServerException <$> o .: "error"
|
||||||
parseJSON v = typeMismatch "Object" v
|
parseJSON v = typeMismatch "Object" v
|
||||||
|
|
||||||
-- | This type is used by the websocket thread to send the server's replies to
|
emptyReply :: Either (ClientException e) r -> Reply e r
|
||||||
-- the client. Since exceptions like a 'ServerError' may occur, they are
|
emptyReply = Reply Nothing
|
||||||
-- explicitly included in the type stored in the 'MVar'.
|
|
||||||
--
|
-- | This type represents a @'Reply' e r@ with arbitrary @r@ that has yet to be
|
||||||
-- The fancy types are there so I don't have to explicitly specify the response
|
-- received. The @forall@ allows whoever creates the 'AwaitingReply' to decide
|
||||||
-- in some sum type or similar.
|
-- on the type of @r@.
|
||||||
data AwaitingReply e
|
data AwaitingReply e
|
||||||
= forall r. FromJSON r => AwaitingReply (TMVar (Either (ClientException e) r))
|
= forall r. FromJSON r => AwaitingReply (TMVar (Reply e r))
|
||||||
|
|
||||||
-- | A 'Map.Map' of empty 'TMVar's waiting for their respective reply packet
|
-- | A 'Map.Map' of empty 'TMVar's waiting for their respective reply packet
|
||||||
-- from the server.
|
-- from the server.
|
||||||
|
|
@ -152,7 +167,7 @@ cancellingAllReplies info _ = atomically $ do
|
||||||
-- Cancel all replies
|
-- Cancel all replies
|
||||||
replyMap <- readTVar (ciAwaiting info)
|
replyMap <- readTVar (ciAwaiting info)
|
||||||
for_ replyMap $ \(AwaitingReply v) ->
|
for_ replyMap $ \(AwaitingReply v) ->
|
||||||
putTMVar v (Left StoppedException)
|
putTMVar v $ emptyReply $ Left StoppedException
|
||||||
|
|
||||||
parseAndSendEvent :: BS.ByteString -> TChan Event -> IO ()
|
parseAndSendEvent :: BS.ByteString -> TChan Event -> IO ()
|
||||||
parseAndSendEvent msg eventChan =
|
parseAndSendEvent msg eventChan =
|
||||||
|
|
@ -165,11 +180,13 @@ parseAndSendReply msg awaiting = do
|
||||||
for_ maybePacketId $ \packetId -> atomically $ do
|
for_ maybePacketId $ \packetId -> atomically $ do
|
||||||
awaitingMap <- readTVar awaiting
|
awaitingMap <- readTVar awaiting
|
||||||
for_ (awaitingMap Map.!? packetId) $ \(AwaitingReply replyVar) -> do
|
for_ (awaitingMap Map.!? packetId) $ \(AwaitingReply replyVar) -> do
|
||||||
let maybeExceptionOrReply = (Left <$> decode msg) <|> (Right <$> decode msg)
|
putTMVar replyVar $ fromMaybe invalidStructureException $ decode msg
|
||||||
invalidStructureException = Left $ DecodeException "invalid message json structure"
|
|
||||||
putTMVar replyVar $ fromMaybe invalidStructureException maybeExceptionOrReply
|
|
||||||
modifyTVar awaiting $ Map.delete packetId
|
modifyTVar awaiting $ Map.delete packetId
|
||||||
where
|
where
|
||||||
|
invalidStructureException :: Reply e r
|
||||||
|
invalidStructureException =
|
||||||
|
emptyReply $ Left $ DecodeException "invalid message json structure"
|
||||||
|
|
||||||
parsePacketId :: Value -> Parser T.Text
|
parsePacketId :: Value -> Parser T.Text
|
||||||
parsePacketId (Object o) = o .: "id"
|
parsePacketId (Object o) = o .: "id"
|
||||||
parsePacketId v = typeMismatch "Object" v
|
parsePacketId v = typeMismatch "Object" v
|
||||||
|
|
@ -193,6 +210,7 @@ data ConnectionDetails = ConnectionDetails
|
||||||
, cdPort :: PortNumber
|
, cdPort :: PortNumber
|
||||||
, cdPath :: String
|
, cdPath :: String
|
||||||
, cdPingInterval :: Int
|
, cdPingInterval :: Int
|
||||||
|
, cdThrottleDelay :: Float -- in seconds
|
||||||
} deriving (Show)
|
} deriving (Show)
|
||||||
|
|
||||||
defaultDetails :: ConnectionDetails
|
defaultDetails :: ConnectionDetails
|
||||||
|
|
@ -201,6 +219,7 @@ defaultDetails = ConnectionDetails
|
||||||
, cdPort = 443
|
, cdPort = 443
|
||||||
, cdPath = "/room/test/ws"
|
, cdPath = "/room/test/ws"
|
||||||
, cdPingInterval = 10
|
, cdPingInterval = 10
|
||||||
|
, cdThrottleDelay = 1.0
|
||||||
}
|
}
|
||||||
|
|
||||||
runClient :: ConnectionDetails -> Client e a -> IO (Either (ClientException e) a)
|
runClient :: ConnectionDetails -> Client e a -> IO (Either (ClientException e) a)
|
||||||
|
|
@ -299,7 +318,7 @@ sendPacketWithReply packet = do
|
||||||
Nothing -> throwRaw StoppedException
|
Nothing -> throwRaw StoppedException
|
||||||
Just replyVar -> do
|
Just replyVar -> do
|
||||||
answer <- liftIO $ atomically $ readTMVar replyVar
|
answer <- liftIO $ atomically $ readTMVar replyVar
|
||||||
case answer of
|
case replyResult answer of
|
||||||
Left e -> throwRaw e
|
Left e -> throwRaw e
|
||||||
Right r -> pure r
|
Right r -> pure r
|
||||||
|
|
||||||
|
|
@ -353,7 +372,7 @@ handle = flip Haboli.Euphoria.Client.catch
|
||||||
|
|
||||||
{- Threading -}
|
{- Threading -}
|
||||||
|
|
||||||
data Thread e a = Thread (MVar (Either (ClientException e) a))
|
newtype Thread e a = Thread (MVar (Either (ClientException e) a))
|
||||||
|
|
||||||
fork :: Client e a -> Client e (Thread e a)
|
fork :: Client e a -> Client e (Thread e a)
|
||||||
fork (Client f) = do
|
fork (Client f) = do
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue