Stop instance when it is dropped

This commit is contained in:
Joscha 2023-01-22 21:42:04 +01:00
parent 3f14151feb
commit 5d1d12522e

View file

@ -11,6 +11,7 @@ use cookie::{Cookie, CookieJar};
use log::{debug, warn}; use log::{debug, warn};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio_tungstenite::tungstenite;
use tokio_tungstenite::tungstenite::http::HeaderValue; use tokio_tungstenite::tungstenite::http::HeaderValue;
use crate::api::packet::ParsedPacket; use crate::api::packet::ParsedPacket;
@ -169,6 +170,14 @@ pub enum Event {
Stopped(InstanceConfig), Stopped(InstanceConfig),
} }
/// An error that occurred inside an [`Instance`] while it was running.
enum RunError {
StoppedManually,
InstanceDropped,
CouldNotConnect(tungstenite::Error),
Conn(conn::Error),
}
/// A single instance of a bot in a single room. /// A single instance of a bot in a single room.
/// ///
/// The instance automatically connects to its room once it is created, and it /// The instance automatically connects to its room once it is created, and it
@ -228,21 +237,46 @@ impl Instance {
) where ) where
F: Fn(Event), F: Fn(Event),
{ {
// TODO Only delay reconnecting if previous reconnect attempt failed
loop { loop {
debug!("{}: Connecting...", config.name);
on_event(Event::Connecting(config.clone())); on_event(Event::Connecting(config.clone()));
Self::run_once::<F>(&config, &on_event, &mut request_rx).await; let result = Self::run_once::<F>(&config, &on_event, &mut request_rx).await;
on_event(Event::Disconnected(config.clone())); on_event(Event::Disconnected(config.clone()));
debug!( let connected = match result {
"{}: Waiting {} seconds before reconnecting", Ok(()) => {
config.name, debug!("{}: Connection closed normally", config.name);
config.server.reconnect_delay.as_secs(), true
); }
Err(RunError::StoppedManually) => {
debug!("{}: Instance stopped manually", config.name);
break;
}
Err(RunError::InstanceDropped) => {
debug!("{}: Instance dropped", config.name);
break;
}
Err(RunError::CouldNotConnect(err)) => {
warn!("{}: Failed to connect: {err}", config.name);
false
}
Err(RunError::Conn(err)) => {
warn!("{} An error occurred: {err}", config.name);
true
}
};
if !connected {
let s = config.server.reconnect_delay.as_secs();
debug!("{}: Waiting {s} seconds before reconnecting", config.name);
tokio::time::sleep(config.server.reconnect_delay).await; tokio::time::sleep(config.server.reconnect_delay).await;
} }
} }
on_event(Event::Stopped(config))
}
fn get_cookies(config: &InstanceConfig) -> HeaderValue { fn get_cookies(config: &InstanceConfig) -> HeaderValue {
let guard = config.server.cookies.lock().unwrap(); let guard = config.server.cookies.lock().unwrap();
let cookies = guard let cookies = guard
@ -271,11 +305,10 @@ impl Instance {
config: &InstanceConfig, config: &InstanceConfig,
on_event: &F, on_event: &F,
request_rx: &mut mpsc::UnboundedReceiver<oneshot::Sender<ConnTx>>, request_rx: &mut mpsc::UnboundedReceiver<oneshot::Sender<ConnTx>>,
) -> Option<()> ) -> Result<(), RunError>
where where
F: Fn(Event), F: Fn(Event),
{ {
debug!("{}: Connecting...", config.name);
let (mut conn, cookies) = Conn::connect( let (mut conn, cookies) = Conn::connect(
&config.server.domain, &config.server.domain,
&config.room, &config.room,
@ -284,32 +317,28 @@ impl Instance {
config.server.timeout, config.server.timeout,
) )
.await .await
.ok()?; .map_err(RunError::CouldNotConnect)?;
Self::set_cookies(config, cookies); Self::set_cookies(config, cookies);
on_event(Event::Connected(config.clone(), conn.tx().clone())); on_event(Event::Connected(config.clone(), conn.tx().clone()));
let conn_tx = conn.tx().clone(); let conn_tx = conn.tx().clone();
let result = select! { select! {
r = Self::receive::<F>(config, &mut conn, on_event) => r, r = Self::receive::<F>(config, &mut conn, on_event) => r,
_ = Self::handle_requests(request_rx, &conn_tx) => Ok(()), r = Self::handle_requests(request_rx, &conn_tx) => Err(r),
};
if let Err(err) = result {
if matches!(err, conn::Error::ConnectionClosed) {
debug!("{}: Connection closed, reconnecting", config.name);
} else {
warn!("{}: An error occurred, reconnecting: {err}", config.name);
} }
} }
Some(()) async fn receive<F>(
} config: &InstanceConfig,
conn: &mut Conn,
async fn receive<F>(config: &InstanceConfig, conn: &mut Conn, on_event: &F) -> conn::Result<()> on_event: &F,
) -> Result<(), RunError>
where where
F: Fn(Event), F: Fn(Event),
{ {
loop { loop {
let packet = conn.recv().await?; let packet = conn.recv().await.map_err(RunError::Conn)?;
let snapshot = Snapshot { let snapshot = Snapshot {
conn_tx: conn.tx().clone(), conn_tx: conn.tx().clone(),
state: conn.state().clone(), state: conn.state().clone(),
@ -348,9 +377,10 @@ impl Instance {
async fn handle_requests( async fn handle_requests(
request_rx: &mut mpsc::UnboundedReceiver<oneshot::Sender<ConnTx>>, request_rx: &mut mpsc::UnboundedReceiver<oneshot::Sender<ConnTx>>,
conn_tx: &ConnTx, conn_tx: &ConnTx,
) { ) -> RunError {
while let Some(request) = request_rx.recv().await { while let Some(request) = request_rx.recv().await {
let _ = request.send(conn_tx.clone()); let _ = request.send(conn_tx.clone());
} }
RunError::InstanceDropped
} }
} }