diff --git a/cove-tui/src/cove.rs b/cove-tui/src/cove.rs index 636348f..e5a005c 100644 --- a/cove-tui/src/cove.rs +++ b/cove-tui/src/cove.rs @@ -1 +1,2 @@ mod conn; +mod room; diff --git a/cove-tui/src/cove/conn.rs b/cove-tui/src/cove/conn.rs index e800d81..92f2792 100644 --- a/cove-tui/src/cove/conn.rs +++ b/cove-tui/src/cove/conn.rs @@ -125,6 +125,7 @@ impl Connected { pub enum State { Connecting, Connected(Connected), + // TODO Include reason for stop Stopped, } diff --git a/cove-tui/src/cove/room.rs b/cove-tui/src/cove/room.rs new file mode 100644 index 0000000..4141ce4 --- /dev/null +++ b/cove-tui/src/cove/room.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot::{self, Sender}; +use tokio::sync::Mutex; + +use crate::config::Config; +use crate::never::Never; + +use super::conn::{self, CoveConn, CoveConnMt, Event}; + +struct ConnConfig { + url: String, + room: String, + timeout: Duration, + ev_tx: UnboundedSender, +} + +impl ConnConfig { + async fn new_conn(&self) -> (CoveConn, CoveConnMt) { + conn::new( + self.url.clone(), + self.room.clone(), + self.timeout, + self.ev_tx.clone(), + ) + .await + } +} + +pub struct CoveRoom { + name: String, + conn: Arc>, + /// Once this is dropped, all other room-related tasks, connections and + /// values are cleaned up. + dead_mans_switch: Sender, +} + +impl CoveRoom { + pub async fn new( + config: &'static Config, + outer_ev_tx: UnboundedSender, + name: String, + ) -> Self + where + E: Send + 'static, + Event: Into, + { + let (ev_tx, ev_rx) = mpsc::unbounded_channel(); + let (tx, rx) = oneshot::channel(); + + let conf = ConnConfig { + ev_tx, + url: config.cove_url.to_string(), + room: name.clone(), + timeout: config.timeout, + }; + let (conn, mt) = conf.new_conn().await; + + let room = Self { + name, + conn: Arc::new(Mutex::new(conn)), + dead_mans_switch: tx, + }; + + let conn_clone = room.conn.clone(); + tokio::spawn(async move { + tokio::select! { + _ = rx => {} // Watch dead man's switch + _ = Self::shovel_events(ev_rx, outer_ev_tx) => {} + _ = Self::run(conn_clone, mt, conf) => {} + } + }); + + room + } + + async fn shovel_events(mut ev_rx: UnboundedReceiver, ev_tx: UnboundedSender) + where + Event: Into, + { + while let Some(event) = ev_rx.recv().await { + if ev_tx.send(event.into()).is_err() { + break; + } + } + } + + /// Background task to connect to a room and stay connected. + async fn run(conn: Arc>, mut mt: CoveConnMt, conf: ConnConfig) { + // We have successfully connected to the url before. Errors while + // connecting are probably not our fault and we should try again later. + let mut url_exists = false; + + loop { + match mt.run().await { + Err(conn::Error::CouldNotConnect(_)) if url_exists => { + // TODO Exponential backoff? + tokio::time::sleep(Duration::from_secs(10)).await; + } + Err(conn::Error::CouldNotConnect(_)) => return, + Err(conn::Error::InvalidRoom(_)) => return, + Err(conn::Error::InvalidIdentity(_)) => return, + _ => {} + } + + url_exists = true; + + // TODO Clean up with restructuring assignments later? + let (new_conn, new_mt) = conf.new_conn().await; + *conn.lock().await = new_conn; + mt = new_mt; + } + } +} diff --git a/cove-tui/src/main.rs b/cove-tui/src/main.rs index 0455444..291f995 100644 --- a/cove-tui/src/main.rs +++ b/cove-tui/src/main.rs @@ -1,9 +1,8 @@ mod config; +mod cove; mod never; mod replies; -mod room; mod ui; -mod cove; use std::io; diff --git a/cove-tui/src/room.rs b/cove-tui/src/room.rs deleted file mode 100644 index 7989b27..0000000 --- a/cove-tui/src/room.rs +++ /dev/null @@ -1,381 +0,0 @@ -/* -Idea: -Identification etc. runs per-connection -Put connection into another Arc> -Give reference to connection to identify thread? - -On the other hand... -UI may also do weird things when setting nick during identification -Maybe use same mechanism here? - -Also... -Maybe have a look at what an euph room would require? -Maybe start working on euph room in parallel? -*/ - -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - -use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx}; -use cove_core::packets::{ - Cmd, IdentifyCmd, IdentifyRpl, NickRpl, Ntf, Packet, RoomCmd, RoomRpl, Rpl, SendRpl, WhoRpl, -}; -use cove_core::{Session, SessionId}; -use futures::io::Repeat; -use tokio::sync::oneshot::{self, Sender}; -use tokio::sync::Mutex; - -use crate::config::Config; -use crate::never::Never; -use crate::replies::{self, Replies}; - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("not connected")] - NotConnected, - #[error("not present")] - NotPresent, - #[error("incorrect reply type")] - IncorrectReplyType, - #[error("{0}")] - Conn(#[from] conn::Error), - #[error("{0}")] - Replies(#[from] replies::Error), -} - -pub enum Status { - ChoosingRoom, - Identifying, - /// User must enter a nick. May contain error message about previous nick. - NickRequired(Option), -} - -pub struct Connected { - status: Status, - tx: ConnTx, - next_id: u64, - replies: Replies, -} - -impl Connected { - fn new(tx: ConnTx, timeout: Duration) -> Self { - Self { - status: Status::ChoosingRoom, - tx, - next_id: 0, - replies: Replies::new(timeout), - } - } -} - -pub enum StopReason { - CouldNotConnect(conn::Error), - InvalidRoom(String), - InvalidIdentity(String), - /// Something went wrong but we don't know what. - SomethingWentWrong, -} - -pub enum Connection { - Connecting, - Reconnecting, - Connected(Connected), - Stopped(StopReason), -} - -impl Connection { - fn connected(&self) -> Option<&Connected> { - match self { - Connection::Connected(connected) => Some(connected), - Connection::Connecting | Connection::Reconnecting | Connection::Stopped(_) => None, - } - } - - fn connected_mut(&mut self) -> Option<&mut Connected> { - match self { - Connection::Connected(connected) => Some(connected), - Connection::Connecting | Connection::Reconnecting | Connection::Stopped(_) => None, - } - } - - fn stopped(&self) -> bool { - match self { - Connection::Stopped(_) => true, - Connection::Connecting | Connection::Reconnecting | Connection::Connected(_) => false, - } - } -} - -pub struct Present { - session: Session, - others: HashMap, -} - -pub struct RoomState { - identity: String, - initial_nick: Option, - connection: Connection, - present: Option, -} - -impl RoomState { - fn modified(&self) { - // TODO Send render event to main thread - } - - fn on_rpl( - &mut self, - id: u64, - rpl: Rpl, - room_verified: &mut Option, - ) -> anyhow::Result<()> { - match &rpl { - Rpl::Room(RoomRpl::Success) => { - *room_verified = Some(RoomVerified::Yes); - } - Rpl::Room(RoomRpl::InvalidRoom { reason }) => { - self.status = Status::Stopped(StopReason::InvalidRoom(reason.clone())); - anyhow::bail!("invalid room"); - } - Rpl::Identify(IdentifyRpl::Success { - you, - others, - last_message, - }) => { - let session = you.clone(); - let others = others - .iter() - .map(|session| (session.id, session.clone())) - .collect(); - self.present = Some(Present { session, others }); - // TODO Send last message to store - } - Rpl::Identify(IdentifyRpl::InvalidNick { .. }) => {} - Rpl::Identify(IdentifyRpl::InvalidIdentity { .. }) => {} - Rpl::Nick(NickRpl::Success { you }) => { - if let Some(present) = &mut self.present { - present.session = you.clone(); - } - } - Rpl::Nick(NickRpl::InvalidNick { .. }) => {} - Rpl::Send(SendRpl::Success { message }) => { - // TODO Send message to store - } - Rpl::Send(SendRpl::InvalidContent { .. }) => {} - Rpl::Who(WhoRpl { you, others }) => { - if let Some(present) = &mut self.present { - present.session = you.clone(); - present.others = others - .iter() - .map(|session| (session.id, session.clone())) - .collect(); - } - } - } - - if let Some(connected) = &mut self.connected { - connected.replies.complete(&id, rpl); - } - - Ok(()) - } - - fn on_ntf(&mut self, ntf: Ntf) { - match ntf { - Ntf::Join(join) => { - if let Some(present) = &mut self.present { - present.others.insert(join.who.id, join.who); - } - } - Ntf::Nick(nick) => { - if let Some(present) = &mut self.present { - present.others.insert(nick.who.id, nick.who); - } - } - Ntf::Part(part) => { - if let Some(present) = &mut self.present { - present.others.remove(&part.who.id); - } - } - Ntf::Send(_) => { - // TODO Send message to store - } - } - } - - async fn cmd(state: &Mutex, cmd: C) -> Result - where - C: Into, - Rpl: TryInto, - { - let pending_reply = { - let mut state = state.lock().await; - let connected = state.connected.as_mut().ok_or(Error::NotConnected)?; - - let id = connected.next_id; - connected.next_id += 1; - - let pending_reply = connected.replies.wait_for(id); - connected.tx.send(&Packet::cmd(id, cmd.into()))?; - pending_reply - }; - - let rpl = pending_reply.get().await?; - let rpl_value = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?; - Ok(rpl_value) - } - - async fn select_room_and_identify( - state: Arc>, - name: String, - ) -> Result<(), Error> { - let result: RoomRpl = Self::cmd(&state, RoomCmd { name }).await?; - match result { - RoomRpl::Success => {} - RoomRpl::InvalidRoom { reason } => { - let mut state = state.lock().await; - state.status = Status::Stopped(StopReason::InvalidRoom(reason)); - // FIXME This does not actually stop the room - state.connected = None; - return Ok(()); - } - } - - let nick = { - if let Some(nick) = &(state.lock().await).initial_nick { - nick.clone() - } else { - return Ok(()); - } - }; - Self::identify(&state, nick).await - } - - async fn identify(state: &Mutex, nick: String) -> Result<(), Error> { - let identity = state.lock().await.identity.clone(); - let result: IdentifyRpl = Self::cmd(state, IdentifyCmd { nick, identity }).await?; - Ok(()) - } -} - -pub struct Room { - state: Arc>, - /// Once this is dropped, all other room-related tasks, connections and - /// values are cleaned up. - dead_mans_switch: Sender, -} - -enum RoomVerified { - Yes, - No(StopReason), -} - -impl Room { - pub async fn new( - config: &'static Config, - name: String, - identity: String, - initial_nick: Option, - ) -> Self { - let (tx, rx) = oneshot::channel(); - - let room = Room { - state: Arc::new(Mutex::new(RoomState { - identity, - initial_nick, - connection: Connection::Connecting, - present: None, - })), - dead_mans_switch: tx, - }; - - let state_clone = room.state.clone(); - tokio::spawn(async move { - tokio::select! { - _ = rx => {} // Watch dead man's switch - _ = Self::run(state_clone, config,name) => {} - } - }); - - room - } - - /// Background task to connect to a room and stay connected. - async fn run(state: Arc>, config: &'static Config, name: String) { - // The room exists and we have successfully connected to it before - let mut room_verified = None; - - loop { - // Try to connect and run - match Self::connect(&config.cove_url, config.timeout).await { - Ok((tx, rx, mt)) => { - // Update state - { - let mut state = state.lock().await; - if state.connection.stopped() { - return; - } - state.connection = - Connection::Connected(Connected::new(tx, config.timeout)); - } - - // Stay connected - // TODO Start select_room_and_identify task - tokio::select! { - _ = mt.perform() => {} - _ = Self::receive(&state, rx, &mut room_verified) => {} - } - } - Err(e) if room_verified.is_none() => { - room_verified = Some(RoomVerified::No(StopReason::CouldNotConnect(e))) - } - Err(_) => {} - } - - // Clean up and maybe reconnect - { - let mut state = state.lock().await; - match room_verified { - Some(RoomVerified::Yes) => state.status = Status::Reconnecting, - Some(RoomVerified::No(reason)) => { - state.status = Status::Stopped(reason); - break; - } - None => { - state.status = Status::Stopped(StopReason::SomethingWentWrong); - break; - } - } - } - } - } - - async fn connect( - url: &str, - timeout: Duration, - ) -> Result<(ConnTx, ConnRx, ConnMaintenance), conn::Error> { - // This function exists to funnel errors using `?` short-circuiting. - // Inlining it would be annoying and verbose. - let stream = tokio_tungstenite::connect_async(url).await?.0; - let conn = conn::new(stream, timeout)?; - Ok(conn) - } - - async fn receive( - state: &Mutex, - mut rx: ConnRx, - room_verified: &mut Option, - ) -> anyhow::Result<()> { - while let Some(packet) = rx.recv().await? { - match packet { - Packet::Cmd { .. } => {} // Ignore, the server never sends commands - Packet::Rpl { id, rpl } => { - state.lock().await.on_rpl(&room, id, rpl, room_verified)?; - } - Packet::Ntf { ntf } => room.lock().await.on_ntf(ntf), - } - } - Ok(()) - } -}