diff --git a/package.yaml b/package.yaml index d4a77db..252f82d 100644 --- a/package.yaml +++ b/package.yaml @@ -22,10 +22,12 @@ description: Please see the README on Github at = 4.7 && < 5 # basic stuff -- time +- containers - text +- time - transformers # websocket connection +- async - websockets - wuss # parsing json diff --git a/src/EuphApi/Threads.hs b/src/EuphApi/Threads.hs index 8ea1b5b..5fadd2d 100644 --- a/src/EuphApi/Threads.hs +++ b/src/EuphApi/Threads.hs @@ -12,7 +12,7 @@ -- s: sendThread -- -- On creation: --- m: Create WS connection +-- m: Create WS connection (or do this in r?) -- m: Create channels -- m: Start recvThread with all necessary info -- r: Start fetchThread and sendThread using async @@ -59,12 +59,14 @@ module EuphApi.Threads ( -- * Events and replies - Failure(..) + EuphException(..) , Event(..) , EuphEvent(..) -- * API functions , pingReply , nick + -- * Connection to euphoria + , euphClient ) where import Control.Applicative @@ -72,43 +74,51 @@ import Control.Concurrent import Control.Exception import Control.Monad import Control.Monad.IO.Class -import Control.Monad.Trans.Except + +import Control.Concurrent.Async import Control.Monad.Trans.State -import Data.Aeson as A -import qualified Data.ByteString.Lazy as BS -import qualified Data.HashMap.Strict as HM -import qualified Data.Text as T +import Data.Aeson as A +import qualified Data.ByteString.Lazy as BS +import qualified Data.HashMap.Strict as HM +import qualified Data.Map as M +import qualified Data.Text as T import Data.Time import Data.Time.Clock.POSIX -import qualified EuphApi.CloseableChan as E -import qualified EuphApi.Types as E -import qualified Network.WebSockets as WS +import qualified Network.WebSockets as WS + +import qualified EuphApi.CloseableChan as E +import qualified EuphApi.Types as E -- Some useful type aliases type PacketID = T.Text -type Reply = Either Failure +type Reply = Either EuphException +data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r)) -- | The ways in which getting a reply from the server can fail. -data Failure = FailClosed -- ^ Could not send message because connection was closed. - | FailDisconnect -- ^ Disconnected from server while waiting for the reply. - | FailError T.Text -- ^ The server replied with an error. - | FailParse -- ^ Could not parse the server's reply correctly. +data EuphException = EuphClosed + -- ^ Could not send message because connection was closed. + | EuphDisconnected + -- ^ Disconnected from server while waiting for the reply. + | EuphServerError T.Text + -- ^ The server replied with an error. + | EuphParse + -- ^ Could not parse the server's reply correctly. -instance Show Failure where - show FailClosed = "Connection already closed" - show FailDisconnect = "Disconnected from server" - show (FailError t) = "Server error: " ++ T.unpack t - show FailParse = "Parsing failed" +instance Show EuphException where + show EuphClosed = "Connection already closed" + show EuphDisconnected = "Disconnected from server" + show (EuphServerError t) = "Server error: " ++ T.unpack t + show EuphParse = "Parsing failed" -instance Exception Failure +instance Exception EuphException sendPacket :: (ToJSON p, FromJSON r) => SendChan -> T.Text -> p -> IO r sendPacket chan packetType packetData = do var <- newEmptyMVar - let packet = SReply packetType packetData var + let packet = SReply packetType packetData (ReplyMVar var) done <- E.writeChan chan packet case done of - Nothing -> throw FailClosed + Nothing -> throw EuphClosed Just () -> do result <- readMVar var case result of @@ -120,7 +130,7 @@ sendPacketNoReply chan packetType packetData = do let packet = SNoReply packetType packetData done <- E.writeChan chan packet case done of - Nothing -> throw FailClosed + Nothing -> throw EuphClosed Just () -> return () {- @@ -144,7 +154,7 @@ nick chan name = do (.?=) :: (ToJSON v, KeyValue kv) => T.Text -> Maybe v -> [kv] k .?= (Just v) = [k .= v] -k .?= Nothing = [] +_ .?= Nothing = [] -- ping reply/command/whatever @@ -314,69 +324,21 @@ instance FromJSON EuphEvent where pSnapshotEvent = withObject "SnapshotEvent" $ \o -> SnapshotEvent <$> o .: "version" <*> o .: "listing" <*> o .: "log" <*> o .:? "nick" -{- -pingReply chan time = do - let obj = object $ ["time" .= utcTimeToPOSIXSeconds time] - packet = packetOfType "ping-reply" obj - sent <- liftIO $ E.writeChan chan $ SNoReply packet - case sent of - Nothing -> return $ Left FailClosed - Just _ -> return $ Right () - -nick :: SendChan -> T.Text -> IO (Reply (T.Text, T.Text)) -nick chan newNick = do - let obj = object $ ["name" .= newNick] - packet = packetOfType "nick" obj - var <- liftIO newEmptyMVar - sent <- liftIO $ E.writeChan chan $ SReply packet var - case sent of - Nothing -> return $ Left FailClosed - Just _ -> do - reply <- readMVar var - case reply of - Left f -> return $ Left f - Right NickReply{..} -> return $ Right (nickReplyFrom, nickReplyTo) - -send :: SendChan -> T.Text -> IO (Reply E.Message) -send chan content = do - let obj = object $ ["content" .= content] - packet = packetOfType "send" obj - var <- liftIO newEmptyMVar - sent <- liftIO $ E.writeChan chan $ SReply packet var - case sent of - Nothing -> return $ Left FailClosed - Just _ -> do - reply <- readMVar var - return $ sendReplyMessage <$> reply - -reply :: SendChan -> PacketID -> T.Text -> IO (Reply E.Message) -reply chan parent content = do - let obj = object $ ["content" .= content, "parent" .= parent] - packet = packetOfType "send" obj - var <- liftIO newEmptyMVar - sent <- liftIO $ E.writeChan chan $ SReply packet var - case sent of - Nothing -> return $ Left FailClosed - Just _ -> do - reply <- readMVar var - return $ sendReplyMessage <$> reply --} - {- - Channels -} -type RecvChan = E.CloseableChan Recv +type RecvChan = Chan Recv data Recv = RDisconnected | RPacket BS.ByteString - | forall a . (FromJSON a) => RReply PacketID (MVar (Reply a)) + | RReply PacketID ReplyMVar type SendChan = E.CloseableChan Send data Send = SDisconnect | forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents - | forall p r . (ToJSON p, FromJSON r) => SReply T.Text p (MVar (Reply r)) + | forall p . (ToJSON p) => SReply T.Text p ReplyMVar -type EventChan e = E.CloseableChan (Event e) +type EventChan e = Chan (Event e) data Event e = EDisconnected | EStopped | EEuphEvent EuphEvent @@ -389,10 +351,10 @@ data Event e = EDisconnected fetchThread :: RecvChan -> WS.Connection -> IO () fetchThread cRecv con = handle handleException $ forever $ do message <- WS.receiveData con - void $ E.writeChan cRecv (RPacket message) -- will never be closed while thread running + void $ writeChan cRecv (RPacket message) -- will never be closed while thread running where - handleException (WS.CloseRequest _ _) = void $ E.writeChan cRecv RDisconnected - handleException WS.ConnectionClosed = void $ E.writeChan cRecv RDisconnected + handleException (WS.CloseRequest _ _) = void $ writeChan cRecv RDisconnected + handleException WS.ConnectionClosed = void $ writeChan cRecv RDisconnected handleException _ = fetchThread cRecv con {- @@ -402,7 +364,7 @@ fetchThread cRecv con = handle handleException $ forever $ do type SendState = StateT Integer IO -- Prepare a single packet for sending -preparePacket :: (ToJSON p) => T.Text -> p -> SendState (BS.ByteString, PacketID) +preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID) preparePacket packetType packetData = do packetNr <- get put $ packetNr + 1 @@ -414,7 +376,7 @@ preparePacket packetType packetData = do ] return (bytestr, packetID) -sendThread :: SendChan -> RecvChan -> WS.Connection -> SendState () +sendThread :: SendChan -> RecvChan -> WS.Connection -> StateT Integer IO () sendThread cSend cRecv con = do item <- liftIO $ E.readChan cSend case item of @@ -433,7 +395,7 @@ sendThread cSend cRecv con = do Just (SReply packetType packetData reply) -> do (packet, packetID) <- preparePacket packetType packetData - liftIO $ E.writeChan cRecv $ RReply packetID reply + liftIO $ writeChan cRecv $ RReply packetID reply continue <- liftIO $ sendSafely packet when continue $ sendThread cSend cRecv con @@ -447,4 +409,68 @@ sendThread cSend cRecv con = do - RecvThread -} --- TODO +data PacketInfo = PacketInfo + { infoPacketID :: Maybe PacketID + , infoServerError :: Maybe T.Text + } deriving (Show) + +instance FromJSON PacketInfo where + parseJSON = withObject "packet" $ \o -> do + infoPacketID <- o .:? "id" + infoServerError <- o .:? "error" + return PacketInfo{..} + +-- Possibly unnecessary +-- TODO: Swap for HashMap? +newtype Awaiting = Awaiting (M.Map T.Text ReplyMVar) + +emptyReplies :: Awaiting +emptyReplies = Awaiting M.empty + +processRecv :: RecvChan -> EventChan e -> Awaiting -> IO Awaiting +processRecv cRecv cEvent a@(Awaiting replies) = do + recv <- readChan cRecv + case recv of + RDisconnected -> + return a + + RReply packetID replyMVar -> do + let newReplies = M.insert packetID replyMVar replies + processRecv cRecv cEvent (Awaiting newReplies) + + RPacket bs -> + undefined -- TODO + +cleanupWaiting :: Awaiting -> IO () +cleanupWaiting (Awaiting replies) = + forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected) + +cleanupSend :: SendChan -> IO () +cleanupSend cSend = undefined + +cleanupRecv :: RecvChan -> IO () +cleanupRecv cRecv = undefined + +recvThread :: SendChan -> RecvChan -> EventChan e -> WS.Connection -> IO () +recvThread cSend cRecv cEvent con = do + tFetch <- async $ fetchThread cRecv con + tSend <- async $ evalStateT (sendThread cSend cRecv con) 0 + waitingReplies <- processRecv cRecv cEvent emptyReplies + E.closeChan cSend + wait tFetch + wait tSend + cleanupWaiting waitingReplies + cleanupSend cSend + cleanupRecv cRecv + +{- + - Startup + -} + +euphClient :: WS.ClientApp (SendChan, EventChan e) +euphClient con = do + sendChan <- E.newOpenChan + recvChan <- newChan + eventChan <- newChan + forkIO $ recvThread sendChan recvChan eventChan con + return (sendChan, eventChan)