Comtinue implementing euphClient
This commit is contained in:
parent
c06102fc47
commit
43cbb74abf
2 changed files with 113 additions and 85 deletions
|
|
@ -22,10 +22,12 @@ description: Please see the README on Github at <https://github.com/Garm
|
|||
dependencies:
|
||||
- base >= 4.7 && < 5
|
||||
# basic stuff
|
||||
- time
|
||||
- containers
|
||||
- text
|
||||
- time
|
||||
- transformers
|
||||
# websocket connection
|
||||
- async
|
||||
- websockets
|
||||
- wuss
|
||||
# parsing json
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
-- s: sendThread
|
||||
--
|
||||
-- On creation:
|
||||
-- m: Create WS connection
|
||||
-- m: Create WS connection (or do this in r?)
|
||||
-- m: Create channels
|
||||
-- m: Start recvThread with all necessary info
|
||||
-- r: Start fetchThread and sendThread using async
|
||||
|
|
@ -59,12 +59,14 @@
|
|||
|
||||
module EuphApi.Threads (
|
||||
-- * Events and replies
|
||||
Failure(..)
|
||||
EuphException(..)
|
||||
, Event(..)
|
||||
, EuphEvent(..)
|
||||
-- * API functions
|
||||
, pingReply
|
||||
, nick
|
||||
-- * Connection to euphoria
|
||||
, euphClient
|
||||
) where
|
||||
|
||||
import Control.Applicative
|
||||
|
|
@ -72,43 +74,51 @@ import Control.Concurrent
|
|||
import Control.Exception
|
||||
import Control.Monad
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans.Except
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Monad.Trans.State
|
||||
import Data.Aeson as A
|
||||
import qualified Data.ByteString.Lazy as BS
|
||||
import qualified Data.HashMap.Strict as HM
|
||||
import qualified Data.Text as T
|
||||
import Data.Aeson as A
|
||||
import qualified Data.ByteString.Lazy as BS
|
||||
import qualified Data.HashMap.Strict as HM
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Text as T
|
||||
import Data.Time
|
||||
import Data.Time.Clock.POSIX
|
||||
import qualified EuphApi.CloseableChan as E
|
||||
import qualified EuphApi.Types as E
|
||||
import qualified Network.WebSockets as WS
|
||||
import qualified Network.WebSockets as WS
|
||||
|
||||
import qualified EuphApi.CloseableChan as E
|
||||
import qualified EuphApi.Types as E
|
||||
|
||||
-- Some useful type aliases
|
||||
type PacketID = T.Text
|
||||
type Reply = Either Failure
|
||||
type Reply = Either EuphException
|
||||
data ReplyMVar = forall r . (FromJSON r) => ReplyMVar (MVar (Reply r))
|
||||
|
||||
-- | The ways in which getting a reply from the server can fail.
|
||||
data Failure = FailClosed -- ^ Could not send message because connection was closed.
|
||||
| FailDisconnect -- ^ Disconnected from server while waiting for the reply.
|
||||
| FailError T.Text -- ^ The server replied with an error.
|
||||
| FailParse -- ^ Could not parse the server's reply correctly.
|
||||
data EuphException = EuphClosed
|
||||
-- ^ Could not send message because connection was closed.
|
||||
| EuphDisconnected
|
||||
-- ^ Disconnected from server while waiting for the reply.
|
||||
| EuphServerError T.Text
|
||||
-- ^ The server replied with an error.
|
||||
| EuphParse
|
||||
-- ^ Could not parse the server's reply correctly.
|
||||
|
||||
instance Show Failure where
|
||||
show FailClosed = "Connection already closed"
|
||||
show FailDisconnect = "Disconnected from server"
|
||||
show (FailError t) = "Server error: " ++ T.unpack t
|
||||
show FailParse = "Parsing failed"
|
||||
instance Show EuphException where
|
||||
show EuphClosed = "Connection already closed"
|
||||
show EuphDisconnected = "Disconnected from server"
|
||||
show (EuphServerError t) = "Server error: " ++ T.unpack t
|
||||
show EuphParse = "Parsing failed"
|
||||
|
||||
instance Exception Failure
|
||||
instance Exception EuphException
|
||||
|
||||
sendPacket :: (ToJSON p, FromJSON r) => SendChan -> T.Text -> p -> IO r
|
||||
sendPacket chan packetType packetData = do
|
||||
var <- newEmptyMVar
|
||||
let packet = SReply packetType packetData var
|
||||
let packet = SReply packetType packetData (ReplyMVar var)
|
||||
done <- E.writeChan chan packet
|
||||
case done of
|
||||
Nothing -> throw FailClosed
|
||||
Nothing -> throw EuphClosed
|
||||
Just () -> do
|
||||
result <- readMVar var
|
||||
case result of
|
||||
|
|
@ -120,7 +130,7 @@ sendPacketNoReply chan packetType packetData = do
|
|||
let packet = SNoReply packetType packetData
|
||||
done <- E.writeChan chan packet
|
||||
case done of
|
||||
Nothing -> throw FailClosed
|
||||
Nothing -> throw EuphClosed
|
||||
Just () -> return ()
|
||||
|
||||
{-
|
||||
|
|
@ -144,7 +154,7 @@ nick chan name = do
|
|||
|
||||
(.?=) :: (ToJSON v, KeyValue kv) => T.Text -> Maybe v -> [kv]
|
||||
k .?= (Just v) = [k .= v]
|
||||
k .?= Nothing = []
|
||||
_ .?= Nothing = []
|
||||
|
||||
-- ping reply/command/whatever
|
||||
|
||||
|
|
@ -314,69 +324,21 @@ instance FromJSON EuphEvent where
|
|||
pSnapshotEvent = withObject "SnapshotEvent" $ \o ->
|
||||
SnapshotEvent <$> o .: "version" <*> o .: "listing" <*> o .: "log" <*> o .:? "nick"
|
||||
|
||||
{-
|
||||
pingReply chan time = do
|
||||
let obj = object $ ["time" .= utcTimeToPOSIXSeconds time]
|
||||
packet = packetOfType "ping-reply" obj
|
||||
sent <- liftIO $ E.writeChan chan $ SNoReply packet
|
||||
case sent of
|
||||
Nothing -> return $ Left FailClosed
|
||||
Just _ -> return $ Right ()
|
||||
|
||||
nick :: SendChan -> T.Text -> IO (Reply (T.Text, T.Text))
|
||||
nick chan newNick = do
|
||||
let obj = object $ ["name" .= newNick]
|
||||
packet = packetOfType "nick" obj
|
||||
var <- liftIO newEmptyMVar
|
||||
sent <- liftIO $ E.writeChan chan $ SReply packet var
|
||||
case sent of
|
||||
Nothing -> return $ Left FailClosed
|
||||
Just _ -> do
|
||||
reply <- readMVar var
|
||||
case reply of
|
||||
Left f -> return $ Left f
|
||||
Right NickReply{..} -> return $ Right (nickReplyFrom, nickReplyTo)
|
||||
|
||||
send :: SendChan -> T.Text -> IO (Reply E.Message)
|
||||
send chan content = do
|
||||
let obj = object $ ["content" .= content]
|
||||
packet = packetOfType "send" obj
|
||||
var <- liftIO newEmptyMVar
|
||||
sent <- liftIO $ E.writeChan chan $ SReply packet var
|
||||
case sent of
|
||||
Nothing -> return $ Left FailClosed
|
||||
Just _ -> do
|
||||
reply <- readMVar var
|
||||
return $ sendReplyMessage <$> reply
|
||||
|
||||
reply :: SendChan -> PacketID -> T.Text -> IO (Reply E.Message)
|
||||
reply chan parent content = do
|
||||
let obj = object $ ["content" .= content, "parent" .= parent]
|
||||
packet = packetOfType "send" obj
|
||||
var <- liftIO newEmptyMVar
|
||||
sent <- liftIO $ E.writeChan chan $ SReply packet var
|
||||
case sent of
|
||||
Nothing -> return $ Left FailClosed
|
||||
Just _ -> do
|
||||
reply <- readMVar var
|
||||
return $ sendReplyMessage <$> reply
|
||||
-}
|
||||
|
||||
{-
|
||||
- Channels
|
||||
-}
|
||||
|
||||
type RecvChan = E.CloseableChan Recv
|
||||
type RecvChan = Chan Recv
|
||||
data Recv = RDisconnected
|
||||
| RPacket BS.ByteString
|
||||
| forall a . (FromJSON a) => RReply PacketID (MVar (Reply a))
|
||||
| RReply PacketID ReplyMVar
|
||||
|
||||
type SendChan = E.CloseableChan Send
|
||||
data Send = SDisconnect
|
||||
| forall p . (ToJSON p) => SNoReply T.Text p -- packet type and contents
|
||||
| forall p r . (ToJSON p, FromJSON r) => SReply T.Text p (MVar (Reply r))
|
||||
| forall p . (ToJSON p) => SReply T.Text p ReplyMVar
|
||||
|
||||
type EventChan e = E.CloseableChan (Event e)
|
||||
type EventChan e = Chan (Event e)
|
||||
data Event e = EDisconnected
|
||||
| EStopped
|
||||
| EEuphEvent EuphEvent
|
||||
|
|
@ -389,10 +351,10 @@ data Event e = EDisconnected
|
|||
fetchThread :: RecvChan -> WS.Connection -> IO ()
|
||||
fetchThread cRecv con = handle handleException $ forever $ do
|
||||
message <- WS.receiveData con
|
||||
void $ E.writeChan cRecv (RPacket message) -- will never be closed while thread running
|
||||
void $ writeChan cRecv (RPacket message) -- will never be closed while thread running
|
||||
where
|
||||
handleException (WS.CloseRequest _ _) = void $ E.writeChan cRecv RDisconnected
|
||||
handleException WS.ConnectionClosed = void $ E.writeChan cRecv RDisconnected
|
||||
handleException (WS.CloseRequest _ _) = void $ writeChan cRecv RDisconnected
|
||||
handleException WS.ConnectionClosed = void $ writeChan cRecv RDisconnected
|
||||
handleException _ = fetchThread cRecv con
|
||||
|
||||
{-
|
||||
|
|
@ -402,7 +364,7 @@ fetchThread cRecv con = handle handleException $ forever $ do
|
|||
type SendState = StateT Integer IO
|
||||
|
||||
-- Prepare a single packet for sending
|
||||
preparePacket :: (ToJSON p) => T.Text -> p -> SendState (BS.ByteString, PacketID)
|
||||
preparePacket :: (ToJSON p) => T.Text -> p -> StateT Integer IO (BS.ByteString, PacketID)
|
||||
preparePacket packetType packetData = do
|
||||
packetNr <- get
|
||||
put $ packetNr + 1
|
||||
|
|
@ -414,7 +376,7 @@ preparePacket packetType packetData = do
|
|||
]
|
||||
return (bytestr, packetID)
|
||||
|
||||
sendThread :: SendChan -> RecvChan -> WS.Connection -> SendState ()
|
||||
sendThread :: SendChan -> RecvChan -> WS.Connection -> StateT Integer IO ()
|
||||
sendThread cSend cRecv con = do
|
||||
item <- liftIO $ E.readChan cSend
|
||||
case item of
|
||||
|
|
@ -433,7 +395,7 @@ sendThread cSend cRecv con = do
|
|||
|
||||
Just (SReply packetType packetData reply) -> do
|
||||
(packet, packetID) <- preparePacket packetType packetData
|
||||
liftIO $ E.writeChan cRecv $ RReply packetID reply
|
||||
liftIO $ writeChan cRecv $ RReply packetID reply
|
||||
continue <- liftIO $ sendSafely packet
|
||||
when continue $
|
||||
sendThread cSend cRecv con
|
||||
|
|
@ -447,4 +409,68 @@ sendThread cSend cRecv con = do
|
|||
- RecvThread
|
||||
-}
|
||||
|
||||
-- TODO
|
||||
data PacketInfo = PacketInfo
|
||||
{ infoPacketID :: Maybe PacketID
|
||||
, infoServerError :: Maybe T.Text
|
||||
} deriving (Show)
|
||||
|
||||
instance FromJSON PacketInfo where
|
||||
parseJSON = withObject "packet" $ \o -> do
|
||||
infoPacketID <- o .:? "id"
|
||||
infoServerError <- o .:? "error"
|
||||
return PacketInfo{..}
|
||||
|
||||
-- Possibly unnecessary
|
||||
-- TODO: Swap for HashMap?
|
||||
newtype Awaiting = Awaiting (M.Map T.Text ReplyMVar)
|
||||
|
||||
emptyReplies :: Awaiting
|
||||
emptyReplies = Awaiting M.empty
|
||||
|
||||
processRecv :: RecvChan -> EventChan e -> Awaiting -> IO Awaiting
|
||||
processRecv cRecv cEvent a@(Awaiting replies) = do
|
||||
recv <- readChan cRecv
|
||||
case recv of
|
||||
RDisconnected ->
|
||||
return a
|
||||
|
||||
RReply packetID replyMVar -> do
|
||||
let newReplies = M.insert packetID replyMVar replies
|
||||
processRecv cRecv cEvent (Awaiting newReplies)
|
||||
|
||||
RPacket bs ->
|
||||
undefined -- TODO
|
||||
|
||||
cleanupWaiting :: Awaiting -> IO ()
|
||||
cleanupWaiting (Awaiting replies) =
|
||||
forM_ replies $ \(ReplyMVar var) -> putMVar var (Left EuphDisconnected)
|
||||
|
||||
cleanupSend :: SendChan -> IO ()
|
||||
cleanupSend cSend = undefined
|
||||
|
||||
cleanupRecv :: RecvChan -> IO ()
|
||||
cleanupRecv cRecv = undefined
|
||||
|
||||
recvThread :: SendChan -> RecvChan -> EventChan e -> WS.Connection -> IO ()
|
||||
recvThread cSend cRecv cEvent con = do
|
||||
tFetch <- async $ fetchThread cRecv con
|
||||
tSend <- async $ evalStateT (sendThread cSend cRecv con) 0
|
||||
waitingReplies <- processRecv cRecv cEvent emptyReplies
|
||||
E.closeChan cSend
|
||||
wait tFetch
|
||||
wait tSend
|
||||
cleanupWaiting waitingReplies
|
||||
cleanupSend cSend
|
||||
cleanupRecv cRecv
|
||||
|
||||
{-
|
||||
- Startup
|
||||
-}
|
||||
|
||||
euphClient :: WS.ClientApp (SendChan, EventChan e)
|
||||
euphClient con = do
|
||||
sendChan <- E.newOpenChan
|
||||
recvChan <- newChan
|
||||
eventChan <- newChan
|
||||
forkIO $ recvThread sendChan recvChan eventChan con
|
||||
return (sendChan, eventChan)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue