diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7a89179 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "files.insertFinalNewline": true, + "rust-analyzer.cargo.features": "all", + "rust-analyzer.imports.granularity.enforce": true, + "rust-analyzer.imports.granularity.group": "module", + "rust-analyzer.imports.group.enable": true, + "evenBetterToml.formatter.columnWidth": 100, +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 2df28e9..524e877 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). Procedure when bumping the version number: + 1. Update dependencies in a separate commit 2. Set version number in `Cargo.toml` 3. Add new section in this changelog @@ -13,13 +14,147 @@ Procedure when bumping the version number: ## Unreleased +## v0.6.1 - 2025-02-23 + +### Changed + +- Updated set of emoji names + +### Fixed + +- Nick hue hashing algorithm in some edge cases + +## v0.6.0 - 2025-02-21 + +### Added + +- `api::Time::from_timestamp` +- `api::Time::as_timestamp` +- `bot::botrulez::full_help` +- `bot::botrulez::ping` +- `bot::botrulez::short_help` +- `bot::botrulez::uptime` +- `bot::botrulez::format_relative_time` + +### Changed + +- **(breaking)** Switched to `jiff` from `time` +- **(breaking)** `api::Time` contents are now an `i64` +- **(breaking)** Bumped `tokio-tungstenite` dependency from `0.18` to `0.24`. If + this causes a panic while using euphoxide, consider following the steps + mentioned in the [tokio-tungstenite README]. If I'm reading the [rustls docs] + correctly, it is on the users of the libraries to set the required features. +- `bot::botrulez::format_duration` now no longer mentions "since" or "ago", but + instead has a sign (`-`) if the duration is negative. + +[tokio-tungstenite README]: https://github.com/snapview/tokio-tungstenite?tab=readme-ov-file#features +[rustls docs]: https://docs.rs/rustls/0.23.19/rustls/crypto/struct.CryptoProvider.html#using-the-per-process-default-cryptoprovider + +### Removed + +- `api::Time::new` + +## v0.5.1 - 2024-05-20 + +### Added + +- `Emoji::load_from_json` + +### Changed + +- Updated set of emoji names + +## v0.5.0 - 2023-12-27 + +### Changed + +- **(breaking)** `bot::instance::ServerConfig::default` now points to `euphoria.leet.nu` +- **(breaking)** Bumped `cookie` dependency from `0.17` to `0.18` +- **(breaking)** Bumped `tokio-tungstenite` dependency from `0.18` to `0.21` +- Updated set of emoji names +- Documentation now references `euphoria.leet.nu` instead of `euphoria.io` + +## v0.4.0 - 2023-05-14 + +### Added + +- `bot::botrulez::Uptime` now implements `bot::command::Command` +- `bot::command::parse_prefix_initiated` +- `bot::commands::Commands::fallthrough` +- `bot::commands::Commands::set_fallthrough` +- `conn::Error::ConnectionTimedOut` + +### Changed + +- **(breaking)** `bot::command::ClapCommand::execute` now returns a `Result` instead of a `Result<(), E>` +- **(breaking)** `bot::command::Command::execute` now returns a `Result` instead of a `Result<(), E>` +- **(breaking)** `bot::commands::Commands::handle_packet` now returns a `Result` instead of a `Result<(), E>` +- **(breaking)** `bot::instance::Snapshot` renamed to `ConnSnapshot` +- **(breaking)** `conn::Conn::connect` now returns `conn::Result` +- `bot::instance::Instance` now implements `Clone` + +### Fixed + +- **(breaking)** Deserializing empty events and replies by turning unit structs into empty structs +- `phone` and `mobile` emoji +- Instances getting stuck in "Connecting" state +- Euph errors always turning into `conn::Error`s + +## v0.3.1 - 2023-02-26 + +### Added + +- `bot::botrulez::FullHelp` now implements `bot::command::Command` +- `bot::botrulez::Ping` now implements `bot::command::Command` +- `bot::botrulez::ShortHelp` now implements `bot::command::Command` +- `bot::instances::Instances::is_from_known_instance` + +### Changed + +- Instances log to target `euphoxide::live::` +- Instances stay connected if auth is required but no password is set + +### Fixed + +- `!uptime` minute count +- Instance reconnecting after encountering a 404 (it now stops and logs an error) +- Instance taking too long to stop when stopped during reconnect delay + +## v0.3.0 - 2023-02-11 + +### Added + +- `bot` feature +- `bot` module (enable the `bot` feature to use) +- `Emoji` for finding, replacing and removing colon-delimited emoji in text +- `api::Time::new` +- `nick::hue` +- `nick::mention` +- `nick::normalize` +- Debug logging using the `log` crate +- `testbot_instance` example using the new `bot::instance::Instance` +- VSCode project settings + +### Changed + +- **(breaking)** `conn` module redesigned and rewritten +- **(breaking)** `nick_hue` moved to `nick::hue_without_removing_emoji` +- Renamed `testbot` example to `testbot_manual` + +### Removed + +- **(breaking)** `connect` (see `conn::Conn::connect`) +- **(breaking)** `wrap` (see `conn::Conn::wrap`) + ## v0.2.0 - 2022-12-10 ### Added -- `euphoxide::connect` + +- `connect` ### Changed -- Updated dependencies in backwards-incompatible way + +- **(breaking)** Updated dependencies ## v0.1.0 - 2022-10-23 diff --git a/Cargo.toml b/Cargo.toml index 574d6d8..cf65579 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,22 +1,64 @@ [package] name = "euphoxide" -version = "0.2.0" +version = "0.6.1" edition = "2021" +[features] +bot = ["dep:async-trait", "dep:clap", "dep:cookie"] + [dependencies] -serde = { version = "1.0.149", features = ["derive"] } -serde_json = "1.0.89" -time = { version = "0.3.17", features = ["serde"] } -tokio = { version = "1.23.0", features = ["time", "sync", "macros", "rt"] } +async-trait = { version = "0.1.86", optional = true } +caseless = "0.2.2" +cookie = { version = "0.18.1", optional = true } +futures-util = { version = "0.3.31", default-features = false, features = ["sink"] } +jiff = { version = "0.2.1", features = ["serde"] } +log = "0.4.25" +serde = { version = "1.0.218", features = ["derive"] } +serde_json = "1.0.139" +tokio = { version = "1.43.0", features = ["time", "sync", "macros", "rt"] } +tokio-stream = "0.1.17" +tokio-tungstenite = { version = "0.26.2", features = ["rustls-tls-native-roots"] } +unicode-normalization = "0.1.24" -[dependencies.futures] -version = "0.3.25" +[dependencies.clap] +version = "4.5.30" +optional = true default-features = false -features = ["std"] - -[dependencies.tokio-tungstenite] -version = "0.18.0" -features = ["rustls-tls-native-roots"] +features = ["std", "derive", "deprecated"] [dev-dependencies] # For example bot -tokio = { version = "1.23.0", features = ["rt-multi-thread"] } +rustls = "0.23.23" +tokio = { version = "1.43.0", features = ["rt-multi-thread"] } + +[[example]] +name = "testbot_instance" +required-features = ["bot"] + +[[example]] +name = "testbot_instances" +required-features = ["bot"] + +[[example]] +name = "testbot_commands" +required-features = ["bot"] + +[lints] +rust.unsafe_code = { level = "forbid", priority = 1 } +# Lint groups +rust.deprecated_safe = "warn" +rust.future_incompatible = "warn" +rust.keyword_idents = "warn" +rust.rust_2018_idioms = "warn" +rust.unused = "warn" +# Individual lints +rust.non_local_definitions = "warn" +rust.redundant_imports = "warn" +rust.redundant_lifetimes = "warn" +rust.single_use_lifetimes = "warn" +rust.unit_bindings = "warn" +rust.unnameable_types = "warn" +rust.unused_import_braces = "warn" +rust.unused_lifetimes = "warn" +rust.unused_qualifications = "warn" +# Clippy +clippy.use_self = "warn" diff --git a/examples/testbot.rs b/examples/testbot.rs deleted file mode 100644 index 3e4f19a..0000000 --- a/examples/testbot.rs +++ /dev/null @@ -1,145 +0,0 @@ -use std::error::Error; -use std::time::{Duration, Instant}; - -use euphoxide::api::{Data, Nick, Send}; - -const URI: &str = "wss://euphoria.io/room/test/ws"; -const NICK: &str = "TestBot"; -const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; - -fn format_delta(delta: Duration) -> String { - const MINUTE: u64 = 60; - const HOUR: u64 = MINUTE * 60; - const DAY: u64 = HOUR * 24; - - let mut seconds = delta.as_secs(); - let mut parts = vec![]; - - let days = seconds / DAY; - if days > 0 { - parts.push(format!("{days}d")); - seconds -= days * DAY; - } - - let hours = seconds / HOUR; - if hours > 0 { - parts.push(format!("{hours}h")); - seconds -= hours * HOUR; - } - - let mins = seconds / MINUTE; - if mins > 0 { - parts.push(format!("{mins}m")); - seconds -= mins * MINUTE; - } - - if parts.is_empty() || seconds > 0 { - parts.push(format!("{seconds}s")); - } - - parts.join(" ") -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let start = Instant::now(); - - let (ws, _) = tokio_tungstenite::connect_async(URI).await?; - let (tx, mut rx) = euphoxide::conn::wrap(ws, Duration::from_secs(30)); - while let Some(packet) = rx.recv().await { - let data = match packet.content { - Ok(data) => data, - Err(err) => { - println!("Error for {}: {err}", packet.r#type); - continue; - } - }; - match data { - Data::HelloEvent(event) => println!("Connected with id {}", event.session.id), - Data::SnapshotEvent(event) => { - for session in event.listing { - println!("{:?} ({}) is already here", session.name, session.id); - } - - // Here, a new task is spawned so the main event loop can - // continue running immediately instead of waiting for a reply - // from the server. - let tx_clone = tx.clone(); - tokio::spawn(async move { - // Awaiting the future returned by the send command lets you - // (type-safely) access the server's reply. - let reply = tx_clone - .send(Nick { - name: NICK.to_string(), - }) - .await; - match reply { - Ok(reply) => println!("Set nick to {:?}", reply.to), - Err(err) => println!("Failed to set nick: {err}"), - }; - }); - } - Data::BounceEvent(_) => { - println!("Received bounce event, stopping"); - break; - } - Data::DisconnectEvent(_) => { - println!("Received disconnect event, stopping"); - break; - } - Data::JoinEvent(event) => println!("{:?} ({}) joined", event.0.name, event.0.id), - Data::PartEvent(event) => println!("{:?} ({}) left", event.0.name, event.0.id), - Data::NickEvent(event) => println!( - "{:?} ({}) is now known as {:?}", - event.from, event.id, event.to - ), - Data::SendEvent(event) => { - println!("Message {} was just sent", event.0.id.0); - - let content = event.0.content.trim(); - let mut reply = None; - - if content == "!ping" || content == format!("!ping @{NICK}") { - reply = Some("Pong!".to_string()); - } else if content == format!("!help @{NICK}") { - reply = Some(HELP.to_string()); - } else if content == format!("!uptime @{NICK}") { - let delta = Instant::now().duration_since(start); - reply = Some(format!("/me has been up for {}", format_delta(delta))); - } else if content == "!test" { - reply = Some("Test successful!".to_string()); - } else if content == format!("!kill @{NICK}") { - println!( - "I was killed by {:?} ({})", - event.0.sender.name, event.0.sender.id - ); - // Awaiting the server reply in the main loop to ensure the - // message is sent before we exit the loop. Otherwise, there - // would be a race between sending the message and closing - // the connection as the send function can return before the - // message has actually been sent. - let _ = tx - .send(Send { - content: "/me dies".to_string(), - parent: Some(event.0.id), - }) - .await; - break; - } - - if let Some(reply) = reply { - // If you are not interested in the result, you can just - // throw away the future returned by the send function. - println!("Sending reply..."); - let _ = tx.send(Send { - content: reply, - parent: Some(event.0.id), - }); - println!("Reply sent!"); - } - } - _ => {} - } - } - Ok(()) -} diff --git a/examples/testbot_commands.rs b/examples/testbot_commands.rs new file mode 100644 index 0000000..c3afada --- /dev/null +++ b/examples/testbot_commands.rs @@ -0,0 +1,150 @@ +// TODO Add description +// TODO Clean up and unify test bots + +use std::sync::Arc; + +use async_trait::async_trait; +use clap::Parser; +use euphoxide::api::Message; +use euphoxide::bot::botrulez::{FullHelp, HasDescriptions, HasStartTime, Ping, ShortHelp, Uptime}; +use euphoxide::bot::command::{Clap, ClapCommand, Context, General, Global, Hidden, Specific}; +use euphoxide::bot::commands::Commands; +use euphoxide::bot::instance::{Event, ServerConfig}; +use euphoxide::bot::instances::Instances; +use euphoxide::conn; +use jiff::Timestamp; +use log::error; +use tokio::sync::mpsc; + +const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; + +/// Kill this bot. +#[derive(Parser)] +struct KillArgs; + +struct Kill; + +#[async_trait] +impl ClapCommand for Kill { + type Args = KillArgs; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + bot: &mut Bot, + ) -> Result { + bot.stop = true; + ctx.reply(msg.id, "/me dies").await?; + Ok(true) + } +} + +/// Do some testing. +#[derive(Parser)] +struct TestArgs { + /// How much testing to do. + #[arg(default_value_t = 1)] + amount: u64, +} + +struct Test; + +#[async_trait] +impl ClapCommand for Test { + type Args = TestArgs; + + async fn execute( + &self, + args: Self::Args, + msg: &Message, + ctx: &Context, + _bot: &mut Bot, + ) -> Result { + let content = if args.amount == 1 { + format!("/me did {} test", args.amount) + } else { + format!("/me did {} tests", args.amount) + }; + ctx.reply(msg.id, content).await?; + Ok(true) + } +} + +struct Bot { + commands: Arc>, + start_time: Timestamp, + stop: bool, +} + +impl HasDescriptions for Bot { + fn descriptions(&self, ctx: &Context) -> Vec { + self.commands.descriptions(ctx) + } +} + +impl HasStartTime for Bot { + fn start_time(&self) -> Timestamp { + self.start_time + } +} + +#[tokio::main] +async fn main() { + // https://github.com/snapview/tokio-tungstenite/issues/353#issuecomment-2455247837 + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .unwrap(); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut instances = Instances::new(ServerConfig::default()); + + let mut cmds = Commands::new(); + cmds.add(Hidden(General::new("ping", Clap(Ping::default())))); + cmds.add(Specific::new("ping", Clap(Ping::default()))); + cmds.add(Hidden(General::new("help", Clap(ShortHelp::new(HELP))))); + cmds.add(Specific::new("help", Clap(FullHelp::new(HELP, "")))); + cmds.add(Specific::new("uptime", Clap(Uptime))); + cmds.add(Specific::new("kill", Clap(Kill))); + cmds.add(Global::new("test", Clap(Test))); + let cmds = Arc::new(cmds); + + let mut bot = Bot { + commands: cmds.clone(), + start_time: Timestamp::now(), + stop: false, + }; + + for room in ["test", "test2", "testing"] { + let tx_clone = tx.clone(); + let instance = instances + .server_config() + .clone() + .room(room) + .username(Some("TestBot")) + .build(move |e| { + let _ = tx_clone.send(e); + }); + instances.add(instance); + } + + while let Some(event) = rx.recv().await { + instances.purge(); + if instances.is_empty() { + break; + } + + if let Event::Packet(config, packet, snapshot) = event { + let result = cmds + .handle_packet(&config, &packet, &snapshot, &mut bot) + .await; + if let Err(err) = result { + error!("{err}"); + } + if bot.stop { + break; + } + } + } +} diff --git a/examples/testbot_instance.rs b/examples/testbot_instance.rs new file mode 100644 index 0000000..f60f3b9 --- /dev/null +++ b/examples/testbot_instance.rs @@ -0,0 +1,146 @@ +//! Similar to the `testbot_manual` example, but using [`Instance`] to connect +//! to the room (and to reconnect). + +use euphoxide::api::packet::ParsedPacket; +use euphoxide::api::{Data, Nick, Send}; +use euphoxide::bot::botrulez; +use euphoxide::bot::instance::{ConnSnapshot, Event, ServerConfig}; +use jiff::Timestamp; +use tokio::sync::mpsc; + +const NICK: &str = "TestBot"; +const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; + +async fn on_packet(packet: ParsedPacket, snapshot: ConnSnapshot) -> Result<(), ()> { + let data = match packet.content { + Ok(data) => data, + Err(err) => { + println!("Error for {}: {err}", packet.r#type); + return Err(()); + } + }; + + match data { + Data::HelloEvent(ev) => println!("Connected with id {}", ev.session.id), + Data::SnapshotEvent(ev) => { + for session in ev.listing { + println!("{:?} ({}) is already here", session.name, session.id); + } + + // Here, a new task is spawned so the main event loop can + // continue running immediately instead of waiting for a reply + // from the server. + // + // We only need to do this because we want to log the result of + // the nick command. Otherwise, we could've just called + // tx.send() synchronously and ignored the returned Future. + let conn_tx_clone = snapshot.conn_tx.clone(); + tokio::spawn(async move { + // Awaiting the future returned by the send command lets you + // (type-safely) access the server's reply. + let reply = conn_tx_clone + .send(Nick { + name: NICK.to_string(), + }) + .await; + match reply { + Ok(reply) => println!("Set nick to {:?}", reply.to), + Err(err) => println!("Failed to set nick: {err}"), + }; + }); + } + Data::BounceEvent(_) => { + println!("Received bounce event, stopping"); + return Err(()); + } + Data::DisconnectEvent(_) => { + println!("Received disconnect event, stopping"); + return Err(()); + } + Data::JoinEvent(event) => println!("{:?} ({}) joined", event.0.name, event.0.id), + Data::PartEvent(event) => println!("{:?} ({}) left", event.0.name, event.0.id), + Data::NickEvent(event) => println!( + "{:?} ({}) is now known as {:?}", + event.from, event.id, event.to + ), + Data::SendEvent(event) => { + println!("Message {} was just sent", event.0.id.0); + + let content = event.0.content.trim(); + let mut reply = None; + + if content == "!ping" || content == format!("!ping @{NICK}") { + reply = Some("Pong!".to_string()); + } else if content == format!("!help @{NICK}") { + reply = Some(HELP.to_string()); + } else if content == format!("!uptime @{NICK}") { + if let Some(joined) = snapshot.state.joined() { + let delta = Timestamp::now() - joined.since; + reply = Some(format!( + "/me has been up for {}", + botrulez::format_duration(delta) + )); + } + } else if content == "!test" { + reply = Some("Test successful!".to_string()); + } else if content == format!("!kill @{NICK}") { + println!( + "I was killed by {:?} ({})", + event.0.sender.name, event.0.sender.id + ); + // Awaiting the server reply in the main loop to ensure the + // message is sent before we exit the loop. Otherwise, there + // would be a race between sending the message and closing + // the connection as the send function can return before the + // message has actually been sent. + let _ = snapshot + .conn_tx + .send(Send { + content: "/me dies".to_string(), + parent: Some(event.0.id), + }) + .await; + return Err(()); + } + + if let Some(reply) = reply { + // If you are not interested in the result, you can just + // throw away the future returned by the send function. + println!("Sending reply..."); + snapshot.conn_tx.send_only(Send { + content: reply, + parent: Some(event.0.id), + }); + println!("Reply sent!"); + } + } + _ => {} + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + // https://github.com/snapview/tokio-tungstenite/issues/353#issuecomment-2455247837 + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .unwrap(); + + let (tx, mut rx) = mpsc::unbounded_channel(); + + let _instance = ServerConfig::default() + .room("test") + .username(Some("TestBot")) + .build(move |e| { + let _ = tx.send(e); + }); + + while let Some(event) = rx.recv().await { + if let Event::Packet(_config, packet, snapshot) = event { + if on_packet(packet, snapshot).await.is_err() { + break; + } + } + } +} diff --git a/examples/testbot_instances.rs b/examples/testbot_instances.rs new file mode 100644 index 0000000..0fb612f --- /dev/null +++ b/examples/testbot_instances.rs @@ -0,0 +1,159 @@ +//! Similar to the `testbot_manual` example, but using [`Instance`] to connect +//! to the room (and to reconnect). + +use euphoxide::api::packet::ParsedPacket; +use euphoxide::api::{Data, Nick, Send}; +use euphoxide::bot::botrulez; +use euphoxide::bot::instance::{ConnSnapshot, Event, ServerConfig}; +use euphoxide::bot::instances::Instances; +use jiff::Timestamp; +use tokio::sync::mpsc; + +const NICK: &str = "TestBot"; +const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; + +async fn on_packet(packet: ParsedPacket, snapshot: ConnSnapshot) -> Result<(), ()> { + let data = match packet.content { + Ok(data) => data, + Err(err) => { + println!("Error for {}: {err}", packet.r#type); + return Err(()); + } + }; + + match data { + Data::HelloEvent(ev) => println!("Connected with id {}", ev.session.id), + Data::SnapshotEvent(ev) => { + for session in ev.listing { + println!("{:?} ({}) is already here", session.name, session.id); + } + + // Here, a new task is spawned so the main event loop can + // continue running immediately instead of waiting for a reply + // from the server. + // + // We only need to do this because we want to log the result of + // the nick command. Otherwise, we could've just called + // tx.send() synchronously and ignored the returned Future. + let conn_tx_clone = snapshot.conn_tx.clone(); + tokio::spawn(async move { + // Awaiting the future returned by the send command lets you + // (type-safely) access the server's reply. + let reply = conn_tx_clone + .send(Nick { + name: NICK.to_string(), + }) + .await; + match reply { + Ok(reply) => println!("Set nick to {:?}", reply.to), + Err(err) => println!("Failed to set nick: {err}"), + }; + }); + } + Data::BounceEvent(_) => { + println!("Received bounce event, stopping"); + return Err(()); + } + Data::DisconnectEvent(_) => { + println!("Received disconnect event, stopping"); + return Err(()); + } + Data::JoinEvent(event) => println!("{:?} ({}) joined", event.0.name, event.0.id), + Data::PartEvent(event) => println!("{:?} ({}) left", event.0.name, event.0.id), + Data::NickEvent(event) => println!( + "{:?} ({}) is now known as {:?}", + event.from, event.id, event.to + ), + Data::SendEvent(event) => { + println!("Message {} was just sent", event.0.id.0); + + let content = event.0.content.trim(); + let mut reply = None; + + if content == "!ping" || content == format!("!ping @{NICK}") { + reply = Some("Pong!".to_string()); + } else if content == format!("!help @{NICK}") { + reply = Some(HELP.to_string()); + } else if content == format!("!uptime @{NICK}") { + if let Some(joined) = snapshot.state.joined() { + let delta = Timestamp::now() - joined.since; + reply = Some(format!( + "/me has been up for {}", + botrulez::format_duration(delta) + )); + } + } else if content == "!test" { + reply = Some("Test successful!".to_string()); + } else if content == format!("!kill @{NICK}") { + println!( + "I was killed by {:?} ({})", + event.0.sender.name, event.0.sender.id + ); + // Awaiting the server reply in the main loop to ensure the + // message is sent before we exit the loop. Otherwise, there + // would be a race between sending the message and closing + // the connection as the send function can return before the + // message has actually been sent. + let _ = snapshot + .conn_tx + .send(Send { + content: "/me dies".to_string(), + parent: Some(event.0.id), + }) + .await; + return Err(()); + } + + if let Some(reply) = reply { + // If you are not interested in the result, you can just + // throw away the future returned by the send function. + println!("Sending reply..."); + snapshot.conn_tx.send_only(Send { + content: reply, + parent: Some(event.0.id), + }); + println!("Reply sent!"); + } + } + _ => {} + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + // https://github.com/snapview/tokio-tungstenite/issues/353#issuecomment-2455247837 + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .unwrap(); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut instances = Instances::new(ServerConfig::default()); + + for room in ["test", "test2", "testing"] { + let tx_clone = tx.clone(); + let instance = instances + .server_config() + .clone() + .room(room) + .username(Some("TestBot")) + .build(move |e| { + let _ = tx_clone.send(e); + }); + instances.add(instance); + } + + while let Some(event) = rx.recv().await { + instances.purge(); + if instances.is_empty() { + break; + } + + if let Event::Packet(_config, packet, snapshot) = event { + if on_packet(packet, snapshot).await.is_err() { + break; + } + } + } +} diff --git a/examples/testbot_manual.rs b/examples/testbot_manual.rs new file mode 100644 index 0000000..da21db0 --- /dev/null +++ b/examples/testbot_manual.rs @@ -0,0 +1,142 @@ +//! A small bot that doesn't use the `bot` submodule. Meant to show how the main +//! parts of the API fit together. + +use std::error::Error; +use std::time::Duration; + +use euphoxide::api::packet::ParsedPacket; +use euphoxide::api::{Data, Nick, Send}; +use euphoxide::bot::botrulez; +use euphoxide::conn::{Conn, ConnTx, State}; +use jiff::Timestamp; + +const TIMEOUT: Duration = Duration::from_secs(10); +const DOMAIN: &str = "euphoria.leet.nu"; +const ROOM: &str = "test"; +const NICK: &str = "TestBot"; +const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; + +async fn on_packet(packet: ParsedPacket, conn_tx: &ConnTx, state: &State) -> Result<(), ()> { + let data = match packet.content { + Ok(data) => data, + Err(err) => { + println!("Error for {}: {err}", packet.r#type); + return Err(()); + } + }; + + match data { + Data::HelloEvent(event) => println!("Connected with id {}", event.session.id), + Data::SnapshotEvent(event) => { + for session in event.listing { + println!("{:?} ({}) is already here", session.name, session.id); + } + + // Here, a new task is spawned so the main event loop can + // continue running immediately instead of waiting for a reply + // from the server. + // + // We only need to do this because we want to log the result of + // the nick command. Otherwise, we could've just called + // tx.send() synchronously and ignored the returned Future. + let conn_tx_clone = conn_tx.clone(); + tokio::spawn(async move { + // Awaiting the future returned by the send command lets you + // (type-safely) access the server's reply. + let reply = conn_tx_clone + .send(Nick { + name: NICK.to_string(), + }) + .await; + match reply { + Ok(reply) => println!("Set nick to {:?}", reply.to), + Err(err) => println!("Failed to set nick: {err}"), + }; + }); + } + Data::BounceEvent(_) => { + println!("Received bounce event, stopping"); + return Err(()); + } + Data::DisconnectEvent(_) => { + println!("Received disconnect event, stopping"); + return Err(()); + } + Data::JoinEvent(event) => println!("{:?} ({}) joined", event.0.name, event.0.id), + Data::PartEvent(event) => println!("{:?} ({}) left", event.0.name, event.0.id), + Data::NickEvent(event) => println!( + "{:?} ({}) is now known as {:?}", + event.from, event.id, event.to + ), + Data::SendEvent(event) => { + println!("Message {} was just sent", event.0.id.0); + + let content = event.0.content.trim(); + let mut reply = None; + + if content == "!ping" || content == format!("!ping @{NICK}") { + reply = Some("Pong!".to_string()); + } else if content == format!("!help @{NICK}") { + reply = Some(HELP.to_string()); + } else if content == format!("!uptime @{NICK}") { + if let Some(joined) = state.joined() { + let delta = Timestamp::now() - joined.since; + reply = Some(format!( + "/me has been up for {}", + botrulez::format_duration(delta) + )); + } + } else if content == "!test" { + reply = Some("Test successful!".to_string()); + } else if content == format!("!kill @{NICK}") { + println!( + "I was killed by {:?} ({})", + event.0.sender.name, event.0.sender.id + ); + // Awaiting the server reply in the main loop to ensure the + // message is sent before we exit the loop. Otherwise, there + // would be a race between sending the message and closing + // the connection as the send function can return before the + // message has actually been sent. + let _ = conn_tx + .send(Send { + content: "/me dies".to_string(), + parent: Some(event.0.id), + }) + .await; + return Err(()); + } + + if let Some(reply) = reply { + // If you are not interested in the result, you can just + // throw away the future returned by the send function. + println!("Sending reply..."); + conn_tx.send_only(Send { + content: reply, + parent: Some(event.0.id), + }); + println!("Reply sent!"); + } + } + _ => {} + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // https://github.com/snapview/tokio-tungstenite/issues/353#issuecomment-2455247837 + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .unwrap(); + + let (mut conn, _) = Conn::connect(DOMAIN, ROOM, false, None, TIMEOUT).await?; + + while let Ok(packet) = conn.recv().await { + if on_packet(packet, conn.tx(), conn.state()).await.is_err() { + break; + } + } + Ok(()) +} diff --git a/src/api.rs b/src/api.rs index 5923f47..e24ca1d 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,4 +1,6 @@ -//! Models the euphoria API at . +//! Models the [euphoria API][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api mod account_cmds; mod events; diff --git a/src/api/account_cmds.rs b/src/api/account_cmds.rs index 6a39db6..01ef7f0 100644 --- a/src/api/account_cmds.rs +++ b/src/api/account_cmds.rs @@ -56,7 +56,7 @@ pub struct ChangePassword { /// Return the outcome of changing the password. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ChangePasswordReply; +pub struct ChangePasswordReply {} /// Attempt to log an anonymous session into an account. /// @@ -99,11 +99,11 @@ pub struct LoginReply { /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged out session. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Logout; +pub struct Logout {} /// Confirm a logout. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LogoutReply; +pub struct LogoutReply {} /// Create a new account and logs into it. /// @@ -146,11 +146,11 @@ pub struct RegisterAccountReply { /// An error will be returned if the account has no unverified email addresses /// associated with it. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ResendVerificationEmail; +pub struct ResendVerificationEmail {} /// Indicate that a verification email has been sent. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ResendVerificationEmailReply; +pub struct ResendVerificationEmailReply {} /// Generate a password reset request. /// @@ -164,4 +164,4 @@ pub struct ResetPassword { /// Confirm that the password reset is in progress. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ResetPasswordReply; +pub struct ResetPasswordReply {} diff --git a/src/api/events.rs b/src/api/events.rs index a80a49d..8abe04d 100644 --- a/src/api/events.rs +++ b/src/api/events.rs @@ -68,7 +68,7 @@ pub struct LoginEvent { /// Sent to all sessions of an agent when that agent is logged out (except for /// the session that issued the logout command). #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct LogoutEvent; +pub struct LogoutEvent {} /// Indicates some server-side event that impacts the presence of sessions in a /// room. diff --git a/src/api/room_cmds.rs b/src/api/room_cmds.rs index bb60e0a..0a2d553 100644 --- a/src/api/room_cmds.rs +++ b/src/api/room_cmds.rs @@ -110,7 +110,7 @@ pub struct SendReply(pub Message); /// Request a list of sessions currently joined in the room. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Who; +pub struct Who {} /// Lists the sessions currently joined in the room. #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/api/types.rs b/src/api/types.rs index c24b4c9..b1408a8 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -10,9 +10,9 @@ use std::num::ParseIntError; use std::str::FromStr; use std::{error, fmt}; +use jiff::Timestamp; use serde::{de, ser, Deserialize, Serialize}; use serde_json::Value; -use time::OffsetDateTime; /// Describes an account and its preferred name. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -243,7 +243,7 @@ pub enum PacketType { impl fmt::Display for PacketType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match serde_json::to_value(self) { - Ok(Value::String(s)) => write!(f, "{}", s), + Ok(Value::String(s)) => write!(f, "{s}"), _ => Err(fmt::Error), } } @@ -371,7 +371,7 @@ impl FromStr for Snowflake { impl Serialize for Snowflake { fn serialize(&self, serializer: S) -> Result { - format!("{}", self).serialize(serializer) + format!("{self}").serialize(serializer) } } @@ -403,11 +403,19 @@ impl<'de> Deserialize<'de> for Snowflake { /// Time is specified as a signed 64-bit integer, giving the number of seconds /// since the Unix Epoch. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub struct Time(#[serde(with = "time::serde::timestamp")] pub OffsetDateTime); +pub struct Time(pub i64); impl Time { + pub fn from_timestamp(time: Timestamp) -> Self { + Self(time.as_second()) + } + + pub fn as_timestamp(&self) -> Timestamp { + Timestamp::from_second(self.0).unwrap() + } + pub fn now() -> Self { - Self(OffsetDateTime::now_utc().replace_millisecond(0).unwrap()) + Self::from_timestamp(Timestamp::now()) } } diff --git a/src/bot.rs b/src/bot.rs new file mode 100644 index 0000000..5076211 --- /dev/null +++ b/src/bot.rs @@ -0,0 +1,7 @@ +//! Building blocks for bots. + +pub mod botrulez; +pub mod command; +pub mod commands; +pub mod instance; +pub mod instances; diff --git a/src/bot/botrulez.rs b/src/bot/botrulez.rs new file mode 100644 index 0000000..6dd5adb --- /dev/null +++ b/src/bot/botrulez.rs @@ -0,0 +1,10 @@ +//! The main [botrulez](https://github.com/jedevc/botrulez) commands. +pub mod full_help; +pub mod ping; +pub mod short_help; +pub mod uptime; + +pub use self::full_help::{FullHelp, HasDescriptions}; +pub use self::ping::Ping; +pub use self::short_help::ShortHelp; +pub use self::uptime::{format_duration, format_relative_time, format_time, HasStartTime, Uptime}; diff --git a/src/bot/botrulez/full_help.rs b/src/bot/botrulez/full_help.rs new file mode 100644 index 0000000..20ffcc4 --- /dev/null +++ b/src/bot/botrulez/full_help.rs @@ -0,0 +1,93 @@ +use async_trait::async_trait; +use clap::Parser; + +use crate::api::Message; +use crate::bot::command::{ClapCommand, Command, Context}; +use crate::conn; + +pub struct FullHelp { + pub before: String, + pub after: String, +} + +pub trait HasDescriptions { + fn descriptions(&self, ctx: &Context) -> Vec; +} + +impl FullHelp { + pub fn new(before: S1, after: S2) -> Self { + Self { + before: before.to_string(), + after: after.to_string(), + } + } + + fn formulate_reply(&self, ctx: &Context, bot: &B) -> String { + let mut result = String::new(); + + if !self.before.is_empty() { + result.push_str(&self.before); + result.push('\n'); + } + + for description in bot.descriptions(ctx) { + result.push_str(&description); + result.push('\n'); + } + + if !self.after.is_empty() { + result.push_str(&self.after); + result.push('\n'); + } + + result + } +} + +#[async_trait] +impl Command for FullHelp +where + B: HasDescriptions + Send, + E: From, +{ + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + if arg.trim().is_empty() { + let reply = self.formulate_reply(ctx, bot); + ctx.reply(msg.id, reply).await?; + Ok(true) + } else { + Ok(false) + } + } +} + +/// Show full bot help. +#[derive(Parser)] +pub struct Args {} + +#[async_trait] +impl ClapCommand for FullHelp +where + B: HasDescriptions + Send, + E: From, +{ + type Args = Args; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + let reply = self.formulate_reply(ctx, bot); + ctx.reply(msg.id, reply).await?; + Ok(true) + } +} diff --git a/src/bot/botrulez/ping.rs b/src/bot/botrulez/ping.rs new file mode 100644 index 0000000..c7cea39 --- /dev/null +++ b/src/bot/botrulez/ping.rs @@ -0,0 +1,64 @@ +use async_trait::async_trait; +use clap::Parser; + +use crate::api::Message; +use crate::bot::command::{ClapCommand, Command, Context}; +use crate::conn; + +pub struct Ping(pub String); + +impl Ping { + pub fn new(reply: S) -> Self { + Self(reply.to_string()) + } +} + +impl Default for Ping { + fn default() -> Self { + Self::new("Pong!") + } +} + +#[async_trait] +impl Command for Ping +where + E: From, +{ + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + _bot: &mut B, + ) -> Result { + if arg.trim().is_empty() { + ctx.reply(msg.id, &self.0).await?; + Ok(true) + } else { + Ok(false) + } + } +} + +/// Trigger a short reply. +#[derive(Parser)] +pub struct Args {} + +#[async_trait] +impl ClapCommand for Ping +where + E: From, +{ + type Args = Args; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + _bot: &mut B, + ) -> Result { + ctx.reply(msg.id, &self.0).await?; + Ok(true) + } +} diff --git a/src/bot/botrulez/short_help.rs b/src/bot/botrulez/short_help.rs new file mode 100644 index 0000000..1a359be --- /dev/null +++ b/src/bot/botrulez/short_help.rs @@ -0,0 +1,58 @@ +use async_trait::async_trait; +use clap::Parser; + +use crate::api::Message; +use crate::bot::command::{ClapCommand, Command, Context}; +use crate::conn; + +pub struct ShortHelp(pub String); + +impl ShortHelp { + pub fn new(text: S) -> Self { + Self(text.to_string()) + } +} + +#[async_trait] +impl Command for ShortHelp +where + E: From, +{ + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + _bot: &mut B, + ) -> Result { + if arg.trim().is_empty() { + ctx.reply(msg.id, &self.0).await?; + Ok(true) + } else { + Ok(false) + } + } +} + +/// Show short bot help. +#[derive(Parser)] +pub struct Args {} + +#[async_trait] +impl ClapCommand for ShortHelp +where + E: From, +{ + type Args = Args; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + _bot: &mut B, + ) -> Result { + ctx.reply(msg.id, &self.0).await?; + Ok(true) + } +} diff --git a/src/bot/botrulez/uptime.rs b/src/bot/botrulez/uptime.rs new file mode 100644 index 0000000..d8b1d0d --- /dev/null +++ b/src/bot/botrulez/uptime.rs @@ -0,0 +1,133 @@ +use async_trait::async_trait; +use clap::Parser; +use jiff::{Span, Timestamp, Unit}; + +use crate::api::Message; +use crate::bot::command::{ClapCommand, Command, Context}; +use crate::conn; + +pub fn format_time(t: Timestamp) -> String { + t.strftime("%Y-%m-%d %H:%M:%S UTC").to_string() +} + +pub fn format_relative_time(d: Span) -> String { + if d.is_positive() { + format!("in {}", format_duration(d.abs())) + } else { + format!("{} ago", format_duration(d.abs())) + } +} + +pub fn format_duration(d: Span) -> String { + let total = d.abs().total(Unit::Second).unwrap() as i64; + let secs = total % 60; + let mins = (total / 60) % 60; + let hours = (total / 60 / 60) % 24; + let days = total / 60 / 60 / 24; + + let mut segments = vec![]; + if days > 0 { + segments.push(format!("{days}d")); + } + if hours > 0 { + segments.push(format!("{hours}h")); + } + if mins > 0 { + segments.push(format!("{mins}m")); + } + if secs > 0 { + segments.push(format!("{secs}s")); + } + if segments.is_empty() { + segments.push("0s".to_string()); + } + + let segments = segments.join(" "); + if d.is_positive() { + segments + } else { + format!("-{segments}") + } +} + +pub struct Uptime; + +pub trait HasStartTime { + fn start_time(&self) -> Timestamp; +} + +impl Uptime { + fn formulate_reply(&self, ctx: &Context, bot: &B, connected: bool) -> String { + let start = bot.start_time(); + let now = Timestamp::now(); + + let mut reply = format!( + "/me has been up since {} ({})", + format_time(start), + format_relative_time(start - now), + ); + + if connected { + let since = ctx.joined.since; + reply.push_str(&format!( + ", connected since {} ({})", + format_time(since), + format_relative_time(since - now), + )); + } + + reply + } +} + +#[async_trait] +impl Command for Uptime +where + B: HasStartTime + Send, + E: From, +{ + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + if arg.trim().is_empty() { + let reply = self.formulate_reply(ctx, bot, false); + ctx.reply(msg.id, reply).await?; + Ok(true) + } else { + Ok(false) + } + } +} + +/// Show how long the bot has been online. +#[derive(Parser)] +pub struct Args { + /// Show how long the bot has been connected without interruption. + #[arg(long, short)] + pub connected: bool, +} + +#[async_trait] +impl ClapCommand for Uptime +where + B: HasStartTime + Send, + E: From, +{ + type Args = Args; + + async fn execute( + &self, + args: Self::Args, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + let reply = self.formulate_reply(ctx, bot, args.connected); + ctx.reply(msg.id, reply).await?; + Ok(true) + } +} diff --git a/src/bot/command.rs b/src/bot/command.rs new file mode 100644 index 0000000..226adb0 --- /dev/null +++ b/src/bot/command.rs @@ -0,0 +1,64 @@ +mod bang; +mod clap; +mod hidden; +mod prefixed; + +use std::future::Future; + +use async_trait::async_trait; + +use crate::api::{self, Message, MessageId}; +use crate::conn::{self, ConnTx, Joined}; + +pub use self::bang::*; +pub use self::clap::*; +pub use self::hidden::*; +pub use self::prefixed::*; + +use super::instance::InstanceConfig; + +pub struct Context { + pub config: InstanceConfig, + pub conn_tx: ConnTx, + pub joined: Joined, +} + +impl Context { + pub fn send(&self, content: S) -> impl Future> { + let cmd = api::Send { + content: content.to_string(), + parent: None, + }; + let reply = self.conn_tx.send(cmd); + async move { reply.await.map(|r| r.0) } + } + + pub fn reply( + &self, + parent: MessageId, + content: S, + ) -> impl Future> { + let cmd = api::Send { + content: content.to_string(), + parent: Some(parent), + }; + let reply = self.conn_tx.send(cmd); + async move { reply.await.map(|r| r.0) } + } +} + +#[allow(unused_variables)] +#[async_trait] +pub trait Command { + fn description(&self, ctx: &Context) -> Option { + None + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result; +} diff --git a/src/bot/command/bang.rs b/src/bot/command/bang.rs new file mode 100644 index 0000000..a55d99d --- /dev/null +++ b/src/bot/command/bang.rs @@ -0,0 +1,235 @@ +use async_trait::async_trait; + +use crate::api::Message; +use crate::nick; + +use super::{Command, Context}; + +// TODO Don't ignore leading whitespace? +// I'm not entirely happy with how commands handle whitespace, and on euphoria, +// prefixing commands with whitespace is traditionally used to not trigger them. + +/// Parse leading whitespace followed by an prefix-initiated command. +/// +/// Returns the command name and the remaining text with one leading whitespace +/// removed. The remaining text may be the empty string. +pub fn parse_prefix_initiated<'a>(text: &'a str, prefix: &str) -> Option<(&'a str, &'a str)> { + let text = text.trim_start(); + let text = text.strip_prefix(prefix)?; + let (name, rest) = text.split_once(char::is_whitespace).unwrap_or((text, "")); + if name.is_empty() { + return None; + } + Some((name, rest)) +} + +pub struct Global { + prefix: String, + name: String, + inner: C, +} + +impl Global { + pub fn new(name: S, inner: C) -> Self { + Self { + prefix: "!".to_string(), + name: name.to_string(), + inner, + } + } + + pub fn prefix(mut self, prefix: S) -> Self { + self.prefix = prefix.to_string(); + self + } +} + +#[async_trait] +impl Command for Global +where + B: Send, + C: Command + Send + Sync, +{ + fn description(&self, ctx: &Context) -> Option { + let inner = self.inner.description(ctx)?; + Some(format!("{}{} - {inner}", self.prefix, self.name)) + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + // TODO Replace with let-else + let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { + Some(parsed) => parsed, + None => return Ok(false), + }; + + if name != self.name { + return Ok(false); + } + + self.inner.execute(rest, msg, ctx, bot).await + } +} + +pub struct General { + prefix: String, + name: String, + inner: C, +} + +impl General { + pub fn new(name: S, inner: C) -> Self { + Self { + prefix: "!".to_string(), + name: name.to_string(), + inner, + } + } + + pub fn prefix(mut self, prefix: S) -> Self { + self.prefix = prefix.to_string(); + self + } +} + +#[async_trait] +impl Command for General +where + B: Send, + C: Command + Send + Sync, +{ + fn description(&self, ctx: &Context) -> Option { + let inner = self.inner.description(ctx)?; + Some(format!("{}{} - {inner}", self.prefix, self.name)) + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + // TODO Replace with let-else + let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { + Some(parsed) => parsed, + None => return Ok(false), + }; + + if name != self.name { + return Ok(false); + } + + if parse_prefix_initiated(rest, "@").is_some() { + // The command looks like a specific command. If we treated it like + // a general command match, we would interpret other bots' specific + // commands as general commands. + return Ok(false); + } + + self.inner.execute(rest, msg, ctx, bot).await + } +} + +pub struct Specific { + prefix: String, + name: String, + inner: C, +} + +impl Specific { + pub fn new(name: S, inner: C) -> Self { + Self { + prefix: "!".to_string(), + name: name.to_string(), + inner, + } + } + + pub fn prefix(mut self, prefix: S) -> Self { + self.prefix = prefix.to_string(); + self + } +} + +#[async_trait] +impl Command for Specific +where + B: Send, + C: Command + Send + Sync, +{ + fn description(&self, ctx: &Context) -> Option { + let inner = self.inner.description(ctx)?; + let nick = nick::mention(&ctx.joined.session.name); + Some(format!("{}{} @{nick} - {inner}", self.prefix, self.name)) + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + // TODO Replace with let-else + let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { + Some(parsed) => parsed, + None => return Ok(false), + }; + + if name != self.name { + return Ok(false); + } + + // TODO Replace with let-else + let (nick, rest) = match parse_prefix_initiated(rest, "@") { + Some(parsed) => parsed, + None => return Ok(false), + }; + + if nick::normalize(nick) != nick::normalize(&ctx.joined.session.name) { + return Ok(false); + } + + self.inner.execute(rest, msg, ctx, bot).await + } +} + +#[cfg(test)] +mod test { + use super::parse_prefix_initiated; + + #[test] + fn test_parse_prefixed() { + assert_eq!(parse_prefix_initiated("!foo", "!"), Some(("foo", ""))); + assert_eq!(parse_prefix_initiated(" !foo", "!"), Some(("foo", ""))); + assert_eq!( + parse_prefix_initiated("!foo ", "!"), + Some(("foo", " ")) + ); + assert_eq!( + parse_prefix_initiated(" !foo ", "!"), + Some(("foo", " ")) + ); + assert_eq!( + parse_prefix_initiated("!foo @bar", "!"), + Some(("foo", "@bar")) + ); + assert_eq!( + parse_prefix_initiated("!foo @bar", "!"), + Some(("foo", " @bar")) + ); + assert_eq!( + parse_prefix_initiated("!foo @bar ", "!"), + Some(("foo", "@bar ")) + ); + assert_eq!(parse_prefix_initiated("! foo @bar", "!"), None); + assert_eq!(parse_prefix_initiated("!", "!"), None); + assert_eq!(parse_prefix_initiated("?foo", "!"), None); + } +} diff --git a/src/bot/command/clap.rs b/src/bot/command/clap.rs new file mode 100644 index 0000000..a22b49a --- /dev/null +++ b/src/bot/command/clap.rs @@ -0,0 +1,182 @@ +use async_trait::async_trait; +use clap::{CommandFactory, Parser}; + +use crate::api::Message; +use crate::conn; + +use super::{Command, Context}; + +#[async_trait] +pub trait ClapCommand { + type Args; + + async fn execute( + &self, + args: Self::Args, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result; +} + +/// Parse bash-like quoted arguments separated by whitespace. +/// +/// Outside of quotes, the backslash either escapes the next character or forms +/// an escape sequence. \n is a newline, \r a carriage return and \t a tab. +/// TODO Escape sequences +/// +/// Special characters like the backslash and whitespace can also be quoted +/// using double quotes. Within double quotes, \" escapes a double quote and \\ +/// escapes a backslash. Other occurrences of \ have no special meaning. +fn parse_quoted_args(text: &str) -> Result, &'static str> { + let mut args = vec![]; + let mut arg = String::new(); + let mut arg_exists = false; + + let mut quoted = false; + let mut escaped = false; + for c in text.chars() { + if quoted { + match c { + '\\' if escaped => { + arg.push('\\'); + escaped = false; + } + '"' if escaped => { + arg.push('"'); + escaped = false; + } + c if escaped => { + arg.push('\\'); + arg.push(c); + escaped = false; + } + '\\' => escaped = true, + '"' => quoted = false, + c => arg.push(c), + } + } else { + match c { + c if escaped => { + arg.push(c); + arg_exists = true; + escaped = false; + } + c if c.is_whitespace() => { + if arg_exists { + args.push(arg); + arg = String::new(); + arg_exists = false; + } + } + '\\' => escaped = true, + '"' => { + quoted = true; + arg_exists = true; + } + c => { + arg.push(c); + arg_exists = true; + } + } + } + } + + if quoted { + return Err("Unclosed trailing quote"); + } + if escaped { + return Err("Unfinished trailing escape"); + } + + if arg_exists { + args.push(arg); + } + + Ok(args) +} + +pub struct Clap(pub C); + +#[async_trait] +impl Command for Clap +where + B: Send, + E: From, + C: ClapCommand + Send + Sync, + C::Args: Parser + Send, +{ + fn description(&self, _ctx: &Context) -> Option { + C::Args::command().get_about().map(|s| format!("{s}")) + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + let mut args = match parse_quoted_args(arg) { + Ok(args) => args, + Err(err) => { + ctx.reply(msg.id, err).await?; + return Ok(true); + } + }; + + // Hacky, but it should work fine in most cases + let usage = msg.content.strip_suffix(arg).unwrap_or("").trim(); + args.insert(0, usage.to_string()); + + let args = match C::Args::try_parse_from(args) { + Ok(args) => args, + Err(err) => { + ctx.reply(msg.id, format!("{}", err.render())).await?; + return Ok(true); + } + }; + + self.0.execute(args, msg, ctx, bot).await + } +} + +#[cfg(test)] +mod test { + use super::parse_quoted_args; + + fn assert_quoted(raw: &str, parsed: &[&str]) { + let parsed = parsed.iter().map(|s| s.to_string()).collect(); + assert_eq!(parse_quoted_args(raw), Ok(parsed)) + } + + #[test] + fn test_parse_quoted_args() { + assert_quoted("foo bar baz", &["foo", "bar", "baz"]); + assert_quoted(" foo bar baz ", &["foo", "bar", "baz"]); + assert_quoted("foo\\ ba\"r ba\"z", &["foo bar baz"]); + assert_quoted( + "It's a nice day, isn't it?", + &["It's", "a", "nice", "day,", "isn't", "it?"], + ); + + // Trailing whitespace + assert_quoted("a ", &["a"]); + assert_quoted("a\\ ", &["a "]); + assert_quoted("a\\ ", &["a "]); + + // Zero-length arguments + assert_quoted("a \"\" b \"\"", &["a", "", "b", ""]); + assert_quoted("a \"\" b \"\" ", &["a", "", "b", ""]); + + // Backslashes in quotes + assert_quoted("\"a \\b \\\" \\\\\"", &["a \\b \" \\"]); + + // Unclosed quotes and unfinished escapes + assert!(parse_quoted_args("foo 'bar \"baz").is_err()); + assert!(parse_quoted_args("foo \"bar baz").is_err()); + assert!(parse_quoted_args("foo \"bar 'baz").is_err()); + assert!(parse_quoted_args("foo \\").is_err()); + assert!(parse_quoted_args("foo 'bar\\").is_err()); + } +} diff --git a/src/bot/command/hidden.rs b/src/bot/command/hidden.rs new file mode 100644 index 0000000..0aad60d --- /dev/null +++ b/src/bot/command/hidden.rs @@ -0,0 +1,29 @@ +use async_trait::async_trait; + +use crate::api::Message; + +use super::{Command, Context}; + +pub struct Hidden(pub C); + +#[async_trait] +impl Command for Hidden +where + B: Send, + C: Command + Send + Sync, +{ + fn description(&self, _ctx: &Context) -> Option { + // Default implementation, repeated here for emphasis. + None + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + self.0.execute(arg, msg, ctx, bot).await + } +} diff --git a/src/bot/command/prefixed.rs b/src/bot/command/prefixed.rs new file mode 100644 index 0000000..f572732 --- /dev/null +++ b/src/bot/command/prefixed.rs @@ -0,0 +1,45 @@ +use async_trait::async_trait; + +use crate::api::Message; + +use super::{Command, Context}; + +pub struct Prefixed { + prefix: String, + inner: C, +} + +impl Prefixed { + pub fn new(prefix: S, inner: C) -> Self { + Self { + prefix: prefix.to_string(), + inner, + } + } +} + +#[async_trait] +impl Command for Prefixed +where + B: Send, + C: Command + Send + Sync, +{ + fn description(&self, ctx: &Context) -> Option { + let inner = self.inner.description(ctx)?; + Some(format!("{} - {inner}", self.prefix)) + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &mut B, + ) -> Result { + if let Some(rest) = arg.trim_start().strip_prefix(&self.prefix) { + self.inner.execute(rest, msg, ctx, bot).await + } else { + Ok(false) + } + } +} diff --git a/src/bot/commands.rs b/src/bot/commands.rs new file mode 100644 index 0000000..a37cb5d --- /dev/null +++ b/src/bot/commands.rs @@ -0,0 +1,93 @@ +use crate::api::packet::ParsedPacket; +use crate::api::{Data, SendEvent}; +use crate::conn; + +use super::command::{Command, Context}; +use super::instance::{ConnSnapshot, InstanceConfig}; + +pub struct Commands { + commands: Vec + Send + Sync>>, + fallthrough: bool, +} + +impl Commands { + pub fn new() -> Self { + Self { + commands: vec![], + fallthrough: false, + } + } + + /// Whether further commands should be executed after a command returns + /// `true`. + /// + /// If disabled, commands are run until the first command that returns + /// `true`. If enabled, all commands are run irrespective of their return + /// values. + pub fn fallthrough(&self) -> bool { + self.fallthrough + } + + /// Set whether fallthrough is active. + /// + /// See [`Self::fallthrough`] for more details. + pub fn set_fallthrough(&mut self, active: bool) { + self.fallthrough = active; + } + + pub fn add(&mut self, command: C) + where + C: Command + Send + Sync + 'static, + { + self.commands.push(Box::new(command)); + } + + pub fn descriptions(&self, ctx: &Context) -> Vec { + self.commands + .iter() + .filter_map(|c| c.description(ctx)) + .collect::>() + } + + /// Returns `true` if one or more commands returned `true`, `false` + /// otherwise. + pub async fn handle_packet( + &self, + config: &InstanceConfig, + packet: &ParsedPacket, + snapshot: &ConnSnapshot, + bot: &mut B, + ) -> Result { + let msg = match &packet.content { + Ok(Data::SendEvent(SendEvent(msg))) => msg, + _ => return Ok(false), + }; + + let joined = match &snapshot.state { + conn::State::Joining(_) => return Ok(false), + conn::State::Joined(joined) => joined.clone(), + }; + + let ctx = Context { + config: config.clone(), + conn_tx: snapshot.conn_tx.clone(), + joined, + }; + + let mut handled = false; + for command in &self.commands { + handled = handled || command.execute(&msg.content, msg, &ctx, bot).await?; + if !self.fallthrough && handled { + break; + } + } + + Ok(handled) + } +} + +impl Default for Commands { + fn default() -> Self { + Self::new() + } +} diff --git a/src/bot/instance.rs b/src/bot/instance.rs new file mode 100644 index 0000000..0d00d4d --- /dev/null +++ b/src/bot/instance.rs @@ -0,0 +1,534 @@ +//! A single instance of a bot in a single room. +//! +//! See [`Instance`] for more details. + +use std::convert::Infallible; +use std::fmt; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use cookie::{Cookie, CookieJar}; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; +use tokio_tungstenite::tungstenite; +use tokio_tungstenite::tungstenite::http::{HeaderValue, StatusCode}; + +use crate::api::packet::ParsedPacket; +use crate::api::{Auth, AuthOption, Data, Nick}; +use crate::conn::{self, Conn, ConnTx, State}; + +macro_rules! ilog { + ( $conf:expr, $target:expr, $($arg:tt)+ ) => { + ::log::log!( + target: &format!("euphoxide::live::{}", $conf.name), + $target, + $($arg)+ + ); + }; +} + +macro_rules! idebug { + ( $conf:expr, $($arg:tt)+ ) => { + ilog!($conf, ::log::Level::Debug, $($arg)+); + }; +} + +macro_rules! iinfo { + ( $conf:expr, $($arg:tt)+ ) => { + ilog!($conf, ::log::Level::Info, $($arg)+); + }; +} + +macro_rules! iwarn { + ( $conf:expr, $($arg:tt)+ ) => { + ilog!($conf, ::log::Level::Warn, $($arg)+); + }; +} + +/// Settings that are usually shared between all instances connecting to a +/// specific server. +#[derive(Clone)] +pub struct ServerConfig { + /// How long to wait for the server until an operation is considered timed + /// out. + /// + /// This timeout applies to waiting for reply packets to command packets + /// sent by the client, as well as operations like connecting or closing a + /// connection. + pub timeout: Duration, + /// How long to wait until reconnecting after an unsuccessful attempt to + /// connect. + pub reconnect_delay: Duration, + /// Domain name, to be used with [`Conn::connect`]. + pub domain: String, + /// Cookies to use when connecting. They are updated with the server's reply + /// after successful connection attempts. + pub cookies: Arc>, +} + +impl ServerConfig { + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + pub fn reconnect_delay(mut self, reconnect_delay: Duration) -> Self { + self.reconnect_delay = reconnect_delay; + self + } + + pub fn domain(mut self, domain: S) -> Self { + self.domain = domain.to_string(); + self + } + + pub fn cookies(mut self, cookies: Arc>) -> Self { + self.cookies = cookies; + self + } + + pub fn room(self, room: S) -> InstanceConfig { + InstanceConfig::new(self, room) + } +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + timeout: Duration::from_secs(30), + reconnect_delay: Duration::from_secs(30), + domain: "euphoria.leet.nu".to_string(), + cookies: Arc::new(Mutex::new(CookieJar::new())), + } + } +} + +struct Hidden; + +impl fmt::Debug for Hidden { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "") + } +} + +impl fmt::Debug for ServerConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ServerConfig") + .field("timeout", &self.timeout) + .field("reconnect_delay", &self.reconnect_delay) + .field("domain", &self.domain) + .field("cookies", &Hidden) + .finish() + } +} + +/// Settings that are usually specific to a single instance. +#[derive(Debug, Clone)] +pub struct InstanceConfig { + pub server: ServerConfig, + /// Unique name of this instance. + pub name: String, + /// Room name, to be used with [`Conn::connect`]. + pub room: String, + /// Whether the instance should connect as human or bot. + pub human: bool, + /// Username to set upon connecting. + pub username: Option, + /// Whether to set the username even if the server reports that the session + /// already has a username set. + pub force_username: bool, + /// Password to use if room requires authentication. + pub password: Option, +} + +impl InstanceConfig { + pub fn new(server: ServerConfig, room: S) -> Self { + Self { + server, + name: room.to_string(), + room: room.to_string(), + human: false, + username: None, + force_username: false, + password: None, + } + } + + pub fn name(mut self, name: S) -> Self { + self.name = name.to_string(); + self + } + + pub fn human(mut self, human: bool) -> Self { + self.human = human; + self + } + + pub fn username(mut self, username: Option) -> Self { + self.username = username.map(|s| s.to_string()); + self + } + + pub fn force_username(mut self, force_username: bool) -> Self { + self.force_username = force_username; + self + } + + pub fn password(mut self, password: Option) -> Self { + self.password = password.map(|s| s.to_string()); + self + } + + /// Create a new instance using this config. + /// + /// See [`Instance::new`] for more details. + pub fn build(self, on_event: F) -> Instance + where + F: Fn(Event) + Send + Sync + 'static, + { + Instance::new(self, on_event) + } +} + +/// Snapshot of a [`Conn`]'s state immediately after receiving a packet. +#[derive(Debug, Clone)] +pub struct ConnSnapshot { + pub conn_tx: ConnTx, + pub state: State, +} + +impl ConnSnapshot { + fn from_conn(conn: &Conn) -> Self { + Self { + conn_tx: conn.tx().clone(), + state: conn.state().clone(), + } + } +} + +// Most of the time, the largest variant (`Packet`) is sent. The size of this +// enum is not critical anyways since it's not constructed that often. +#[allow(clippy::large_enum_variant)] +/// An event emitted by an [`Instance`]. +/// +/// Events are emitted by a single instance following this schema, written in +/// pseudo-regex syntax: +/// ```text +/// (Connecting (Connected Packet*)? Disconnected)* Stopped +/// ``` +/// +/// In particular, this means that every [`Self::Connecting`] is always followed +/// by exactly one [`Self::Disconnected`], and that [`Self::Stopped`] is always +/// the last event and is always sent exactly once per instance. +#[derive(Debug)] +pub enum Event { + Connecting(InstanceConfig), + Connected(InstanceConfig, ConnSnapshot), + Packet(InstanceConfig, ParsedPacket, ConnSnapshot), + Disconnected(InstanceConfig), + Stopped(InstanceConfig), +} + +impl Event { + pub fn config(&self) -> &InstanceConfig { + match self { + Self::Connecting(config) => config, + Self::Connected(config, _) => config, + Self::Packet(config, _, _) => config, + Self::Disconnected(config) => config, + Self::Stopped(config) => config, + } + } +} + +enum Request { + GetConnTx(oneshot::Sender), + Stop, +} + +/// An error that occurred inside an [`Instance`] while it was running. +enum RunError { + StoppedManually, + InstanceDropped, + CouldNotConnect(conn::Error), + Conn(conn::Error), +} + +/// A single instance of a bot in a single room. +/// +/// The instance automatically connects to its room once it is created, and it +/// reconnects when it loses connection. If the room requires authentication and +/// a password is given, the instance automatically authenticates. If a nick is +/// given, the instance sets its nick upon joining the room. +/// +/// An instance has a unique name used for logging and identifying the instance. +/// The room name can be used as the instance name if there is never more than +/// one instance per room. +/// +/// An instance can be created using [`Instance::new`] or using +/// [`InstanceConfig::build`]. +/// +/// An instance can be stopped using [`Instance::stop`] or by dropping it. In +/// either case, the last event the instance sends will be an +/// [`Event::Stopped`]. If it is not stopped using one of these two ways, it +/// will continue to run and reconnect indefinitely. +#[derive(Debug, Clone)] +pub struct Instance { + config: InstanceConfig, + request_tx: mpsc::UnboundedSender, + // In theory, request_tx should be sufficient as canary, but I'm not sure + // exactly how to check it during the reconnect timeout. + _canary_tx: mpsc::UnboundedSender, +} + +impl Instance { + // Previously, the event callback was asynchronous and would return a result. It + // was called in-line to calling Conn::recv. The idea was that the instance + // would stop if the event handler returned Err. This was, however, not even + // implemented correctly and the instance would just reconnect. + // + // The new event handler is synchronous. This way, it becomes harder to + // accidentally block Conn::recv, for example by waiting for a channel with + // limited capacity. If async code must be executed upon receiving a command, + // the user can start a task from inside the handler. + // + // The new event handler does not return anything. This makes the code nicer. In + // the use cases I'm thinking of, it should not be a problem: If the event + // handler encounters errors, there's usually other ways to tell the same. Make + // the event handler ignore the errors and stop the instance in that other way. + + /// Create a new instance based on an [`InstanceConfig`]. + /// + /// The `on_event` parameter is called whenever the instance wants to emit + /// an [`Event`]. It must not block for long. See [`Event`] for more details + /// on the events and the order in which they are emitted. + /// + /// [`InstanceConfig::build`] can be used in place of this function. + pub fn new(config: InstanceConfig, on_event: F) -> Self + where + F: Fn(Event) + Send + Sync + 'static, + { + idebug!(config, "Created with config {config:?}"); + + let (request_tx, request_rx) = mpsc::unbounded_channel(); + let (canary_tx, canary_rx) = mpsc::unbounded_channel(); + + tokio::spawn(Self::run::( + config.clone(), + on_event, + request_rx, + canary_rx, + )); + + Self { + config, + request_tx, + _canary_tx: canary_tx, + } + } + + pub fn config(&self) -> &InstanceConfig { + &self.config + } + + /// Retrieve the instance's current connection. + /// + /// Returns `None` if the instance is currently not connected, or has + /// stopped running. + pub async fn conn_tx(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let _ = self.request_tx.send(Request::GetConnTx(tx)); + rx.await.ok() + } + + /// Stop the instance. + /// + /// For more info on stopping instances, see [`Instance`]. + pub fn stop(&self) { + let _ = self.request_tx.send(Request::Stop); + } + + /// Whether this instance is stopped. + /// + /// For more info on stopping instances, see [`Instance`]. + pub fn stopped(&self) -> bool { + self.request_tx.is_closed() + } + + async fn run( + config: InstanceConfig, + on_event: F, + request_rx: mpsc::UnboundedReceiver, + mut canary_rx: mpsc::UnboundedReceiver, + ) { + select! { + _ = Self::stay_connected(&config, &on_event, request_rx) => (), + _ = canary_rx.recv() => { idebug!(config, "Instance dropped"); }, + } + on_event(Event::Stopped(config)) + } + + async fn stay_connected( + config: &InstanceConfig, + on_event: &F, + mut request_rx: mpsc::UnboundedReceiver, + ) { + loop { + idebug!(config, "Connecting..."); + + on_event(Event::Connecting(config.clone())); + let result = Self::run_once::(config, on_event, &mut request_rx).await; + on_event(Event::Disconnected(config.clone())); + + let connected = match result { + Ok(()) => { + idebug!(config, "Connection closed normally"); + true + } + Err(RunError::StoppedManually) => { + idebug!(config, "Instance stopped manually"); + break; + } + Err(RunError::InstanceDropped) => { + idebug!(config, "Instance dropped"); + break; + } + Err(RunError::CouldNotConnect(conn::Error::Tungstenite( + tungstenite::Error::Http(response), + ))) if response.status() == StatusCode::NOT_FOUND => { + iwarn!(config, "Failed to connect: room does not exist"); + break; + } + Err(RunError::CouldNotConnect(err)) => { + iwarn!(config, "Failed to connect: {err}"); + false + } + Err(RunError::Conn(err)) => { + iwarn!(config, "An error occurred: {err}"); + true + } + }; + + if !connected { + let s = config.server.reconnect_delay.as_secs(); + idebug!(config, "Waiting {s} seconds before reconnecting"); + tokio::time::sleep(config.server.reconnect_delay).await; + } + } + } + + fn get_cookies(config: &InstanceConfig) -> HeaderValue { + let guard = config.server.cookies.lock().unwrap(); + let cookies = guard + .iter() + .map(|c| format!("{}", c.stripped())) + .collect::>() + .join("; "); + drop(guard); + cookies.try_into().unwrap() + } + + fn set_cookies(config: &InstanceConfig, cookies: Vec) { + idebug!(config, "Updating cookies"); + let mut guard = config.server.cookies.lock().unwrap(); + + for cookie in cookies { + if let Ok(cookie) = cookie.to_str() { + if let Ok(cookie) = Cookie::from_str(cookie) { + guard.add(cookie); + } + } + } + } + + async fn run_once( + config: &InstanceConfig, + on_event: &F, + request_rx: &mut mpsc::UnboundedReceiver, + ) -> Result<(), RunError> { + let (mut conn, cookies) = Conn::connect( + &config.server.domain, + &config.room, + config.human, + Some(Self::get_cookies(config)), + config.server.timeout, + ) + .await + .map_err(RunError::CouldNotConnect)?; + + Self::set_cookies(config, cookies); + on_event(Event::Connected( + config.clone(), + ConnSnapshot::from_conn(&conn), + )); + + let conn_tx = conn.tx().clone(); + select! { + r = Self::receive::(config, &mut conn, on_event) => r, + r = Self::handle_requests(request_rx, &conn_tx) => Err(r), + } + } + + async fn receive( + config: &InstanceConfig, + conn: &mut Conn, + on_event: &F, + ) -> Result<(), RunError> { + loop { + let packet = conn.recv().await.map_err(RunError::Conn)?; + let snapshot = ConnSnapshot::from_conn(conn); + + match &packet.content { + Ok(Data::SnapshotEvent(snapshot)) => { + if let Some(username) = &config.username { + if config.force_username || snapshot.nick.is_none() { + idebug!(config, "Setting nick to username {username}"); + let name = username.to_string(); + conn.tx().send_only(Nick { name }); + } else if let Some(nick) = &snapshot.nick { + idebug!(config, "Not setting nick, already set to {nick}"); + } + } + } + Ok(Data::BounceEvent(_)) => { + if let Some(password) = &config.password { + idebug!(config, "Authenticating with password"); + let cmd = Auth { + r#type: AuthOption::Passcode, + passcode: Some(password.to_string()), + }; + conn.tx().send_only(cmd); + } else { + iwarn!(config, "Auth required but no password configured"); + } + } + Ok(Data::DisconnectEvent(ev)) => { + if ev.reason == "authentication changed" { + iinfo!(config, "Disconnected because {}", ev.reason); + } else { + iwarn!(config, "Disconnected because {}", ev.reason); + } + } + _ => {} + } + + on_event(Event::Packet(config.clone(), packet, snapshot)); + } + } + + async fn handle_requests( + request_rx: &mut mpsc::UnboundedReceiver, + conn_tx: &ConnTx, + ) -> RunError { + while let Some(request) = request_rx.recv().await { + match request { + Request::GetConnTx(tx) => { + let _ = tx.send(conn_tx.clone()); + } + Request::Stop => return RunError::StoppedManually, + } + } + RunError::InstanceDropped + } +} diff --git a/src/bot/instances.rs b/src/bot/instances.rs new file mode 100644 index 0000000..6607a70 --- /dev/null +++ b/src/bot/instances.rs @@ -0,0 +1,70 @@ +//! A convenient way to keep a [`ServerConfig`] and some [`Instance`]s. + +use std::collections::HashMap; + +use super::instance::{self, Instance, ServerConfig}; + +/// A convenient way to keep a [`ServerConfig`] and some [`Instance`]s. +pub struct Instances { + server_config: ServerConfig, + instances: HashMap, +} + +impl Instances { + pub fn new(server_config: ServerConfig) -> Self { + Self { + server_config, + instances: HashMap::new(), + } + } + + pub fn server_config(&self) -> &ServerConfig { + &self.server_config + } + + pub fn instances(&self) -> impl Iterator { + self.instances.values() + } + + /// Check if an event comes from an instance whose name is known. + /// + /// Assuming every instance has a unique name, events from unknown instances + /// should be discarded. This helps prevent "ghost instances" that were + /// stopped but haven't yet disconnected properly from influencing your + /// bot's state. + /// + /// The user is responsible for ensuring that instances' names are unique. + pub fn is_from_known_instance(&self, event: &instance::Event) -> bool { + self.instances.contains_key(&event.config().name) + } + + pub fn is_empty(&self) -> bool { + self.instances.is_empty() + } + + /// Get an instance by its name. + pub fn get(&self, name: &str) -> Option<&Instance> { + self.instances.get(name) + } + + /// Add a new instance. + /// + /// If an instance with the same name exists already, it will be replaced by + /// the new instance. + pub fn add(&mut self, instance: Instance) { + self.instances + .insert(instance.config().name.clone(), instance); + } + + /// Remove an instance by its name. + pub fn remove(&mut self, name: &str) -> Option { + self.instances.remove(name) + } + + /// Remove all stopped instances. + /// + /// This function should be called regularly. + pub fn purge(&mut self) { + self.instances.retain(|_, i| !i.stopped()); + } +} diff --git a/src/conn.rs b/src/conn.rs index 0f48779..7255d60 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,24 +1,23 @@ //! Connection state modeling. -// TODO Catch errors differently when sending into mpsc/oneshot - use std::collections::HashMap; use std::convert::Infallible; use std::future::Future; -use std::time::Duration; -use std::{error, fmt}; +use std::time::{Duration, Instant}; +use std::{error, fmt, result}; -use futures::channel::oneshot; -use futures::stream::{SplitSink, SplitStream}; -use futures::{SinkExt, StreamExt}; +use futures_util::SinkExt; +use jiff::Timestamp; +use log::debug; use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio::{select, task, time}; +use tokio::select; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::StreamExt; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::http::{header, HeaderValue}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; -use crate::api::packet::{Command, Packet, ParsedPacket}; +use crate::api::packet::{Command, ParsedPacket}; use crate::api::{ BounceEvent, Data, HelloEvent, LoginReply, NickEvent, PersonalAccountView, Ping, PingReply, SessionId, SessionView, SnapshotEvent, Time, UserId, @@ -29,66 +28,82 @@ pub type WsStream = WebSocketStream>; #[derive(Debug)] pub enum Error { + /// The connection is now closed. ConnectionClosed, - TimedOut, - IncorrectReplyType, + /// The connection was not opened in time. + ConnectionTimedOut, + /// The server didn't reply to one of our commands in time. + CommandTimedOut, + /// The server did something that violated the api specification. + ProtocolViolation(&'static str), + /// An error returned by the euphoria server. Euph(String), + + Tungstenite(tungstenite::Error), + SerdeJson(serde_json::Error), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ConnectionClosed => write!(f, "connection closed"), - Self::TimedOut => write!(f, "packet timed out"), - Self::IncorrectReplyType => write!(f, "incorrect reply type"), - Self::Euph(error_msg) => write!(f, "{error_msg}"), + Self::ConnectionTimedOut => write!(f, "connection did not open in time"), + Self::CommandTimedOut => write!(f, "server did not reply to command in time"), + Self::ProtocolViolation(msg) => write!(f, "{msg}"), + Self::Euph(msg) => write!(f, "{msg}"), + Self::Tungstenite(err) => write!(f, "{err}"), + Self::SerdeJson(err) => write!(f, "{err}"), } } } +impl From for Error { + fn from(err: tungstenite::Error) -> Self { + Self::Tungstenite(err) + } +} + +impl From for Error { + fn from(err: serde_json::Error) -> Self { + Self::SerdeJson(err) + } +} + impl error::Error for Error {} -type InternalResult = Result>; +pub type Result = result::Result; -#[derive(Debug)] -enum Event { - Message(tungstenite::Message), - SendCmd(Data, oneshot::Sender>), - SendRpl(Option, Data), - Status(oneshot::Sender), - DoPings, -} - -impl Event { - fn send_cmd>(cmd: C, rpl: oneshot::Sender>) -> Self { - Self::SendCmd(cmd.into(), rpl) - } - - fn send_rpl>(id: Option, rpl: C) -> Self { - Self::SendRpl(id, rpl.into()) - } -} - -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct Joining { + pub since: Timestamp, pub hello: Option, pub snapshot: Option, pub bounce: Option, } impl Joining { - fn on_data(&mut self, data: &Data) -> InternalResult<()> { + fn new() -> Self { + Self { + since: Timestamp::now(), + hello: None, + snapshot: None, + bounce: None, + } + } + + fn on_data(&mut self, data: &Data) -> Result<()> { match data { Data::BounceEvent(p) => self.bounce = Some(p.clone()), Data::HelloEvent(p) => self.hello = Some(p.clone()), Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), + // TODO Check and maybe expand list of unexpected packet types Data::JoinEvent(_) | Data::NetworkEvent(_) | Data::NickEvent(_) | Data::EditMessageEvent(_) | Data::PartEvent(_) | Data::PmInitiateEvent(_) - | Data::SendEvent(_) => return Err("unexpected packet type".into()), + | Data::SendEvent(_) => return Err(Error::ProtocolViolation("unexpected packet type")), _ => {} } Ok(()) @@ -107,6 +122,7 @@ impl Joining { .map(|s| (s.session_id.clone(), SessionInfo::Full(s))) .collect::>(); Some(Joined { + since: Timestamp::now(), session, account: hello.account.clone(), listing, @@ -148,6 +164,7 @@ impl SessionInfo { #[derive(Debug, Clone)] pub struct Joined { + pub since: Timestamp, pub session: SessionView, pub account: Option, pub listing: HashMap, @@ -157,20 +174,24 @@ impl Joined { fn on_data(&mut self, data: &Data) { match data { Data::JoinEvent(p) => { + debug!("Updating listing after join-event"); self.listing .insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone())); } Data::SendEvent(p) => { + debug!("Updating listing after send-event"); self.listing.insert( p.0.sender.session_id.clone(), SessionInfo::Full(p.0.sender.clone()), ); } Data::PartEvent(p) => { + debug!("Updating listing after part-event"); self.listing.remove(&p.0.session_id); } Data::NetworkEvent(p) => { if p.r#type == "partition" { + debug!("Updating listing after network-event with type partition"); self.listing.retain(|_, s| match s { SessionInfo::Full(s) => { s.server_id != p.server_id && s.server_era != p.server_era @@ -190,6 +211,7 @@ impl Joined { } } Data::NickEvent(p) => { + debug!("Updating listing after nick-event"); self.listing .entry(p.session_id.clone()) .and_modify(|s| match s { @@ -199,6 +221,7 @@ impl Joined { .or_insert_with(|| SessionInfo::Partial(p.clone())); } Data::NickReply(p) => { + debug!("Updating own session after nick-reply"); assert_eq!(self.session.id, p.id); self.session.name = p.to.clone(); } @@ -211,183 +234,322 @@ impl Joined { #[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] -pub enum Status { +pub enum State { Joining(Joining), Joined(Joined), } -struct State { - ws_tx: SplitSink, +impl State { + pub fn into_joining(self) -> Option { + match self { + Self::Joining(joining) => Some(joining), + Self::Joined(_) => None, + } + } + + pub fn into_joined(self) -> Option { + match self { + Self::Joining(_) => None, + Self::Joined(joined) => Some(joined), + } + } + + pub fn joining(&self) -> Option<&Joining> { + match self { + Self::Joining(joining) => Some(joining), + Self::Joined(_) => None, + } + } + + pub fn joined(&self) -> Option<&Joined> { + match self { + Self::Joining(_) => None, + Self::Joined(joined) => Some(joined), + } + } +} + +#[allow(clippy::large_enum_variant)] +enum ConnCommand { + SendCmd(Data, oneshot::Sender>), + GetState(oneshot::Sender), +} + +#[derive(Debug, Clone)] +pub struct ConnTx { + cmd_tx: mpsc::UnboundedSender, +} + +impl ConnTx { + /// The async part of sending a command. + /// + /// This is split into a separate function so that [`Self::send`] can be + /// fully synchronous (you can safely throw away the returned future) while + /// still guaranteeing that the packet was sent. + async fn finish_send(rx: oneshot::Receiver>) -> Result + where + C: Command, + C::Reply: TryFrom, + { + let pending_reply = rx + .await + // This should only happen if something goes wrong during encoding + // of the packet or while sending it through the websocket. Assuming + // the first doesn't happen, the connection is probably closed. + .map_err(|_| Error::ConnectionClosed)?; + + let data = pending_reply + .get() + .await + .map_err(|e| match e { + replies::Error::TimedOut => Error::CommandTimedOut, + replies::Error::Canceled => Error::ConnectionClosed, + })? + .content + .map_err(Error::Euph)?; + + data.try_into() + .map_err(|_| Error::ProtocolViolation("incorrect command reply type")) + } + + /// Send a command to the server. + /// + /// Returns a future containing the server's reply. This future does not + /// have to be awaited and can be safely ignored if you are not interested + /// in the reply. + /// + /// This function may return before the command was sent. To ensure that it + /// was sent before doing something else, await the returned future first. + /// + /// When called multiple times, this function guarantees that the commands + /// are sent in the order that the function is called. + pub fn send(&self, cmd: C) -> impl Future> + where + C: Command + Into, + C::Reply: TryFrom, + { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ConnCommand::SendCmd(cmd.into(), tx)); + Self::finish_send::(rx) + } + + /// Like [`Self::send`] but ignoring the server's reply. + pub fn send_only>(&self, cmd: C) { + let (tx, _) = oneshot::channel(); + let _ = self.cmd_tx.send(ConnCommand::SendCmd(cmd.into(), tx)); + } + + pub async fn state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.cmd_tx + .send(ConnCommand::GetState(tx)) + .map_err(|_| Error::ConnectionClosed)?; + rx.await.map_err(|_| Error::ConnectionClosed) + } +} + +#[derive(Debug)] +pub struct Conn { + ws: WsStream, last_id: usize, replies: Replies, - packet_tx: mpsc::UnboundedSender, + conn_tx: ConnTx, + cmd_rx: mpsc::UnboundedReceiver, - // The server may send a pong frame with arbitrary payload unprompted at any - // time (see RFC 6455 5.5.3). Because of this, we can't just remember the - // last pong payload. - ws_ping_counter: u64, - last_ws_ping: Option>, + // The websocket server may send a pong frame with arbitrary payload + // unprompted at any time (see RFC 6455 5.5.3). Because of this, we can't + // just remember the last pong payload. + last_ping: Instant, + last_ws_ping_payload: Option>, last_ws_ping_replied_to: bool, + last_euph_ping_payload: Option