From adcd17deae6758329e30c0acae634f68c2695874 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sun, 22 Jan 2023 01:00:18 +0100 Subject: [PATCH] Make instance event handler synchronous and infallible --- examples/testbot_instance.rs | 37 +++++++++++++----------- src/bot/instance.rs | 56 ++++++++++++++++++++---------------- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/examples/testbot_instance.rs b/examples/testbot_instance.rs index 5a0190a..cc61eb6 100644 --- a/examples/testbot_instance.rs +++ b/examples/testbot_instance.rs @@ -1,11 +1,11 @@ //! Similar to the `testbot_manual` example, but using [`Instance`] to connect //! to the room (and toreconnect). -use std::time::Duration; - +use euphoxide::api::packet::ParsedPacket; use euphoxide::api::{Data, Nick, Send}; -use euphoxide::bot::instance::{Config, Event}; +use euphoxide::bot::instance::{Config, Snapshot}; use time::OffsetDateTime; +use tokio::sync::mpsc; const NICK: &str = "TestBot"; const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; @@ -43,18 +43,15 @@ fn format_delta(delta: time::Duration) -> String { parts.join(" ") } -async fn on_event(event: Event) -> Result<(), ()> { - let data = match event.packet.content { +async fn on_packet(packet: ParsedPacket, snapshot: Snapshot) -> Result<(), ()> { + let data = match packet.content { Ok(data) => data, Err(err) => { - println!("Error for {}: {err}", event.packet.r#type); + println!("Error for {}: {err}", packet.r#type); return Err(()); } }; - let conn_tx = event.snapshot.conn_tx; - let state = event.snapshot.state; - match data { Data::HelloEvent(ev) => println!("Connected with id {}", ev.session.id), Data::SnapshotEvent(ev) => { @@ -69,7 +66,7 @@ async fn on_event(event: Event) -> Result<(), ()> { // We only need to do this because we want to log the result of // the nick command. Otherwise, we could've just called // tx.send() synchronously and ignored the returned Future. - let conn_tx_clone = conn_tx.clone(); + let conn_tx_clone = snapshot.conn_tx.clone(); tokio::spawn(async move { // Awaiting the future returned by the send command lets you // (type-safely) access the server's reply. @@ -109,7 +106,7 @@ async fn on_event(event: Event) -> Result<(), ()> { } else if content == format!("!help @{NICK}") { reply = Some(HELP.to_string()); } else if content == format!("!uptime @{NICK}") { - if let Some(joined) = state.joined() { + if let Some(joined) = snapshot.state.joined() { let delta = OffsetDateTime::now_utc() - joined.since; reply = Some(format!("/me has been up for {}", format_delta(delta))); } @@ -125,7 +122,8 @@ async fn on_event(event: Event) -> Result<(), ()> { // would be a race between sending the message and closing // the connection as the send function can return before the // message has actually been sent. - let _ = conn_tx + let _ = snapshot + .conn_tx .send(Send { content: "/me dies".to_string(), parent: Some(event.0.id), @@ -138,7 +136,7 @@ async fn on_event(event: Event) -> Result<(), ()> { // If you are not interested in the result, you can just // throw away the future returned by the send function. println!("Sending reply..."); - let _ = conn_tx.send(Send { + let _ = snapshot.conn_tx.send(Send { content: reply, parent: Some(event.0.id), }); @@ -153,10 +151,17 @@ async fn on_event(event: Event) -> Result<(), ()> { #[tokio::main] async fn main() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let _instance = Config::new("test") .username(Some("TestBot")) - .build(on_event); + .build(move |e| { + let _ = tx.send(e); + }); - // Once the instance is dropped, it stops, so we wait indefinitely here. - tokio::time::sleep(Duration::from_secs(u64::MAX)).await; + while let Some(event) = rx.recv().await { + if on_packet(event.packet, event.snapshot).await.is_err() { + break; + } + } } diff --git a/src/bot/instance.rs b/src/bot/instance.rs index ef005fa..6ef4858 100644 --- a/src/bot/instance.rs +++ b/src/bot/instance.rs @@ -2,7 +2,6 @@ //! //! See [`Instance`] for more details. -use std::future::Future; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -40,6 +39,21 @@ pub struct Config { pub password: Option, } +// Previously, the event callback was asynchronous and would return a result. It +// was called in-line to calling Conn::recv. The idea was that the instance +// would stop if the event handler returned Err. This was, however, not even +// implemented correctly and the instance would just reconnect. +// +// The new event handler is synchronous. This way, it becomes harder to +// accidentally block Conn::recv, for example by waiting for a channel with +// limited capacity. If async code must be executed upon receiving a command, +// the user can start a task from inside the handler. +// +// The new event handler does not return anything. This makes the code nicer. In +// the use cases I'm thinking of, it should not be a problem: If the event +// handler encounters errors, there's usually other ways to tell the same. Make +// the event handler ignore the errors and stop the instance in that other way. + impl Config { pub fn new(room: S) -> Self { Self { @@ -83,10 +97,9 @@ impl Config { self } - pub fn build(self, on_event: F) -> Instance + pub fn build(self, on_event: F) -> Instance where - F: FnMut(Event) -> Fut + Send + 'static, - Fut: Future> + Send + 'static, + F: Fn(Event) + Send + Sync + 'static, { Instance::new(self, on_event) } @@ -124,14 +137,13 @@ pub struct Instance { } impl Instance { - pub fn new(config: Config, on_event: F) -> Self + pub fn new(config: Config, on_event: F) -> Self where - F: FnMut(Event) -> Fut + Send + 'static, - Fut: Future> + Send + 'static, + F: Fn(Event) + Send + Sync + 'static, { debug!("{}: Created with config {config:?}", config.name); let (request_tx, request_rx) = mpsc::unbounded_channel(); - tokio::spawn(Self::run::(config.clone(), on_event, request_rx)); + tokio::spawn(Self::run::(config.clone(), on_event, request_rx)); Self { config, request_tx } } @@ -145,17 +157,16 @@ impl Instance { rx.await.ok() } - async fn run( + async fn run( config: Config, - mut on_event: F, + on_event: F, mut request_rx: mpsc::UnboundedReceiver>, ) where - F: FnMut(Event) -> Fut, - Fut: Future>, + F: Fn(Event), { // TODO Only delay reconnecting if previous reconnect attempt failed loop { - Self::run_once::(&config, &mut on_event, &mut request_rx).await; + Self::run_once::(&config, &on_event, &mut request_rx).await; debug!( "{}: Waiting {} seconds before reconnecting", config.name, @@ -189,14 +200,13 @@ impl Instance { } } - async fn run_once( + async fn run_once( config: &Config, - on_event: &mut F, + on_event: &F, request_rx: &mut mpsc::UnboundedReceiver>, ) -> Option<()> where - F: FnMut(Event) -> Fut, - Fut: Future>, + F: Fn(Event), { debug!("{}: Connecting...", config.name); let (mut conn, cookies) = Conn::connect( @@ -212,7 +222,7 @@ impl Instance { let conn_tx = conn.tx().clone(); let result = select! { - r = Self::receive::(config, &mut conn, on_event) => r, + r = Self::receive::(config, &mut conn, on_event) => r, _ = Self::handle_requests(request_rx, &conn_tx) => Ok(()), }; if let Err(err) = result { @@ -226,10 +236,9 @@ impl Instance { Some(()) } - async fn receive(config: &Config, conn: &mut Conn, on_event: &mut F) -> conn::Result<()> + async fn receive(config: &Config, conn: &mut Conn, on_event: &F) -> conn::Result<()> where - F: FnMut(Event) -> Fut, - Fut: Future>, + F: Fn(Event), { loop { let packet = conn.recv().await?; @@ -266,10 +275,7 @@ impl Instance { _ => {} } - if on_event(event).await.is_err() { - warn!("{}: on_event handler returned Err(())", config.name); - break; - } + on_event(event); } Ok(())