diff --git a/cove-tui/src/cove/conn.rs b/cove-tui/src/cove/conn.rs index 2adc577..7b5a690 100644 --- a/cove-tui/src/cove/conn.rs +++ b/cove-tui/src/cove/conn.rs @@ -4,25 +4,32 @@ use std::time::Duration; use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx}; use cove_core::packets::{ - IdentifyRpl, JoinNtf, NickNtf, NickRpl, Ntf, Packet, PartNtf, RoomRpl, Rpl, SendNtf, SendRpl, - WhoRpl, + Cmd, IdentifyCmd, IdentifyRpl, JoinNtf, NickNtf, NickRpl, Ntf, Packet, PartNtf, RoomCmd, + RoomRpl, Rpl, SendNtf, SendRpl, WhoRpl, }; use cove_core::{Session, SessionId}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::Mutex; -use crate::replies::Replies; +use crate::replies::{self, Replies}; +// TODO Split into "interacting" and "maintenance" parts? #[derive(Debug, thiserror::Error)] pub enum Error { #[error("{0}")] Conn(#[from] conn::Error), + #[error("{0}")] + Reply(#[from] replies::Error), #[error("invalid room: {0}")] InvalidRoom(String), #[error("invalid identity: {0}")] InvalidIdentity(String), #[error("maintenance aborted")] MaintenanceAborted, + #[error("not connected")] + NotConnected, + #[error("incorrect reply type")] + IncorrectReplyType, } pub enum Event { @@ -97,6 +104,7 @@ impl Status { pub struct Connected { tx: ConnTx, + next_id: u64, replies: Replies, status: Status, } @@ -105,37 +113,114 @@ impl Connected { fn new(tx: ConnTx, timeout: Duration) -> Self { Self { tx, + next_id: 0, replies: Replies::new(timeout), status: Status::ChoosingRoom, } } } -pub enum CoveConn { +pub enum State { Connecting, Connected(Connected), Stopped, } -impl CoveConn { - fn connected(&self) -> Option<&Connected> { +impl State { + pub fn connected(&self) -> Option<&Connected> { match self { - CoveConn::Connected(connected) => Some(connected), - CoveConn::Connecting | CoveConn::Stopped => None, + Self::Connected(connected) => Some(connected), + Self::Connecting | Self::Stopped => None, } } - fn connected_mut(&mut self) -> Option<&mut Connected> { + pub fn connected_mut(&mut self) -> Option<&mut Connected> { match self { - CoveConn::Connected(connected) => Some(connected), - CoveConn::Connecting | CoveConn::Stopped => None, + Self::Connected(connected) => Some(connected), + Self::Connecting | Self::Stopped => None, } } } +pub struct CoveConn { + state: State, + ev_tx: UnboundedSender, +} + +impl CoveConn { + pub fn state(&self) -> &State { + &self.state + } + + pub fn state_mut(&mut self) -> &mut State { + &mut self.state + } + + pub fn connected(&self) -> Option<&Connected> { + self.state.connected() + } + + pub fn connected_mut(&mut self) -> Option<&mut Connected> { + self.state.connected_mut() + } + + async fn cmd(conn: &Mutex, cmd: C) -> Result + where + C: Into, + Rpl: TryInto, + { + let pending_reply = { + let mut conn = conn.lock().await; + let mut connected = conn.connected_mut().ok_or(Error::NotConnected)?; + + let id = connected.next_id; + connected.next_id += 1; + + let pending_reply = connected.replies.wait_for(id); + connected.tx.send(&Packet::cmd(id, cmd.into()))?; + pending_reply + }; + + let rpl = pending_reply.get().await?; + let rpl_value = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?; + Ok(rpl_value) + } + + /// Attempt to identify with a nick and identity. Does nothing if the room + /// doesn't require verification. + /// + /// This method is intended to be called whenever a CoveConn user suspects + /// identification to be necessary. It has little overhead. + pub async fn identify(conn: Arc>, nick: &str, identity: &str) { + { + let mut conn = conn.lock().await; + if let Some(connected) = conn.connected_mut() { + if let Status::IdRequired(_) = connected.status { + connected.status = Status::Identifying; + conn.ev_tx.send(Event::StateChanged); + } else { + return; + } + } else { + return; + } + } + + let nick = nick.to_string(); + let identity = identity.to_string(); + tokio::spawn(async move { + // There's no need for a second locking block, or for us to see the + // result of this command. CoveConnMt::run will set the connection's + // status as appropriate. + Self::cmd::(&conn, IdentifyCmd { nick, identity }).await + }); + } +} + /// Maintenance for a [`CoveConn`]. pub struct CoveConnMt { url: String, + room: String, timeout: Duration, conn: Arc>, ev_tx: UnboundedSender, @@ -146,21 +231,22 @@ impl CoveConnMt { let (tx, rx, mt) = match Self::connect(&self.url, self.timeout).await { Ok(conn) => conn, Err(e) => { - *self.conn.lock().await = CoveConn::Stopped; + *self.conn.lock().await.state_mut() = State::Stopped; + self.ev_tx.send(Event::StateChanged); return Err(Error::Conn(e)); } }; - *self.conn.lock().await = CoveConn::Connected(Connected::new(tx, self.timeout)); + *self.conn.lock().await.state_mut() = State::Connected(Connected::new(tx, self.timeout)); self.ev_tx.send(Event::StateChanged); - // TODO Spawn task to join room + tokio::spawn(Self::join_room(self.conn.clone(), self.room)); let result = tokio::select! { result = Self::recv(&self.conn, &self.ev_tx, rx) => result, _ = mt.perform() => Err(Error::MaintenanceAborted), }; - *self.conn.lock().await = CoveConn::Stopped; + *self.conn.lock().await.state_mut() = State::Stopped; self.ev_tx.send(Event::StateChanged); result @@ -175,6 +261,11 @@ impl CoveConnMt { Ok(conn) } + async fn join_room(conn: Arc>, name: String) -> Result<(), Error> { + let reply: RoomRpl = CoveConn::cmd(&conn, RoomCmd { name }).await?; + Ok(()) + } + async fn recv( conn: &Mutex, ev_tx: &UnboundedSender, @@ -206,7 +297,8 @@ impl CoveConnMt { match &rpl { Rpl::Room(RoomRpl::Success) => { - // TODO Send event that joining room was successful? + connected.status = Status::IdRequired(None); + ev_tx.send(Event::StateChanged); } Rpl::Room(RoomRpl::InvalidRoom { reason }) => { return Err(Error::InvalidRoom(reason.clone())) @@ -215,7 +307,10 @@ impl CoveConnMt { connected.status = Status::Present(Present::new(you, others)); ev_tx.send(Event::StateChanged); } - Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => {} + Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => { + connected.status = Status::IdRequired(Some(reason.clone())); + ev_tx.send(Event::StateChanged); + } Rpl::Identify(IdentifyRpl::InvalidIdentity { reason }) => { return Err(Error::InvalidIdentity(reason.clone())) } @@ -284,12 +379,17 @@ impl CoveConnMt { pub async fn new( url: String, + room: String, timeout: Duration, ) -> (Arc>, CoveConnMt, UnboundedReceiver) { - let conn = Arc::new(Mutex::new(CoveConn::Connecting)); let (ev_tx, ev_rx) = mpsc::unbounded_channel(); + let conn = Arc::new(Mutex::new(CoveConn { + state: State::Connecting, + ev_tx: ev_tx.clone(), + })); let mt = CoveConnMt { url, + room, timeout, conn, ev_tx,