From e5eefd8adaaed977dbc7212fe4fd45aebceee172 Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 28 Feb 2022 22:45:01 +0100 Subject: [PATCH] Start rewriting room --- cove-tui/src/room.rs | 345 ++++++++++++++++++++++++------------------- 1 file changed, 196 insertions(+), 149 deletions(-) diff --git a/cove-tui/src/room.rs b/cove-tui/src/room.rs index 456637f..9d885af 100644 --- a/cove-tui/src/room.rs +++ b/cove-tui/src/room.rs @@ -2,13 +2,15 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use anyhow::bail; use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx}; use cove_core::packets::{ - Cmd, IdentifyCmd, IdentifyRpl, NickRpl, Ntf, Packet, RoomRpl, Rpl, SendRpl, WhoRpl, + Cmd, IdentifyCmd, IdentifyRpl, NickRpl, Ntf, Packet, RoomCmd, RoomRpl, Rpl, SendRpl, WhoRpl, }; use cove_core::{Session, SessionId}; use tokio::sync::oneshot::{self, Sender}; use tokio::sync::Mutex; +use tokio_tungstenite::connect_async; use tui::widgets::StatefulWidget; use crate::config::Config; @@ -29,6 +31,34 @@ pub enum Error { Replies(#[from] replies::Error), } +pub enum StopReason { + CouldNotConnect(conn::Error), + InvalidRoom(String), + InvalidIdentity(String), + /// Something went wrong but we don't know what. + SomethingWentWrong, +} + +/// General state of the room connection. +pub enum Status { + /// Connecting to the room for the first time. + Connecting, + /// Reconnecting to the room after being connected at least once. + Reconnecting, + /// Identifying with the server after a connection has been established. + /// This occurs when an initial nick has been set on room creation. + Identifying, + /// User must enter a nick. May contain error message about previous nick. + NickRequired(Option), + /// Fully connected. + Nominal, + /// Not connected and not attempting any reconnects. This is likely due to + /// factors out of the application's control (e. g. no internet connection, + /// room does not exist), meaning that retrying without user intervention + /// doesn't make sense. + Stopped(StopReason), +} + /// State for when a websocket connection exists. struct Connected { tx: ConnTx, @@ -38,149 +68,31 @@ struct Connected { /// State for when a client has fully joined a room. pub struct Present { - pub session: Session, - pub others: HashMap, + session: Session, + others: HashMap, } -pub enum Status { - /// No action required by the UI. - Nominal, - /// User must enter a nick. - NickRequired, - CouldNotConnect, - InvalidRoom(String), - InvalidNick(String), - InvalidIdentity(String), -} - -pub struct Room { - name: String, +pub struct RoomState { identity: String, initial_nick: Option, status: Status, connected: Option, present: Option, - still_alive: Sender, } -impl Room { - pub async fn new( - name: String, - identity: String, - initial_nick: Option, - config: &'static Config, - ) -> Arc> { - let (tx, rx) = oneshot::channel(); - - let room = Arc::new(Mutex::new(Self { - name, - identity, - initial_nick, - status: Status::Nominal, - connected: None, - present: None, - still_alive: tx, - })); - - let room_clone = room.clone(); - tokio::spawn(async move { - tokio::select! { - _ = rx => {} - _ = Self::bg_task(room_clone, config) => {} - } - }); - - room - } - - pub fn status(&self) -> &Status { - &self.status - } - - pub fn connected(&self) -> bool { - self.connected.is_some() - } - - pub fn present(&self) -> Option<&Present> { - self.present.as_ref() - } - - async fn bg_task(room: Arc>, config: &'static Config) { - let mut room_verified = false; - loop { - if let Ok((tx, rx, mt)) = Self::connect(&config.cove_url, config.timeout).await { - { - let mut room = room.lock().await; - room.status = Status::Nominal; - room.connected = Some(Connected { - tx, - next_id: 0, - replies: Replies::new(config.timeout), - }); - } - - tokio::select! { - _ = mt.perform() => {} - _ = Self::receive(room.clone(), rx, &mut room_verified) => {} - } - } - - if !room_verified { - room.lock().await.status = Status::CouldNotConnect; - return; - } - } - } - - async fn connect( - url: &str, - timeout: Duration, - ) -> anyhow::Result<(ConnTx, ConnRx, ConnMaintenance)> { - let stream = tokio_tungstenite::connect_async(url).await?.0; - let conn = conn::new(stream, timeout)?; - Ok(conn) - } - - async fn receive( - room: Arc>, - mut rx: ConnRx, - room_verified: &mut bool, - ) -> anyhow::Result<()> { - while let Some(packet) = rx.recv().await? { - match packet { - Packet::Cmd { .. } => {} // Ignore, the server never sends commands - Packet::Rpl { id, rpl } => { - room.lock().await.on_rpl(&room, id, rpl, room_verified)?; - } - Packet::Ntf { ntf } => room.lock().await.on_ntf(ntf), - } - } - Ok(()) - } - +impl RoomState { fn on_rpl( &mut self, - room: &Arc>, id: u64, rpl: Rpl, - room_verified: &mut bool, + room_verified: &mut Option, ) -> anyhow::Result<()> { match &rpl { Rpl::Room(RoomRpl::Success) => { - *room_verified = true; - if let Some(nick) = &self.initial_nick { - // TODO Use previous nick if there is one - tokio::spawn(Self::identify( - room.clone(), - nick.clone(), - self.identity.clone(), - )); - } else { - self.status = Status::NickRequired; - } + *room_verified = Some(RoomVerified::Yes); } Rpl::Room(RoomRpl::InvalidRoom { reason }) => { - self.status = Status::InvalidRoom(reason.clone()); + self.status = Status::Stopped(StopReason::InvalidRoom(reason.clone())); anyhow::bail!("invalid room"); } Rpl::Identify(IdentifyRpl::Success { @@ -188,34 +100,26 @@ impl Room { others, last_message, }) => { + let session = you.clone(); let others = others .iter() .map(|session| (session.id, session.clone())) .collect(); - self.present = Some(Present { - session: you.clone(), - others, - }); + self.present = Some(Present { session, others }); // TODO Send last message to store } - Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => { - self.status = Status::InvalidNick(reason.clone()); - } - Rpl::Identify(IdentifyRpl::InvalidIdentity { reason }) => { - self.status = Status::InvalidIdentity(reason.clone()); - } + Rpl::Identify(IdentifyRpl::InvalidNick { .. }) => {} + Rpl::Identify(IdentifyRpl::InvalidIdentity { .. }) => {} Rpl::Nick(NickRpl::Success { you }) => { if let Some(present) = &mut self.present { present.session = you.clone(); } } - Rpl::Nick(NickRpl::InvalidNick { reason }) => { - self.status = Status::InvalidNick(reason.clone()); - } + Rpl::Nick(NickRpl::InvalidNick { .. }) => {} Rpl::Send(SendRpl::Success { message }) => { // TODO Send message to store } - Rpl::Send(SendRpl::InvalidContent { reason }) => {} + Rpl::Send(SendRpl::InvalidContent { .. }) => {} Rpl::Who(WhoRpl { you, others }) => { if let Some(present) = &mut self.present { present.session = you.clone(); @@ -257,14 +161,14 @@ impl Room { } } - async fn cmd(room: &Mutex, cmd: C) -> Result + async fn cmd(state: &Mutex, cmd: C) -> Result where C: Into, Rpl: TryInto, { - let token = { - let mut room = room.lock().await; - let connected = room.connected.as_mut().ok_or(Error::NotConnected)?; + let pending_reply = { + let mut state = state.lock().await; + let connected = state.connected.as_mut().ok_or(Error::NotConnected)?; let id = connected.next_id; connected.next_id += 1; @@ -274,13 +178,156 @@ impl Room { pending_reply }; - let rpl = token.get().await?; - let rpl = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?; - Ok(rpl) + let rpl = pending_reply.get().await?; + let rpl_value = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?; + Ok(rpl_value) } - async fn identify(room: Arc>, nick: String, identity: String) -> Result<(), Error> { - let result: IdentifyRpl = Self::cmd(&room, IdentifyCmd { nick, identity }).await?; + async fn select_room_and_identify( + state: Arc>, + name: String, + ) -> Result<(), Error> { + let result: RoomRpl = Self::cmd(&state, RoomCmd { name }).await?; + match result { + RoomRpl::Success => {} + RoomRpl::InvalidRoom { reason } => { + let mut state = state.lock().await; + state.status = Status::Stopped(StopReason::InvalidRoom(reason)); + // FIXME This does not actually stop the room + state.connected = None; + return Ok(()); + } + } + + let nick = { + if let Some(nick) = &(state.lock().await).initial_nick { + nick.clone() + } else { + return Ok(()); + } + }; + Self::identify(&state, nick).await + } + + async fn identify(state: &Mutex, nick: String) -> Result<(), Error> { + let identity = state.lock().await.identity.clone(); + let result: IdentifyRpl = Self::cmd(state, IdentifyCmd { nick, identity }).await?; + Ok(()) + } +} + +pub struct Room { + state: Arc>, + /// Once this is dropped, all other room-related tasks, connections and + /// values are cleaned up. + dead_mans_switch: Sender, +} + +enum RoomVerified { + Yes, + No(StopReason), +} + +impl Room { + pub async fn new( + config: &'static Config, + name: String, + identity: String, + initial_nick: Option, + ) -> Self { + let (tx, rx) = oneshot::channel(); + + let room = Room { + state: Arc::new(Mutex::new(RoomState { + identity, + initial_nick, + status: Status::Connecting, + connected: None, + present: None, + })), + dead_mans_switch: tx, + }; + + let state_clone = room.state.clone(); + tokio::spawn(async move { + tokio::select! { + _ = rx => {} // Watch dead man's switch + _ = Self::run(state_clone, config,name) => {} + } + }); + + room + } + + /// Background task to connect to a room and stay connected. + async fn run(state: Arc>, config: &'static Config, name: String) { + // The room exists and we have successfully connected to it before + let mut room_verified = None; + + loop { + // Try to connect and run + match Self::connect(&config.cove_url, config.timeout).await { + Ok((tx, rx, mt)) => { + state.lock().await.connected = Some(Connected { + tx, + next_id: 0, + replies: Replies::new(config.timeout), + }); + + tokio::select! { + _ = mt.perform() => {} + _ = Self::receive(&state, rx, &mut room_verified) => {} + } + } + Err(e) if room_verified.is_none() => { + room_verified = Some(RoomVerified::No(StopReason::CouldNotConnect(e))) + } + Err(_) => {} + } + + // Clean up and maybe reconnect + { + let mut state = state.lock().await; + match room_verified { + Some(RoomVerified::Yes) => state.status = Status::Reconnecting, + Some(RoomVerified::No(reason)) => { + state.status = Status::Stopped(reason); + break; + } + None => { + state.status = Status::Stopped(StopReason::SomethingWentWrong); + break; + } + } + } + } + } + + async fn connect( + url: &str, + timeout: Duration, + ) -> Result<(ConnTx, ConnRx, ConnMaintenance), conn::Error> { + // This function exists to funnel errors using `?` short-circuiting. + // Inlining it would be annoying and verbose. + let stream = tokio_tungstenite::connect_async(url).await?.0; + let conn = conn::new(stream, timeout)?; + Ok(conn) + } + + async fn receive( + state: &Mutex, + mut rx: ConnRx, + room_verified: &mut Option, + ) -> anyhow::Result<()> { + while let Some(packet) = rx.recv().await? { + match packet { + Packet::Cmd { .. } => {} // Ignore, the server never sends commands + Packet::Rpl { id, rpl } => { + state.lock().await.on_rpl(&room, id, rpl, room_verified)?; + } + Packet::Ntf { ntf } => room.lock().await.on_ntf(ntf), + } + } Ok(()) } }