diff --git a/src/bot/instance.rs b/src/bot/instance.rs index 9a0e78d..111bca5 100644 --- a/src/bot/instance.rs +++ b/src/bot/instance.rs @@ -11,6 +11,7 @@ use cookie::{Cookie, CookieJar}; use log::{debug, warn}; use tokio::select; use tokio::sync::{mpsc, oneshot}; +use tokio_tungstenite::tungstenite; use tokio_tungstenite::tungstenite::http::HeaderValue; use crate::api::packet::ParsedPacket; @@ -169,6 +170,14 @@ pub enum Event { Stopped(InstanceConfig), } +/// An error that occurred inside an [`Instance`] while it was running. +enum RunError { + StoppedManually, + InstanceDropped, + CouldNotConnect(tungstenite::Error), + Conn(conn::Error), +} + /// A single instance of a bot in a single room. /// /// The instance automatically connects to its room once it is created, and it @@ -228,19 +237,44 @@ impl Instance { ) where F: Fn(Event), { - // TODO Only delay reconnecting if previous reconnect attempt failed loop { + debug!("{}: Connecting...", config.name); + on_event(Event::Connecting(config.clone())); - Self::run_once::(&config, &on_event, &mut request_rx).await; + let result = Self::run_once::(&config, &on_event, &mut request_rx).await; on_event(Event::Disconnected(config.clone())); - debug!( - "{}: Waiting {} seconds before reconnecting", - config.name, - config.server.reconnect_delay.as_secs(), - ); - tokio::time::sleep(config.server.reconnect_delay).await; + let connected = match result { + Ok(()) => { + debug!("{}: Connection closed normally", config.name); + true + } + Err(RunError::StoppedManually) => { + debug!("{}: Instance stopped manually", config.name); + break; + } + Err(RunError::InstanceDropped) => { + debug!("{}: Instance dropped", config.name); + break; + } + Err(RunError::CouldNotConnect(err)) => { + warn!("{}: Failed to connect: {err}", config.name); + false + } + Err(RunError::Conn(err)) => { + warn!("{} An error occurred: {err}", config.name); + true + } + }; + + if !connected { + let s = config.server.reconnect_delay.as_secs(); + debug!("{}: Waiting {s} seconds before reconnecting", config.name); + tokio::time::sleep(config.server.reconnect_delay).await; + } } + + on_event(Event::Stopped(config)) } fn get_cookies(config: &InstanceConfig) -> HeaderValue { @@ -271,11 +305,10 @@ impl Instance { config: &InstanceConfig, on_event: &F, request_rx: &mut mpsc::UnboundedReceiver>, - ) -> Option<()> + ) -> Result<(), RunError> where F: Fn(Event), { - debug!("{}: Connecting...", config.name); let (mut conn, cookies) = Conn::connect( &config.server.domain, &config.room, @@ -284,32 +317,28 @@ impl Instance { config.server.timeout, ) .await - .ok()?; + .map_err(RunError::CouldNotConnect)?; + Self::set_cookies(config, cookies); on_event(Event::Connected(config.clone(), conn.tx().clone())); let conn_tx = conn.tx().clone(); - let result = select! { + select! { r = Self::receive::(config, &mut conn, on_event) => r, - _ = Self::handle_requests(request_rx, &conn_tx) => Ok(()), - }; - if let Err(err) = result { - if matches!(err, conn::Error::ConnectionClosed) { - debug!("{}: Connection closed, reconnecting", config.name); - } else { - warn!("{}: An error occurred, reconnecting: {err}", config.name); - } + r = Self::handle_requests(request_rx, &conn_tx) => Err(r), } - - Some(()) } - async fn receive(config: &InstanceConfig, conn: &mut Conn, on_event: &F) -> conn::Result<()> + async fn receive( + config: &InstanceConfig, + conn: &mut Conn, + on_event: &F, + ) -> Result<(), RunError> where F: Fn(Event), { loop { - let packet = conn.recv().await?; + let packet = conn.recv().await.map_err(RunError::Conn)?; let snapshot = Snapshot { conn_tx: conn.tx().clone(), state: conn.state().clone(), @@ -348,9 +377,10 @@ impl Instance { async fn handle_requests( request_rx: &mut mpsc::UnboundedReceiver>, conn_tx: &ConnTx, - ) { + ) -> RunError { while let Some(request) = request_rx.recv().await { let _ = request.send(conn_tx.clone()); } + RunError::InstanceDropped } }