Fix instances taking too long to stop

This commit is contained in:
Joscha 2023-02-17 18:16:13 +01:00
parent be9d43f8cc
commit aa6383c1cf
2 changed files with 34 additions and 5 deletions

View file

@ -19,6 +19,7 @@ Procedure when bumping the version number:
### Fixed ### Fixed
- `!uptime` minute count - `!uptime` minute count
- Instance reconnecting after encountering a 404 (it now stops and logs an error) - Instance reconnecting after encountering a 404 (it now stops and logs an error)
- Instance taking too long to stop when stopped during reconnect delay
## v0.3.0 - 2023-02-11 ## v0.3.0 - 2023-02-11

View file

@ -2,6 +2,7 @@
//! //!
//! See [`Instance`] for more details. //! See [`Instance`] for more details.
use std::convert::Infallible;
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -276,6 +277,9 @@ enum RunError {
pub struct Instance { pub struct Instance {
config: InstanceConfig, config: InstanceConfig,
request_tx: mpsc::UnboundedSender<Request>, request_tx: mpsc::UnboundedSender<Request>,
// In theory, request_tx should be sufficient as canary, but I'm not sure
// exactly how to check it during the reconnect timeout.
_canary_tx: oneshot::Sender<Infallible>,
} }
impl Instance { impl Instance {
@ -306,9 +310,22 @@ impl Instance {
F: Fn(Event) + Send + Sync + 'static, F: Fn(Event) + Send + Sync + 'static,
{ {
idebug!(config, "Created with config {config:?}"); idebug!(config, "Created with config {config:?}");
let (request_tx, request_rx) = mpsc::unbounded_channel(); let (request_tx, request_rx) = mpsc::unbounded_channel();
tokio::spawn(Self::run::<F>(config.clone(), on_event, request_rx)); let (canary_tx, canary_rx) = oneshot::channel();
Self { config, request_tx }
tokio::spawn(Self::run::<F>(
config.clone(),
on_event,
request_rx,
canary_rx,
));
Self {
config,
request_tx,
_canary_tx: canary_tx,
}
} }
pub fn config(&self) -> &InstanceConfig { pub fn config(&self) -> &InstanceConfig {
@ -342,13 +359,26 @@ impl Instance {
async fn run<F: Fn(Event)>( async fn run<F: Fn(Event)>(
config: InstanceConfig, config: InstanceConfig,
on_event: F, on_event: F,
request_rx: mpsc::UnboundedReceiver<Request>,
canary_rx: oneshot::Receiver<Infallible>,
) {
select! {
_ = Self::stay_connected(&config, &on_event, request_rx) => (),
_ = canary_rx => { idebug!(config, "Instance dropped"); },
}
on_event(Event::Stopped(config))
}
async fn stay_connected<F: Fn(Event)>(
config: &InstanceConfig,
on_event: &F,
mut request_rx: mpsc::UnboundedReceiver<Request>, mut request_rx: mpsc::UnboundedReceiver<Request>,
) { ) {
loop { loop {
idebug!(config, "Connecting..."); idebug!(config, "Connecting...");
on_event(Event::Connecting(config.clone())); on_event(Event::Connecting(config.clone()));
let result = Self::run_once::<F>(&config, &on_event, &mut request_rx).await; let result = Self::run_once::<F>(config, on_event, &mut request_rx).await;
on_event(Event::Disconnected(config.clone())); on_event(Event::Disconnected(config.clone()));
let connected = match result { let connected = match result {
@ -386,8 +416,6 @@ impl Instance {
tokio::time::sleep(config.server.reconnect_delay).await; tokio::time::sleep(config.server.reconnect_delay).await;
} }
} }
on_event(Event::Stopped(config))
} }
fn get_cookies(config: &InstanceConfig) -> HeaderValue { fn get_cookies(config: &InstanceConfig) -> HeaderValue {