diff --git a/.vscode/settings.json b/.vscode/settings.json index 7a89179..4ab04ef 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,8 +1,8 @@ { - "files.insertFinalNewline": true, - "rust-analyzer.cargo.features": "all", - "rust-analyzer.imports.granularity.enforce": true, - "rust-analyzer.imports.granularity.group": "module", - "rust-analyzer.imports.group.enable": true, - "evenBetterToml.formatter.columnWidth": 100, + "files.insertFinalNewline": true, + "rust-analyzer.cargo.features": "all", + "rust-analyzer.imports.granularity.enforce": true, + "rust-analyzer.imports.granularity.group": "crate", + "rust-analyzer.imports.group.enable": true, + "evenBetterToml.formatter.columnWidth": 100 } diff --git a/CHANGELOG.md b/CHANGELOG.md index 524e877..6dfca90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,18 +14,6 @@ Procedure when bumping the version number: ## Unreleased -## v0.6.1 - 2025-02-23 - -### Changed - -- Updated set of emoji names - -### Fixed - -- Nick hue hashing algorithm in some edge cases - -## v0.6.0 - 2025-02-21 - ### Added - `api::Time::from_timestamp` diff --git a/Cargo.toml b/Cargo.toml index cf65579..b0807c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,48 +1,29 @@ -[package] -name = "euphoxide" -version = "0.6.1" +[workspace] +resolver = "2" +members = ["euphoxide", "euphoxide-bot"] + +[workspace.package] +version = "0.5.1" edition = "2021" -[features] -bot = ["dep:async-trait", "dep:clap", "dep:cookie"] - -[dependencies] -async-trait = { version = "0.1.86", optional = true } -caseless = "0.2.2" -cookie = { version = "0.18.1", optional = true } -futures-util = { version = "0.3.31", default-features = false, features = ["sink"] } -jiff = { version = "0.2.1", features = ["serde"] } -log = "0.4.25" -serde = { version = "1.0.218", features = ["derive"] } -serde_json = "1.0.139" -tokio = { version = "1.43.0", features = ["time", "sync", "macros", "rt"] } -tokio-stream = "0.1.17" -tokio-tungstenite = { version = "0.26.2", features = ["rustls-tls-native-roots"] } +[workspace.dependencies] +async-trait = "0.1.83" +caseless = "0.2.1" +clap = { version = "4.5.23", default-features = false, features = ["std", "derive"] } +cookie = "0.18.1" +futures-util = "0.3.31" +jiff = { version = "0.1.15", default-features = false, features = ["std"] } +log = "0.4.22" +serde = { version = "1.0.215", features = ["derive"] } +serde_json = "1.0.133" +tokio = { version = "1.42.0", features = ["macros", "sync", "time"] } +tokio-tungstenite = "0.24.0" unicode-normalization = "0.1.24" +# For examples +anyhow = "1.0.94" +rustls = "0.23.19" -[dependencies.clap] -version = "4.5.30" -optional = true -default-features = false -features = ["std", "derive", "deprecated"] - -[dev-dependencies] # For example bot -rustls = "0.23.23" -tokio = { version = "1.43.0", features = ["rt-multi-thread"] } - -[[example]] -name = "testbot_instance" -required-features = ["bot"] - -[[example]] -name = "testbot_instances" -required-features = ["bot"] - -[[example]] -name = "testbot_commands" -required-features = ["bot"] - -[lints] +[workspace.lints] rust.unsafe_code = { level = "forbid", priority = 1 } # Lint groups rust.deprecated_safe = "warn" diff --git a/euphoxide-bot/Cargo.toml b/euphoxide-bot/Cargo.toml new file mode 100644 index 0000000..bd160d2 --- /dev/null +++ b/euphoxide-bot/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "euphoxide-bot" +version = { workspace = true } +edition = { workspace = true } + +[features] +clap = ["dep:clap"] + +[dependencies] +async-trait = { workspace = true } +clap = { workspace = true, optional = true } +cookie = { workspace = true } +euphoxide = { path = "../euphoxide" } +jiff = { workspace = true } +log = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +tokio-tungstenite = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +rustls = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } + +[lints] +workspace = true diff --git a/euphoxide-bot/examples/examplebot.rs b/euphoxide-bot/examples/examplebot.rs new file mode 100644 index 0000000..1f7b74f --- /dev/null +++ b/euphoxide-bot/examples/examplebot.rs @@ -0,0 +1,102 @@ +use std::{sync::Arc, time::Duration}; + +use async_trait::async_trait; +use euphoxide::{ + api::{Data, Message, Nick, ParsedPacket, Send}, + client::{conn::ClientConnHandle, state::State}, +}; +use euphoxide_bot::{ + bot::Bot, + command::{ + botrulez::{FullHelp, HasCommandInfos, HasStartTime, Ping, ShortHelp, Uptime}, + Command, CommandExt, Commands, Context, Info, Propagate, + }, + instance::ServerConfig, +}; +use jiff::Timestamp; + +struct Pyramid; + +#[async_trait] +impl Command for Pyramid { + fn info(&self, _ctx: &Context) -> Info { + Info::new().with_description("build a pyramid") + } + + async fn execute( + &self, + _arg: &str, + msg: &Message, + ctx: &Context, + _bot: &BotState, + ) -> euphoxide::Result { + let mut parent = msg.id; + + for _ in 0..3 { + let first = ctx.reply(parent, "brick").await?; + ctx.reply_only(parent, "brick").await?; + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + ctx.reply_only(parent, "brick").await?; + Ok(Propagate::No) + } +} + +#[derive(Clone)] +struct BotState { + start_time: Timestamp, + commands: Arc>, +} + +impl HasStartTime for BotState { + fn start_time(&self) -> Timestamp { + self.start_time + } +} + +impl HasCommandInfos for BotState { + fn command_infos(&self, ctx: &Context) -> Vec { + self.commands.infos(ctx) + } +} + +async fn run() -> anyhow::Result<()> { + let commands = Commands::new() + .then(Ping::default()) + .then(Uptime) + .then(ShortHelp::new("/me demonstrates how to use euphoxide")) + .then(FullHelp::new()) + .then(Pyramid.global("pyramid")); + + let commands = Arc::new(commands); + + let state = BotState { + start_time: Timestamp::now(), + commands: commands.clone(), + }; + + let mut bot = Bot::new(); + + let config = ServerConfig::default() + .instance("test") + .with_username("examplebot"); + + bot.add_instance(config); + + while let Some(event) = bot.recv().await { + commands.on_bot_event(event, &state).await?; + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide-bot/examples/examplebot_instance.rs b/euphoxide-bot/examples/examplebot_instance.rs new file mode 100644 index 0000000..de0c882 --- /dev/null +++ b/euphoxide-bot/examples/examplebot_instance.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use euphoxide::{ + api::{Data, Message, Nick, Send}, + client::conn::ClientConnHandle, +}; +use euphoxide_bot::instance::{Instance, InstanceEvent, ServerConfig}; +use tokio::sync::mpsc; + +async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> { + conn.send_only(Nick { + name: "examplebot".to_string(), + }) + .await?; + + Ok(()) +} + +async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + conn.send_only(Send { + content: "Pong!".to_string(), + parent: Some(msg.id), + }) + .await?; + + Ok(()) +} + +async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + let mut parent = msg.id; + + for _ in 0..3 { + let first = conn + .send(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + Ok(()) +} + +async fn on_data(conn: ClientConnHandle, data: Data) { + let result = match data { + Data::SnapshotEvent(_) => set_nick(&conn).await, + Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await, + Data::SendEvent(event) if event.0.content == "!pyramid" => { + send_pyramid(&conn, event.0).await + } + _ => Ok(()), + }; + + if let Err(err) = result { + println!("Error while responding: {err}"); + } +} + +async fn run() -> anyhow::Result<()> { + let config = ServerConfig::default() + .instance("test") + .with_username("examplebot"); + + let (event_tx, mut event_rx) = mpsc::channel(10); + let _instance = Instance::new(0, config, event_tx); // Don't drop or instance stops + + while let Some(event) = event_rx.recv().await { + if let InstanceEvent::Packet { conn, packet, .. } = event { + let data = packet.into_data()?; + tokio::task::spawn(on_data(conn, data)); + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide-bot/src/bot.rs b/euphoxide-bot/src/bot.rs new file mode 100644 index 0000000..2fcece6 --- /dev/null +++ b/euphoxide-bot/src/bot.rs @@ -0,0 +1,180 @@ +use std::{collections::HashMap, time::Duration}; + +use euphoxide::{ + api::ParsedPacket, + client::{conn::ClientConnHandle, state::State}, +}; +use tokio::sync::mpsc; + +use crate::instance::{Instance, InstanceConfig, InstanceEvent}; + +#[derive(Debug)] +pub enum BotEvent { + Started { + instance: Instance, + }, + Connecting { + instance: Instance, + }, + Connected { + instance: Instance, + conn: ClientConnHandle, + state: State, + }, + Joined { + instance: Instance, + conn: ClientConnHandle, + state: State, + }, + Packet { + instance: Instance, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + instance: Instance, + }, + Stopped { + instance: Instance, + }, +} + +impl BotEvent { + fn from_instance_event(instance: Instance, event: InstanceEvent) -> Self { + match event { + InstanceEvent::Started { id: _ } => Self::Started { instance }, + InstanceEvent::Connecting { id: _ } => Self::Connecting { instance }, + InstanceEvent::Connected { id: _, conn, state } => Self::Connected { + instance, + conn, + state, + }, + InstanceEvent::Joined { id: _, conn, state } => Self::Joined { + instance, + conn, + state, + }, + InstanceEvent::Packet { + id: _, + conn, + state, + packet, + } => Self::Packet { + instance, + conn, + state, + packet, + }, + InstanceEvent::Disconnected { id: _ } => Self::Disconnected { instance }, + InstanceEvent::Stopped { id: _ } => Self::Stopped { instance }, + } + } + + pub fn instance(&self) -> &Instance { + match self { + Self::Started { instance } => instance, + Self::Connecting { instance, .. } => instance, + Self::Connected { instance, .. } => instance, + Self::Joined { instance, .. } => instance, + Self::Packet { instance, .. } => instance, + Self::Disconnected { instance } => instance, + Self::Stopped { instance } => instance, + } + } +} + +#[non_exhaustive] +pub struct BotConfig { + pub event_timeout: Duration, + pub event_channel_bufsize: usize, +} + +impl Default for BotConfig { + fn default() -> Self { + Self { + event_timeout: Duration::from_secs(1), + event_channel_bufsize: 10, + } + } +} + +pub struct Bot { + config: BotConfig, + next_id: usize, + instances: HashMap, + event_tx: mpsc::Sender, + event_rx: mpsc::Receiver, +} + +impl Bot { + pub fn new() -> Self { + Self::new_with_config(BotConfig::default()) + } + + pub fn new_with_config(config: BotConfig) -> Self { + let (event_tx, event_rx) = mpsc::channel(10); + Self { + config, + next_id: 0, + instances: HashMap::new(), + event_tx, + event_rx, + } + } + + fn purge_instances(&mut self) { + self.instances.retain(|_, v| !v.stopped()); + } + + pub fn get_instances(&self) -> Vec { + self.instances.values().cloned().collect() + } + + pub fn add_instance(&mut self, config: InstanceConfig) -> Instance { + let id = self.next_id; + self.next_id += 1; + + assert!(!self.instances.contains_key(&id)); + + let instance = Instance::new(id, config, self.event_tx.clone()); + self.instances.insert(id, instance.clone()); + + instance + } + + pub async fn recv(&mut self) -> Option { + // We hold exactly one sender. If no other senders exist, then all + // instances are dead and we'll never receive any more events unless we + // return and allow the user to add more instances again. + while self.event_rx.sender_strong_count() > 1 { + // Prevent potential memory leak + self.purge_instances(); + + let Ok(event) = + tokio::time::timeout(self.config.event_timeout, self.event_rx.recv()).await + else { + // We need to re-check the sender count occasionally. It's + // possible that there are still instances that just haven't + // sent an event in a while, so we can't just return here. + continue; + }; + + // This only returns None if no senders remain, and since we always + // own one sender, this can't happen. + let event = event.expect("event channel should never close since we own a sender"); + + if let Some(instance) = self.instances.get(&event.id()) { + return Some(BotEvent::from_instance_event(instance.clone(), event)); + } + } + + None + } +} + +impl Default for Bot { + fn default() -> Self { + Self::new() + } +} diff --git a/euphoxide-bot/src/command.rs b/euphoxide-bot/src/command.rs new file mode 100644 index 0000000..02a2e92 --- /dev/null +++ b/euphoxide-bot/src/command.rs @@ -0,0 +1,238 @@ +pub mod bang; +pub mod basic; +pub mod botrulez; +#[cfg(feature = "clap")] +pub mod clap; + +use std::future::Future; + +use async_trait::async_trait; +use bang::{General, Global, Specific}; +use basic::{Described, Prefixed}; +use euphoxide::{ + api::{self, Data, Message, MessageId, ParsedPacket, SendEvent, SendReply}, + client::{ + conn::ClientConnHandle, + state::{Joined, State}, + }, +}; + +use crate::{bot::BotEvent, instance::InstanceEvent}; + +#[non_exhaustive] +pub struct Context { + pub conn: ClientConnHandle, + pub joined: Joined, +} + +impl Context { + pub async fn send( + &self, + content: S, + ) -> euphoxide::Result>> { + self.conn + .send(api::Send { + content: content.to_string(), + parent: None, + }) + .await + } + + pub async fn send_only(&self, content: S) -> euphoxide::Result<()> { + let _ignore = self.send(content).await?; + Ok(()) + } + + pub async fn reply( + &self, + parent: MessageId, + content: S, + ) -> euphoxide::Result>> { + self.conn + .send(api::Send { + content: content.to_string(), + parent: Some(parent), + }) + .await + } + + pub async fn reply_only( + &self, + parent: MessageId, + content: S, + ) -> euphoxide::Result<()> { + let _ignore = self.reply(parent, content).await?; + Ok(()) + } +} + +#[derive(Default)] +pub struct Info { + pub trigger: Option, + pub description: Option, +} + +impl Info { + pub fn new() -> Self { + Self::default() + } + + pub fn with_trigger(mut self, trigger: impl ToString) -> Self { + self.trigger = Some(trigger.to_string()); + self + } + + pub fn with_description(mut self, description: impl ToString) -> Self { + self.description = Some(description.to_string()); + self + } + + pub fn prepend_trigger(&mut self, trigger: impl ToString) { + let cur_trigger = self.trigger.get_or_insert_default(); + if !cur_trigger.is_empty() { + cur_trigger.insert(0, ' '); + } + cur_trigger.insert_str(0, &trigger.to_string()); + } + + pub fn with_prepended_trigger(mut self, trigger: impl ToString) -> Self { + self.prepend_trigger(trigger); + self + } +} + +/// Whether a message should propagate to subsequent commands. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Propagate { + No, + Yes, +} + +#[allow(unused_variables)] +#[async_trait] +pub trait Command { + fn info(&self, ctx: &Context) -> Info { + Info::default() + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &B, + ) -> Result; +} + +pub trait CommandExt: Sized { + fn described(self) -> Described { + Described::new(self) + } + + fn hidden(self) -> Described { + Described::hidden(self) + } + + fn prefixed(self, prefix: impl ToString) -> Prefixed { + Prefixed::new(prefix, self) + } + + fn general(self, name: impl ToString) -> General { + General::new(name, self) + } + + fn global(self, name: impl ToString) -> Global { + Global::new(name, self) + } + + fn specific(self, name: impl ToString) -> Specific { + Specific::new(name, self) + } +} + +impl> CommandExt for C {} + +pub struct Commands { + commands: Vec + Sync + Send>>, +} + +impl Commands { + pub fn new() -> Self { + Self { commands: vec![] } + } + + pub fn add(&mut self, command: impl Command + Sync + Send + 'static) { + self.commands.push(Box::new(command)); + } + + pub fn then(mut self, command: impl Command + Sync + Send + 'static) -> Self { + self.add(command); + self + } + + pub fn infos(&self, ctx: &Context) -> Vec { + self.commands.iter().map(|c| c.info(ctx)).collect() + } + + pub async fn on_packet( + &self, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + bot: &B, + ) -> Result { + let Ok(Data::SendEvent(SendEvent(msg))) = &packet.content else { + return Ok(Propagate::Yes); + }; + + let State::Joined(joined) = state else { + return Ok(Propagate::Yes); + }; + + let ctx = Context { conn, joined }; + + for command in &self.commands { + let propagate = command.execute(&msg.content, msg, &ctx, bot).await?; + if propagate == Propagate::No { + return Ok(Propagate::No); + } + } + + Ok(Propagate::Yes) + } + + pub async fn on_instance_event(&self, event: InstanceEvent, bot: &B) -> Result { + if let InstanceEvent::Packet { + conn, + state, + packet, + .. + } = event + { + self.on_packet(conn, state, packet, bot).await + } else { + Ok(Propagate::Yes) + } + } + + pub async fn on_bot_event(&self, event: BotEvent, bot: &B) -> Result { + if let BotEvent::Packet { + conn, + state, + packet, + .. + } = event + { + self.on_packet(conn, state, packet, bot).await + } else { + Ok(Propagate::Yes) + } + } +} + +// Has fewer restrictions on generic types than #[derive(Default)]. +impl Default for Commands { + fn default() -> Self { + Self::new() + } +} diff --git a/src/bot/command/bang.rs b/euphoxide-bot/src/command/bang.rs similarity index 69% rename from src/bot/command/bang.rs rename to euphoxide-bot/src/command/bang.rs index a55d99d..778b211 100644 --- a/src/bot/command/bang.rs +++ b/euphoxide-bot/src/command/bang.rs @@ -1,9 +1,9 @@ +//! Euphoria-style `!foo` and `!foo @bar` command wrappers. + use async_trait::async_trait; +use euphoxide::{api::Message, nick}; -use crate::api::Message; -use crate::nick; - -use super::{Command, Context}; +use super::{Command, Context, Info, Propagate}; // TODO Don't ignore leading whitespace? // I'm not entirely happy with how commands handle whitespace, and on euphoria, @@ -38,7 +38,7 @@ impl Global { } } - pub fn prefix(mut self, prefix: S) -> Self { + pub fn with_prefix(mut self, prefix: S) -> Self { self.prefix = prefix.to_string(); self } @@ -47,12 +47,13 @@ impl Global { #[async_trait] impl Command for Global where - B: Send, + B: Sync, C: Command + Send + Sync, { - fn description(&self, ctx: &Context) -> Option { - let inner = self.inner.description(ctx)?; - Some(format!("{}{} - {inner}", self.prefix, self.name)) + fn info(&self, ctx: &Context) -> Info { + self.inner + .info(ctx) + .with_prepended_trigger(format!("{}{}", self.prefix, self.name)) } async fn execute( @@ -60,16 +61,14 @@ where arg: &str, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result { - // TODO Replace with let-else - let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { - Some(parsed) => parsed, - None => return Ok(false), + bot: &B, + ) -> Result { + let Some((name, rest)) = parse_prefix_initiated(arg, &self.prefix) else { + return Ok(Propagate::Yes); }; if name != self.name { - return Ok(false); + return Ok(Propagate::Yes); } self.inner.execute(rest, msg, ctx, bot).await @@ -91,7 +90,7 @@ impl General { } } - pub fn prefix(mut self, prefix: S) -> Self { + pub fn with_prefix(mut self, prefix: S) -> Self { self.prefix = prefix.to_string(); self } @@ -100,12 +99,13 @@ impl General { #[async_trait] impl Command for General where - B: Send, + B: Sync, C: Command + Send + Sync, { - fn description(&self, ctx: &Context) -> Option { - let inner = self.inner.description(ctx)?; - Some(format!("{}{} - {inner}", self.prefix, self.name)) + fn info(&self, ctx: &Context) -> Info { + self.inner + .info(ctx) + .with_prepended_trigger(format!("{}{}", self.prefix, self.name)) } async fn execute( @@ -113,23 +113,21 @@ where arg: &str, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result { - // TODO Replace with let-else - let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { - Some(parsed) => parsed, - None => return Ok(false), + bot: &B, + ) -> Result { + let Some((name, rest)) = parse_prefix_initiated(arg, &self.prefix) else { + return Ok(Propagate::Yes); }; if name != self.name { - return Ok(false); + return Ok(Propagate::Yes); } if parse_prefix_initiated(rest, "@").is_some() { // The command looks like a specific command. If we treated it like // a general command match, we would interpret other bots' specific // commands as general commands. - return Ok(false); + return Ok(Propagate::Yes); } self.inner.execute(rest, msg, ctx, bot).await @@ -151,7 +149,7 @@ impl Specific { } } - pub fn prefix(mut self, prefix: S) -> Self { + pub fn with_prefix(mut self, prefix: S) -> Self { self.prefix = prefix.to_string(); self } @@ -160,13 +158,14 @@ impl Specific { #[async_trait] impl Command for Specific where - B: Send, + B: Sync, C: Command + Send + Sync, { - fn description(&self, ctx: &Context) -> Option { - let inner = self.inner.description(ctx)?; + fn info(&self, ctx: &Context) -> Info { let nick = nick::mention(&ctx.joined.session.name); - Some(format!("{}{} @{nick} - {inner}", self.prefix, self.name)) + self.inner + .info(ctx) + .with_prepended_trigger(format!("{}{} @{nick}", self.prefix, self.name)) } async fn execute( @@ -174,26 +173,22 @@ where arg: &str, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result { - // TODO Replace with let-else - let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { - Some(parsed) => parsed, - None => return Ok(false), + bot: &B, + ) -> Result { + let Some((name, rest)) = parse_prefix_initiated(arg, &self.prefix) else { + return Ok(Propagate::Yes); }; if name != self.name { - return Ok(false); + return Ok(Propagate::Yes); } - // TODO Replace with let-else - let (nick, rest) = match parse_prefix_initiated(rest, "@") { - Some(parsed) => parsed, - None => return Ok(false), + let Some((nick, rest)) = parse_prefix_initiated(rest, "@") else { + return Ok(Propagate::Yes); }; if nick::normalize(nick) != nick::normalize(&ctx.joined.session.name) { - return Ok(false); + return Ok(Propagate::Yes); } self.inner.execute(rest, msg, ctx, bot).await diff --git a/euphoxide-bot/src/command/basic.rs b/euphoxide-bot/src/command/basic.rs new file mode 100644 index 0000000..6d943f4 --- /dev/null +++ b/euphoxide-bot/src/command/basic.rs @@ -0,0 +1,113 @@ +//! Basic command wrappers. + +use async_trait::async_trait; +use euphoxide::api::Message; + +use super::{Command, Context, Info, Propagate}; + +/// Rewrite or hide command info. +pub struct Described { + pub inner: C, + pub trigger: Option>, + pub description: Option>, +} + +impl Described { + pub fn new(inner: C) -> Self { + Self { + inner, + trigger: None, + description: None, + } + } + + pub fn hidden(inner: C) -> Self { + Self::new(inner) + .with_trigger_hidden() + .with_description_hidden() + } + + pub fn with_trigger(mut self, trigger: impl ToString) -> Self { + self.trigger = Some(Some(trigger.to_string())); + self + } + + pub fn with_trigger_hidden(mut self) -> Self { + self.trigger = Some(None); + self + } + + pub fn with_description(mut self, description: impl ToString) -> Self { + self.description = Some(Some(description.to_string())); + self + } + + pub fn with_description_hidden(mut self) -> Self { + self.description = Some(None); + self + } +} + +#[async_trait] +impl Command for Described +where + B: Sync, + C: Command + Send + Sync, +{ + fn info(&self, ctx: &Context) -> Info { + let info = self.inner.info(ctx); + Info { + trigger: self.trigger.clone().unwrap_or(info.trigger), + description: self.description.clone().unwrap_or(info.description), + } + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &B, + ) -> Result { + self.inner.execute(arg, msg, ctx, bot).await + } +} + +pub struct Prefixed { + prefix: String, + inner: C, +} + +impl Prefixed { + pub fn new(prefix: S, inner: C) -> Self { + Self { + prefix: prefix.to_string(), + inner, + } + } +} + +#[async_trait] +impl Command for Prefixed +where + B: Sync, + C: Command + Send + Sync, +{ + fn info(&self, ctx: &Context) -> Info { + self.inner.info(ctx).with_prepended_trigger(&self.prefix) + } + + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &B, + ) -> Result { + if let Some(rest) = arg.trim_start().strip_prefix(&self.prefix) { + self.inner.execute(rest, msg, ctx, bot).await + } else { + Ok(Propagate::Yes) + } + } +} diff --git a/euphoxide-bot/src/command/botrulez.rs b/euphoxide-bot/src/command/botrulez.rs new file mode 100644 index 0000000..94e7875 --- /dev/null +++ b/euphoxide-bot/src/command/botrulez.rs @@ -0,0 +1,8 @@ +//! The main [botrulez](https://github.com/jedevc/botrulez) commands. + +mod full_help; +mod ping; +mod short_help; +mod uptime; + +pub use self::{full_help::*, ping::*, short_help::*, uptime::*}; diff --git a/euphoxide-bot/src/command/botrulez/full_help.rs b/euphoxide-bot/src/command/botrulez/full_help.rs new file mode 100644 index 0000000..47e944c --- /dev/null +++ b/euphoxide-bot/src/command/botrulez/full_help.rs @@ -0,0 +1,111 @@ +use async_trait::async_trait; +#[cfg(feature = "clap")] +use clap::Parser; +use euphoxide::api::Message; + +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Info, Propagate}; + +pub trait HasCommandInfos { + fn command_infos(&self, ctx: &Context) -> Vec; +} + +#[derive(Default)] +pub struct FullHelp { + pub before: String, + pub after: String, +} + +impl FullHelp { + pub fn new() -> Self { + Self::default() + } + + pub fn with_before(mut self, before: impl ToString) -> Self { + self.before = before.to_string(); + self + } + + pub fn with_after(mut self, after: impl ToString) -> Self { + self.after = after.to_string(); + self + } + + fn formulate_reply(&self, ctx: &Context, bot: &B) -> String { + let mut result = String::new(); + + if !self.before.is_empty() { + result.push_str(&self.before); + result.push('\n'); + } + + for info in bot.command_infos(ctx) { + if let Some(trigger) = &info.trigger { + result.push_str(trigger); + if let Some(description) = &info.description { + result.push_str(" - "); + result.push_str(description); + } + result.push('\n'); + } + } + + if !self.after.is_empty() { + result.push_str(&self.after); + result.push('\n'); + } + + result + } +} + +#[async_trait] +impl Command for FullHelp +where + B: HasCommandInfos + Sync, + E: From, +{ + async fn execute( + &self, + arg: &str, + msg: &Message, + ctx: &Context, + bot: &B, + ) -> Result { + if arg.trim().is_empty() { + let reply = self.formulate_reply(ctx, bot); + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) + } else { + Ok(Propagate::Yes) + } + } +} + +/// Show full bot help. +#[cfg(feature = "clap")] +#[derive(Parser)] +pub struct FullHelpArgs {} + +#[cfg(feature = "clap")] +#[async_trait] +impl ClapCommand for FullHelp +where + B: HasCommandInfos + Sync, + E: From, +{ + type Args = FullHelpArgs; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + bot: &B, + ) -> Result { + let reply = self.formulate_reply(ctx, bot); + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) + } +} diff --git a/src/bot/botrulez/ping.rs b/euphoxide-bot/src/command/botrulez/ping.rs similarity index 53% rename from src/bot/botrulez/ping.rs rename to euphoxide-bot/src/command/botrulez/ping.rs index c7cea39..7d4da58 100644 --- a/src/bot/botrulez/ping.rs +++ b/euphoxide-bot/src/command/botrulez/ping.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; +#[cfg(feature = "clap")] use clap::Parser; +use euphoxide::api::Message; -use crate::api::Message; -use crate::bot::command::{ClapCommand, Command, Context}; -use crate::conn; +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; pub struct Ping(pub String); @@ -22,43 +24,45 @@ impl Default for Ping { #[async_trait] impl Command for Ping where - E: From, + E: From, { async fn execute( &self, arg: &str, msg: &Message, ctx: &Context, - _bot: &mut B, - ) -> Result { + _bot: &B, + ) -> Result { if arg.trim().is_empty() { - ctx.reply(msg.id, &self.0).await?; - Ok(true) + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) } else { - Ok(false) + Ok(Propagate::Yes) } } } /// Trigger a short reply. +#[cfg(feature = "clap")] #[derive(Parser)] -pub struct Args {} +pub struct PingArgs {} +#[cfg(feature = "clap")] #[async_trait] impl ClapCommand for Ping where - E: From, + E: From, { - type Args = Args; + type Args = PingArgs; async fn execute( &self, _args: Self::Args, msg: &Message, ctx: &Context, - _bot: &mut B, - ) -> Result { - ctx.reply(msg.id, &self.0).await?; - Ok(true) + _bot: &B, + ) -> Result { + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) } } diff --git a/src/bot/botrulez/short_help.rs b/euphoxide-bot/src/command/botrulez/short_help.rs similarity index 51% rename from src/bot/botrulez/short_help.rs rename to euphoxide-bot/src/command/botrulez/short_help.rs index 1a359be..249da23 100644 --- a/src/bot/botrulez/short_help.rs +++ b/euphoxide-bot/src/command/botrulez/short_help.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; +#[cfg(feature = "clap")] use clap::Parser; +use euphoxide::api::Message; -use crate::api::Message; -use crate::bot::command::{ClapCommand, Command, Context}; -use crate::conn; +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; pub struct ShortHelp(pub String); @@ -16,43 +18,45 @@ impl ShortHelp { #[async_trait] impl Command for ShortHelp where - E: From, + E: From, { async fn execute( &self, arg: &str, msg: &Message, ctx: &Context, - _bot: &mut B, - ) -> Result { + _bot: &B, + ) -> Result { if arg.trim().is_empty() { - ctx.reply(msg.id, &self.0).await?; - Ok(true) + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) } else { - Ok(false) + Ok(Propagate::Yes) } } } /// Show short bot help. +#[cfg(feature = "clap")] #[derive(Parser)] -pub struct Args {} +pub struct ShortHelpArgs {} +#[cfg(feature = "clap")] #[async_trait] impl ClapCommand for ShortHelp where - E: From, + E: From, { - type Args = Args; + type Args = ShortHelpArgs; async fn execute( &self, _args: Self::Args, msg: &Message, ctx: &Context, - _bot: &mut B, - ) -> Result { - ctx.reply(msg.id, &self.0).await?; - Ok(true) + _bot: &B, + ) -> Result { + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) } } diff --git a/src/bot/botrulez/uptime.rs b/euphoxide-bot/src/command/botrulez/uptime.rs similarity index 79% rename from src/bot/botrulez/uptime.rs rename to euphoxide-bot/src/command/botrulez/uptime.rs index d8b1d0d..564d89f 100644 --- a/src/bot/botrulez/uptime.rs +++ b/euphoxide-bot/src/command/botrulez/uptime.rs @@ -1,10 +1,12 @@ use async_trait::async_trait; +#[cfg(feature = "clap")] use clap::Parser; +use euphoxide::api::Message; use jiff::{Span, Timestamp, Unit}; -use crate::api::Message; -use crate::bot::command::{ClapCommand, Command, Context}; -use crate::conn; +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; pub fn format_time(t: Timestamp) -> String { t.strftime("%Y-%m-%d %H:%M:%S UTC").to_string() @@ -83,51 +85,53 @@ impl Uptime { #[async_trait] impl Command for Uptime where - B: HasStartTime + Send, - E: From, + B: HasStartTime + Sync, + E: From, { async fn execute( &self, arg: &str, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result { + bot: &B, + ) -> Result { if arg.trim().is_empty() { let reply = self.formulate_reply(ctx, bot, false); - ctx.reply(msg.id, reply).await?; - Ok(true) + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) } else { - Ok(false) + Ok(Propagate::Yes) } } } /// Show how long the bot has been online. +#[cfg(feature = "clap")] #[derive(Parser)] -pub struct Args { +pub struct UptimeArgs { /// Show how long the bot has been connected without interruption. #[arg(long, short)] pub connected: bool, } +#[cfg(feature = "clap")] #[async_trait] impl ClapCommand for Uptime where - B: HasStartTime + Send, - E: From, + B: HasStartTime + Sync, + E: From, { - type Args = Args; + type Args = UptimeArgs; async fn execute( &self, args: Self::Args, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result { + bot: &B, + ) -> Result { let reply = self.formulate_reply(ctx, bot, args.connected); - ctx.reply(msg.id, reply).await?; - Ok(true) + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) } } diff --git a/src/bot/command/clap.rs b/euphoxide-bot/src/command/clap.rs similarity index 88% rename from src/bot/command/clap.rs rename to euphoxide-bot/src/command/clap.rs index a22b49a..b20edb6 100644 --- a/src/bot/command/clap.rs +++ b/euphoxide-bot/src/command/clap.rs @@ -1,10 +1,10 @@ +//! [`clap`]-based commands. + use async_trait::async_trait; use clap::{CommandFactory, Parser}; +use euphoxide::api::Message; -use crate::api::Message; -use crate::conn; - -use super::{Command, Context}; +use super::{Command, Context, Info, Propagate}; #[async_trait] pub trait ClapCommand { @@ -15,8 +15,8 @@ pub trait ClapCommand { args: Self::Args, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result; + bot: &B, + ) -> Result; } /// Parse bash-like quoted arguments separated by whitespace. @@ -101,13 +101,16 @@ pub struct Clap(pub C); #[async_trait] impl Command for Clap where - B: Send, - E: From, + B: Sync, + E: From, C: ClapCommand + Send + Sync, C::Args: Parser + Send, { - fn description(&self, _ctx: &Context) -> Option { - C::Args::command().get_about().map(|s| format!("{s}")) + fn info(&self, _ctx: &Context) -> Info { + Info { + description: C::Args::command().get_about().map(|s| s.to_string()), + ..Info::default() + } } async fn execute( @@ -115,13 +118,13 @@ where arg: &str, msg: &Message, ctx: &Context, - bot: &mut B, - ) -> Result { + bot: &B, + ) -> Result { let mut args = match parse_quoted_args(arg) { Ok(args) => args, Err(err) => { - ctx.reply(msg.id, err).await?; - return Ok(true); + ctx.reply_only(msg.id, err).await?; + return Ok(Propagate::No); } }; @@ -132,8 +135,8 @@ where let args = match C::Args::try_parse_from(args) { Ok(args) => args, Err(err) => { - ctx.reply(msg.id, format!("{}", err.render())).await?; - return Ok(true); + ctx.reply_only(msg.id, format!("{}", err.render())).await?; + return Ok(Propagate::No); } }; diff --git a/euphoxide-bot/src/instance.rs b/euphoxide-bot/src/instance.rs new file mode 100644 index 0000000..f43d77c --- /dev/null +++ b/euphoxide-bot/src/instance.rs @@ -0,0 +1,451 @@ +use std::{ + fmt, result, + str::FromStr, + sync::{Arc, Mutex}, + time::Duration, +}; + +use cookie::{Cookie, CookieJar}; +use euphoxide::{ + api::{Auth, AuthOption, BounceEvent, Data, Nick, ParsedPacket}, + client::{ + conn::{ClientConn, ClientConnConfig, ClientConnHandle}, + state::State, + }, +}; +use log::warn; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + self, + http::{HeaderValue, StatusCode}, +}; + +enum Error { + Stopped, + NoReferences, + AuthRequired, + InvalidPassword, + OutOfJoinAttempts, + Euphoxide(euphoxide::Error), +} + +impl Error { + fn is_fatal(&self) -> bool { + match self { + Self::Stopped => true, + Self::NoReferences => true, + Self::AuthRequired => true, + Self::InvalidPassword => true, + Self::OutOfJoinAttempts => true, + Self::Euphoxide(euphoxide::Error::Tungstenite(tungstenite::Error::Http(response))) => { + response.status() == StatusCode::NOT_FOUND + } + _ => false, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Stopped => write!(f, "the instance was stopped manually"), + Self::NoReferences => write!(f, "all references to the instance were dropped"), + Self::AuthRequired => write!(f, "authentication required but no credentials found"), + Self::InvalidPassword => write!(f, "authentication required but password is invalid"), + Self::OutOfJoinAttempts => write!(f, "failed to join within attempt limit"), + Self::Euphoxide(error) => write!(f, "{error}"), + } + } +} + +impl From for Error { + fn from(value: euphoxide::Error) -> Self { + Self::Euphoxide(value) + } +} + +type Result = result::Result; + +enum Command { + GetConn(oneshot::Sender), + Stop, +} + +#[derive(Debug)] +pub enum InstanceEvent { + Started { + id: usize, + }, + Connecting { + id: usize, + }, + Connected { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Joined { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Packet { + id: usize, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + id: usize, + }, + Stopped { + id: usize, + }, +} + +impl InstanceEvent { + pub fn id(&self) -> usize { + match self { + Self::Started { id } => *id, + Self::Connecting { id } => *id, + Self::Connected { id, .. } => *id, + Self::Joined { id, .. } => *id, + Self::Packet { id, .. } => *id, + Self::Disconnected { id } => *id, + Self::Stopped { id } => *id, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ServerConfig { + pub client: ClientConnConfig, + pub cookies: Arc>, + pub join_attempts: usize, + pub reconnect_delay: Duration, + pub cmd_channel_bufsize: usize, +} + +impl ServerConfig { + pub fn instance(self, room: impl ToString) -> InstanceConfig { + InstanceConfig { + server: self, + room: room.to_string(), + human: false, + username: None, + force_username: false, + password: None, + } + } +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + client: ClientConnConfig::default(), + cookies: Arc::new(Mutex::new(CookieJar::new())), + join_attempts: 5, + reconnect_delay: Duration::from_secs(30), + cmd_channel_bufsize: 1, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct InstanceConfig { + pub server: ServerConfig, + pub room: String, + pub human: bool, + pub username: Option, + pub force_username: bool, + pub password: Option, +} + +impl InstanceConfig { + pub fn with_username(mut self, username: impl ToString) -> Self { + self.username = Some(username.to_string()); + self + } + + pub fn with_force_username(mut self, enabled: bool) -> Self { + self.force_username = enabled; + self + } + + pub fn with_password(mut self, password: impl ToString) -> Self { + self.password = Some(password.to_string()); + self + } +} + +struct InstanceTask { + id: usize, + config: InstanceConfig, + + cmd_rx: mpsc::Receiver, + event_tx: mpsc::Sender, + + attempts: usize, + never_joined: bool, +} + +impl InstanceTask { + fn get_cookies(&self) -> Option { + self.config + .server + .cookies + .lock() + .unwrap() + .iter() + .map(|c| c.stripped().to_string()) + .collect::>() + .join("; ") + .try_into() + .ok() + } + + fn set_cookies(&mut self, cookies: &[HeaderValue]) { + let mut guard = self.config.server.cookies.lock().unwrap(); + for cookie in cookies { + if let Ok(cookie) = cookie.to_str() { + if let Ok(cookie) = Cookie::from_str(cookie) { + guard.add(cookie); + } + } + } + } + + async fn connect(&mut self) -> Result { + let (conn, cookies) = ClientConn::connect_with_config( + &self.config.room, + self.get_cookies(), + &self.config.server.client, + ) + .await?; + + self.set_cookies(&cookies); + + Ok(conn) + } + + async fn on_joined(&mut self, conn: &ClientConn) { + self.never_joined = false; + + let _ = self + .event_tx + .send(InstanceEvent::Joined { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + } + + async fn on_packet(&mut self, conn: &mut ClientConn, packet: ParsedPacket) -> Result<()> { + let _ = self + .event_tx + .send(InstanceEvent::Packet { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + packet: packet.clone(), + }) + .await; + + match packet.into_data()? { + // Attempting to authenticate + Data::BounceEvent(BounceEvent { + auth_options: Some(auth_options), + .. + }) if auth_options.contains(&AuthOption::Passcode) => { + if let Some(password) = &self.config.password { + conn.send(Auth { + r#type: AuthOption::Passcode, + passcode: Some(password.clone()), + }) + .await?; + } else { + return Err(Error::AuthRequired); + } + } + + // Auth attempt failed :( + Data::AuthReply(ev) if !ev.success => return Err(Error::InvalidPassword), + + // Just joined + Data::SnapshotEvent(ev) => { + if let Some(username) = &self.config.username { + if ev.nick.is_none() || self.config.force_username { + conn.send(Nick { + name: username.clone(), + }) + .await?; + } + } + + // Maybe we should only count this as joining if we successfully + // updated the nick instead of just sending a Nick command? And + // maybe we should ensure that we're in the State::Joined state? + // Both of these would probably complicate the code a lot. On + // the other hand, InstanceEvent::Joined::state would contain + // the actual nick after joining, which feels like the right + // thing to do™. Probably not worth the increase in code + // complexity though. + + self.on_joined(conn).await; + } + + _ => {} + } + + Ok(()) + } + + async fn on_cmd(&mut self, conn: &ClientConn, cmd: Command) -> Result<()> { + match cmd { + Command::GetConn(sender) => { + let _ = sender.send(conn.handle()); + Ok(()) + } + Command::Stop => Err(Error::Stopped), + } + } + + async fn run_once(&mut self) -> Result<()> { + // If we try to connect too many times without managing to join at least + // once, the room is probably not accessible for one reason or another + // and the instance should stop. + self.attempts += 1; + if self.never_joined && self.attempts > self.config.server.join_attempts { + return Err(Error::OutOfJoinAttempts); + } + + let _ = self + .event_tx + .send(InstanceEvent::Connecting { id: self.id }) + .await; + + let mut conn = match self.connect().await { + Ok(conn) => conn, + Err(err) => { + // When we fail to connect, we want to wait a bit before + // reconnecting in order not to spam the server. However, when + // we are connected successfully and then disconnect for + // whatever reason, we want to try to reconnect immediately. We + // might, for example, be disconnected from the server because + // we just logged in. + tokio::time::sleep(self.config.server.reconnect_delay).await; + Err(err)? + } + }; + + let _ = self + .event_tx + .send(InstanceEvent::Connected { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + + let result = loop { + let received = select! { + r = conn.recv() => Ok(r?), + r = self.cmd_rx.recv() => Err(r), + }; + + match received { + // We received a packet + Ok(None) => break Ok(()), // Connection closed + Ok(Some(packet)) => self.on_packet(&mut conn, packet).await?, + // We received a command + Err(None) => break Err(Error::NoReferences), + Err(Some(cmd)) => self.on_cmd(&conn, cmd).await?, + }; + }; + + let _ = self + .event_tx + .send(InstanceEvent::Disconnected { id: self.id }) + .await; + + result + } + + async fn run(mut self) { + let _ = self + .event_tx + .send(InstanceEvent::Started { id: self.id }) + .await; + + loop { + if let Err(err) = self.run_once().await { + warn!("instance {:?}: {err}", self.id); + if err.is_fatal() { + break; + } + } + } + + let _ = self + .event_tx + .send(InstanceEvent::Stopped { id: self.id }) + .await; + } +} + +#[derive(Clone)] +pub struct Instance { + id: usize, + cmd_tx: mpsc::Sender, +} + +impl fmt::Debug for Instance { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Instance") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl Instance { + pub fn new(id: usize, config: InstanceConfig, event_tx: mpsc::Sender) -> Self { + let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); + + let task = InstanceTask { + id, + config, + attempts: 0, + never_joined: false, + cmd_rx, + event_tx, + }; + + tokio::task::spawn(task.run()); + + Self { id, cmd_tx } + } + + pub fn id(&self) -> usize { + self.id + } + + pub fn stopped(&self) -> bool { + self.cmd_tx.is_closed() + } + + pub async fn stop(&self) { + let _ = self.cmd_tx.send(Command::Stop).await; + } + + pub async fn handle(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::GetConn(tx)).await; + rx.await.ok() + } +} diff --git a/euphoxide-bot/src/lib.rs b/euphoxide-bot/src/lib.rs new file mode 100644 index 0000000..3a6acc7 --- /dev/null +++ b/euphoxide-bot/src/lib.rs @@ -0,0 +1,3 @@ +pub mod bot; +pub mod command; +pub mod instance; diff --git a/euphoxide/Cargo.toml b/euphoxide/Cargo.toml new file mode 100644 index 0000000..a22083e --- /dev/null +++ b/euphoxide/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "euphoxide" +edition = { workspace = true } +version = { workspace = true } + +[dependencies] +caseless = { workspace = true } +futures-util = { workspace = true } +jiff = { workspace = true } +log = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tokio-tungstenite = { workspace = true } +unicode-normalization = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +rustls = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } + +[lints] +workspace = true diff --git a/euphoxide/examples/examplebot.rs b/euphoxide/examples/examplebot.rs new file mode 100644 index 0000000..19fe95e --- /dev/null +++ b/euphoxide/examples/examplebot.rs @@ -0,0 +1,90 @@ +use std::time::Duration; + +use euphoxide::{ + api::{Data, Message, Nick, Send}, + client::conn::{ClientConn, ClientConnHandle}, +}; + +async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> { + conn.send_only(Nick { + name: "examplebot".to_string(), + }) + .await?; + + Ok(()) +} + +async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + conn.send_only(Send { + content: "Pong!".to_string(), + parent: Some(msg.id), + }) + .await?; + + Ok(()) +} + +async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + let mut parent = msg.id; + + for _ in 0..3 { + let first = conn + .send(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + Ok(()) +} + +async fn on_data(conn: ClientConnHandle, data: Data) { + let result = match data { + Data::SnapshotEvent(_) => set_nick(&conn).await, + Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await, + Data::SendEvent(event) if event.0.content == "!pyramid" => { + send_pyramid(&conn, event.0).await + } + _ => Ok(()), + }; + + if let Err(err) = result { + println!("Error while responding: {err}"); + } +} + +async fn run() -> anyhow::Result<()> { + let (mut conn, _) = ClientConn::connect("test", None).await?; + + while let Some(packet) = conn.recv().await? { + let data = packet.into_data()?; + tokio::task::spawn(on_data(conn.handle(), data)); + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide/src/api.rs b/euphoxide/src/api.rs new file mode 100644 index 0000000..23617f3 --- /dev/null +++ b/euphoxide/src/api.rs @@ -0,0 +1,12 @@ +//! Models the [euphoria.leet.nu API][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api + +pub mod account_cmds; +pub mod events; +pub mod packets; +pub mod room_cmds; +pub mod session_cmds; +pub mod types; + +pub use self::{account_cmds::*, events::*, packets::*, room_cmds::*, session_cmds::*, types::*}; diff --git a/src/api/account_cmds.rs b/euphoxide/src/api/account_cmds.rs similarity index 91% rename from src/api/account_cmds.rs rename to euphoxide/src/api/account_cmds.rs index 01ef7f0..b9ed570 100644 --- a/src/api/account_cmds.rs +++ b/euphoxide/src/api/account_cmds.rs @@ -1,8 +1,10 @@ -//! Account commands. +//! Models [account commands][0] and their replies. //! //! These commands enable a client to register, associate, and dissociate with //! an account. An account allows an identity to be shared across browsers and //! devices, and is a prerequisite for room management +//! +//! [0]: https://euphoria.leet.nu/heim/api#account-commands use serde::{Deserialize, Serialize}; @@ -11,6 +13,8 @@ use super::AccountId; /// Change the primary email address associated with the signed in account. /// /// The email address may need to be verified before the change is fully applied. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangeEmail { /// The new primary email address for the account. @@ -32,6 +36,8 @@ pub struct ChangeEmailReply { } /// Change the name associated with the signed in account. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangeName { /// The name to associate with the account. @@ -46,6 +52,8 @@ pub struct ChangeNameReply { } /// Change the password of the signed in account. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangePassword { /// The current (and soon-to-be former) password. @@ -65,6 +73,8 @@ pub struct ChangePasswordReply {} /// If the login succeeds, the client should expect to receive a /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged in session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Login { /// The namespace of a personal identifier. @@ -98,6 +108,8 @@ pub struct LoginReply { /// If the logout is successful, the client should expect to receive a /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged out session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Logout {} @@ -113,6 +125,8 @@ pub struct LogoutReply {} /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged in session using the new /// account. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegisterAccount { /// The namespace of a personal identifier. @@ -145,6 +159,8 @@ pub struct RegisterAccountReply { /// /// An error will be returned if the account has no unverified email addresses /// associated with it. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResendVerificationEmail {} @@ -156,6 +172,8 @@ pub struct ResendVerificationEmailReply {} /// /// An email will be sent to the owner of the given personal identifier, with /// instructions and a confirmation code for resetting the password. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResetPassword { pub namespace: String, diff --git a/src/api/events.rs b/euphoxide/src/api/events.rs similarity index 87% rename from src/api/events.rs rename to euphoxide/src/api/events.rs index 8abe04d..b46dcb7 100644 --- a/src/api/events.rs +++ b/euphoxide/src/api/events.rs @@ -1,4 +1,6 @@ -//! Asynchronous events. +//! Models [asynchronous events][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api#asynchronous-events use serde::{Deserialize, Serialize}; @@ -8,6 +10,8 @@ use super::{ }; /// Indicates that access to a room is denied. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BounceEvent { /// The reason why access was denied. @@ -25,16 +29,37 @@ pub struct BounceEvent { /// /// If the disconnect reason is `authentication changed`, the client should /// immediately reconnect. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DisconnectEvent { /// The reason for disconnection. pub reason: String, } +/// Indicates that a message in the room has been modified or deleted. +/// +/// If the client offers a user interface and the indicated message is currently +/// displayed, it should update its display accordingly. +/// +/// The event packet includes a snapshot of the message post-edit. +/// +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EditMessageEvent { + /// The id of the edit. + pub edit_id: Snowflake, + /// The snapshot of the message post-edit. + #[serde(flatten)] + pub message: Message, +} + /// Sent by the server to the client when a session is started. /// /// It includes information about the client's authentication and associated /// identity. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HelloEvent { /// The id of the agent or account logged into this session. @@ -55,11 +80,15 @@ pub struct HelloEvent { } /// Indicates a session just joined the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JoinEvent(pub SessionView); /// Sent to all sessions of an agent when that agent is logged in (except for /// the session that issued the login command). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LoginEvent { pub account_id: AccountId, @@ -67,6 +96,8 @@ pub struct LoginEvent { /// Sent to all sessions of an agent when that agent is logged out (except for /// the session that issued the logout command). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LogoutEvent {} @@ -75,6 +106,8 @@ pub struct LogoutEvent {} /// /// If the network event type is `partition`, then this should be treated as a /// [`PartEvent`] for all sessions connected to the same server id/era combo. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NetworkEvent { /// The type of network event; for now, always `partition`. @@ -86,6 +119,8 @@ pub struct NetworkEvent { } /// Announces a nick change by another session in the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NickEvent { /// The id of the session this name applies to. @@ -98,22 +133,9 @@ pub struct NickEvent { pub to: String, } -/// Indicates that a message in the room has been modified or deleted. -/// -/// If the client offers a user interface and the indicated message is currently -/// displayed, it should update its display accordingly. -/// -/// The event packet includes a snapshot of the message post-edit. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EditMessageEvent { - /// The id of the edit. - pub edit_id: Snowflake, - /// The snapshot of the message post-edit. - #[serde(flatten)] - pub message: Message, -} - /// Indicates a session just disconnected from the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PartEvent(pub SessionView); @@ -121,6 +143,8 @@ pub struct PartEvent(pub SessionView); /// /// The client should send back a ping-reply with the same value for the time /// field as soon as possible (or risk disconnection). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PingEvent { /// A unix timestamp according to the server's clock. @@ -131,6 +155,8 @@ pub struct PingEvent { } /// Informs the client that another user wants to chat with them privately. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PmInitiateEvent { /// The id of the user inviting the client to chat privately. @@ -144,12 +170,16 @@ pub struct PmInitiateEvent { } /// Indicates a message received by the room from another session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SendEvent(pub Message); /// Indicates that a session has successfully joined a room. /// /// It also offers a snapshot of the room’s state and recent history. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SnapshotEvent { /// The id of the agent or account logged into this session. @@ -158,7 +188,7 @@ pub struct SnapshotEvent { pub session_id: SessionId, /// The server’s version identifier. pub version: String, - /// The list of all other sessions joined to the room (excluding this + /// The list of all other sessions joined to the room (excluding our /// session). pub listing: Vec, /// The most recent messages posted to the room (currently up to 100). diff --git a/euphoxide/src/api/packets.rs b/euphoxide/src/api/packets.rs new file mode 100644 index 0000000..6326a4c --- /dev/null +++ b/euphoxide/src/api/packets.rs @@ -0,0 +1,319 @@ +//! Models the [packets][0] sent between the server and client. +//! +//! [0]: https://euphoria.leet.nu/heim/api#packets + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::Error; + +use super::PacketType; + +/// A "raw" packet. +/// +/// This packet closely matches the [packet representation defined in the +/// API][0]. It can contain arbitrary data in the form of a JSON [`Value`]. It +/// can also contain both data and an error at the same time. +/// +/// In order to interpret this packet, you probably want to convert it to a +/// [`ParsedPacket`] using [`ParsedPacket::from_packet`]. +/// +/// [0]: https://euphoria.leet.nu/heim/api#packets +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Packet { + /// Client-generated id for associating replies with commands. + pub id: Option, + /// The type of the command, reply, or event. + pub r#type: PacketType, + /// The payload of the command, reply, or event. + pub data: Option, + /// This field appears in replies if a command fails. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + /// This field appears in replies to warn the client that it may be + /// flooding. + /// + /// The client should slow down its command rate. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub throttled: bool, + /// If throttled is true, this field describes why. + #[serde(skip_serializing_if = "Option::is_none")] + pub throttled_reason: Option, +} + +/// Models the relationship between command and reply types. +/// +/// This trait is useful for type-safe command-reply APIs. +pub trait Command { + /// The type of reply one can expect from the server when sending this + /// command. + type Reply; +} + +macro_rules! packets { + ( $( $mod:ident::$name:ident, )*) => { + /// A big enum containing most types of packet data. + #[derive(Debug, Clone)] + #[non_exhaustive] + pub enum Data { + $( $name(super::$mod::$name), )* + /// A valid type of packet data that this library does not model as + /// a struct. + Unimplemented(PacketType, Value), + } + + impl Data { + /// Interpret a JSON [`Value`] as packet data of a specific [`PacketType`]. + /// + /// This method may fail if the data is invalid. + pub fn from_value(ptype: PacketType, value: Value) -> serde_json::Result { + Ok(match ptype { + $( PacketType::$name => Self::$name(serde_json::from_value(value)?), )* + _ => Self::Unimplemented(ptype, value), + }) + } + + /// Convert the packet data into a JSON [`Value`]. + /// + /// This method may fail if the data fails to serialize. + pub fn into_value(self) -> serde_json::Result { + Ok(match self { + $( Self::$name(p) => serde_json::to_value(p)?, )* + Self::Unimplemented(_, value) => value, + }) + } + + /// The [`PacketType`] of this packet data. + pub fn packet_type(&self) -> PacketType { + match self { + $( Self::$name(_) => PacketType::$name, )* + Self::Unimplemented(ptype, _) => *ptype, + } + } + } + + $( + impl From for Data { + fn from(p: super::$mod::$name) -> Self { + Self::$name(p) + } + } + + impl TryFrom for super::$mod::$name{ + type Error = (); + + fn try_from(value: Data) -> Result { + match value { + Data::$name(p) => Ok(p), + _ => Err(()) + } + } + } + )* + }; +} + +macro_rules! commands { + ( $( $cmd:ident => $rpl:ident, )* ) => { + $( + impl Command for super::$cmd { + type Reply = super::$rpl; + } + )* + }; +} + +packets! { + // Events + events::BounceEvent, + events::DisconnectEvent, + events::EditMessageEvent, + events::HelloEvent, + events::JoinEvent, + events::LoginEvent, + events::LogoutEvent, + events::NetworkEvent, + events::NickEvent, + events::PartEvent, + events::PingEvent, + events::PmInitiateEvent, + events::SendEvent, + events::SnapshotEvent, + // Session commands + session_cmds::Auth, + session_cmds::AuthReply, + session_cmds::Ping, + session_cmds::PingReply, + // Chat room commands + room_cmds::GetMessage, + room_cmds::GetMessageReply, + room_cmds::Log, + room_cmds::LogReply, + room_cmds::Nick, + room_cmds::NickReply, + room_cmds::PmInitiate, + room_cmds::PmInitiateReply, + room_cmds::Send, + room_cmds::SendReply, + room_cmds::Who, + room_cmds::WhoReply, + // Account commands + account_cmds::ChangeEmail, + account_cmds::ChangeEmailReply, + account_cmds::ChangeName, + account_cmds::ChangeNameReply, + account_cmds::ChangePassword, + account_cmds::ChangePasswordReply, + account_cmds::Login, + account_cmds::LoginReply, + account_cmds::Logout, + account_cmds::LogoutReply, + account_cmds::RegisterAccount, + account_cmds::RegisterAccountReply, + account_cmds::ResendVerificationEmail, + account_cmds::ResendVerificationEmailReply, + account_cmds::ResetPassword, + account_cmds::ResetPasswordReply, +} + +commands! { + // Session commands + Auth => AuthReply, + Ping => PingReply, + // Chat room commands + GetMessage => GetMessageReply, + Log => LogReply, + Nick => NickReply, + PmInitiate => PmInitiateReply, + Send => SendReply, + Who => WhoReply, + // Account commands + ChangeEmail => ChangeEmailReply, + ChangeName => ChangeNameReply, + ChangePassword => ChangePasswordReply, + Login => LoginReply, + Logout => LogoutReply, + RegisterAccount => RegisterAccountReply, + ResendVerificationEmail => ResendVerificationEmailReply, + ResetPassword => ResetPasswordReply, +} + +/// A fully parsed and interpreted packet. +/// +/// Compared to [`Packet`], this packet's representation more closely matches +/// the actual use of packets. +#[derive(Debug, Clone)] +pub struct ParsedPacket { + /// Client-generated id for associating replies with commands. + pub id: Option, + /// The type of the command, reply, or event. + pub r#type: PacketType, + /// The payload of the command, reply, or event, or an error message if the + /// command failed. + pub content: Result, + /// A warning to the client that it may be flooding. + /// + /// The client should slow down its command rate. + pub throttled: Option, +} + +impl ParsedPacket { + /// Convert a [`Data`]-compatible value into a [`ParsedPacket`]. + pub fn from_data(id: Option, data: impl Into) -> Self { + let data = data.into(); + Self { + id, + r#type: data.packet_type(), + content: Ok(data), + throttled: None, + } + } + + pub fn into_data(self) -> crate::Result { + self.content.map_err(Error::Euph) + } + + /// Convert a [`Packet`] into a [`ParsedPacket`]. + /// + /// This method may fail if the packet data is invalid. + pub fn from_packet(packet: Packet) -> serde_json::Result { + let id = packet.id; + let r#type = packet.r#type; + + let content = if let Some(error) = packet.error { + Err(error) + } else { + let data = packet.data.unwrap_or_default(); + Ok(Data::from_value(r#type, data)?) + }; + + let throttled = if packet.throttled { + let reason = packet + .throttled_reason + .unwrap_or_else(|| "no reason given".to_string()); + Some(reason) + } else { + None + }; + + Ok(Self { + id, + r#type, + content, + throttled, + }) + } + + /// Convert a [`ParsedPacket`] into a [`Packet`]. + /// + /// This method may fail if the packet data fails to serialize. + pub fn into_packet(self) -> serde_json::Result { + let id = self.id; + let r#type = self.r#type; + let throttled = self.throttled.is_some(); + let throttled_reason = self.throttled; + + Ok(match self.content { + Ok(data) => Packet { + id, + r#type, + data: Some(data.into_value()?), + error: None, + throttled, + throttled_reason, + }, + Err(error) => Packet { + id, + r#type, + data: None, + error: Some(error), + throttled, + throttled_reason, + }, + }) + } +} + +impl TryFrom for ParsedPacket { + type Error = serde_json::Error; + + fn try_from(value: Packet) -> Result { + Self::from_packet(value) + } +} + +impl TryFrom for Packet { + type Error = serde_json::Error; + + fn try_from(value: ParsedPacket) -> Result { + value.into_packet() + } +} + +impl TryFrom for Data { + type Error = Error; + + fn try_from(value: ParsedPacket) -> Result { + value.into_data() + } +} diff --git a/src/api/room_cmds.rs b/euphoxide/src/api/room_cmds.rs similarity index 88% rename from src/api/room_cmds.rs rename to euphoxide/src/api/room_cmds.rs index 0a2d553..f961af0 100644 --- a/src/api/room_cmds.rs +++ b/euphoxide/src/api/room_cmds.rs @@ -1,13 +1,17 @@ -//! Chat room commands. +//! Models [chat room commands][0] and their replies. //! //! These commands are available to the client once a session successfully joins //! a room. +//! +//! [0]: https://euphoria.leet.nu/heim/api#chat-room-commands use serde::{Deserialize, Serialize}; use super::{Message, MessageId, PmId, SessionId, SessionView, UserId}; /// Retrieve the full content of a single message in the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct GetMessage { /// The id of the message to retrieve. @@ -23,6 +27,8 @@ pub struct GetMessageReply(pub Message); /// This can be used to supplement the log provided by /// [`SnapshotEvent`](super::SnapshotEvent) (for example, when scrolling back /// further in history). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Log { /// Maximum number of messages to return (up to 1000). @@ -44,6 +50,8 @@ pub struct LogReply { /// /// This name applies to all messages sent during this session, until the nick /// command is called again. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Nick { /// The requested name (maximum length 36 bytes). @@ -68,6 +76,8 @@ pub struct NickReply { /// Constructs a virtual room for private messaging between the client and the /// given [`UserId`]. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PmInitiate { /// The id of the user to invite to chat privately. @@ -94,6 +104,8 @@ pub struct PmInitiateReply { /// The caller of this command will not receive the corresponding /// [`SendEvent`](super::SendEvent), but will receive the same information in /// the [`SendReply`]. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Send { /// The content of the message (client-defined). @@ -109,12 +121,14 @@ pub struct Send { pub struct SendReply(pub Message); /// Request a list of sessions currently joined in the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Who {} /// Lists the sessions currently joined in the room. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WhoReply { - /// A list of session views. + /// A list of session views (including our session). pub listing: Vec, } diff --git a/src/api/session_cmds.rs b/euphoxide/src/api/session_cmds.rs similarity index 86% rename from src/api/session_cmds.rs rename to euphoxide/src/api/session_cmds.rs index 4dab0d4..e60a76c 100644 --- a/src/api/session_cmds.rs +++ b/euphoxide/src/api/session_cmds.rs @@ -1,7 +1,9 @@ -//! Session commands. +//! Models [session commands][0] and their replies. //! //! Session management commands are involved in the initial handshake and //! maintenance of a session. +//! +//! [0]: https://euphoria.leet.nu/heim/api#session-commands use serde::{Deserialize, Serialize}; @@ -11,6 +13,8 @@ use super::{AuthOption, Time}; /// /// This should be sent in response to a [`BounceEvent`](super::BounceEvent) at /// the beginning of a session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Auth { /// The method of authentication. @@ -32,6 +36,8 @@ pub struct AuthReply { /// /// The server will send back a [`PingReply`] with the same timestamp as soon as /// possible. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Ping { /// An arbitrary value, intended to be a unix timestamp. diff --git a/src/api/types.rs b/euphoxide/src/api/types.rs similarity index 91% rename from src/api/types.rs rename to euphoxide/src/api/types.rs index b1408a8..d7121f8 100644 --- a/src/api/types.rs +++ b/euphoxide/src/api/types.rs @@ -1,20 +1,16 @@ -//! Field types. +//! Models the [field types][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api#field-types -// TODO Add newtype wrappers for different kinds of IDs? - -// Serde's derive macros generate this warning and I can't turn it off locally, -// so I'm turning it off for the entire module. -#![allow(clippy::use_self)] - -use std::num::ParseIntError; -use std::str::FromStr; -use std::{error, fmt}; +use std::{error, fmt, num::ParseIntError, str::FromStr}; use jiff::Timestamp; use serde::{de, ser, Deserialize, Serialize}; use serde_json::Value; /// Describes an account and its preferred name. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AccountView { /// The id of the account. @@ -24,7 +20,9 @@ pub struct AccountView { } /// Mode of authentication. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +/// +/// +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub enum AuthOption { /// Authentication with a passcode, where a key is derived from the passcode @@ -36,6 +34,8 @@ pub enum AuthOption { /// /// It corresponds to a chat message, or a post, or any broadcasted event in a /// room that should appear in the log. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { /// The id of the message (unique within a room). @@ -72,6 +72,8 @@ pub struct Message { /// The type of a packet. /// /// Not all of these types have their corresponding data modeled as a struct. +/// +/// #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub enum PacketType { @@ -250,6 +252,8 @@ impl fmt::Display for PacketType { } /// Describes an account to its owner. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersonalAccountView { /// The id of the account. @@ -261,6 +265,8 @@ pub struct PersonalAccountView { } /// Describes a session and its identity. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SessionView { /// The id of an agent or account (or bot). @@ -290,6 +296,8 @@ pub struct SessionView { /// A 13-character string, usually used as aunique identifier for some type of object. /// /// It is the base-36 encoding of an unsigned, 64-bit integer. +/// +/// #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct Snowflake(pub u64); @@ -307,7 +315,7 @@ impl Snowflake { /// representation of message ids to suddenly use the upper parts of the /// range, and since message ids mostly consist of a timestamp, this /// approach should last until at least 2075. - pub const MAX: Self = Snowflake(i64::MAX as u64); + pub const MAX: Self = Self(i64::MAX as u64); } impl fmt::Display for Snowflake { @@ -324,6 +332,7 @@ impl fmt::Display for Snowflake { } } +/// An error that occurred while parsing a [`Snowflake`]. #[derive(Debug)] pub enum ParseSnowflakeError { InvalidLength(usize), @@ -365,7 +374,7 @@ impl FromStr for Snowflake { return Err(ParseSnowflakeError::InvalidLength(s.len())); } let n = u64::from_str_radix(s, 36)?; - Ok(Snowflake(n)) + Ok(Self(n)) } } @@ -402,6 +411,8 @@ impl<'de> Deserialize<'de> for Snowflake { /// Time is specified as a signed 64-bit integer, giving the number of seconds /// since the Unix Epoch. +/// +/// #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct Time(pub i64); @@ -426,6 +437,8 @@ impl Time { /// /// It is possible for this value to have no prefix and colon, and there is no /// fixed format for the unique value. +/// +/// #[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct UserId(pub String); @@ -435,21 +448,27 @@ impl fmt::Display for UserId { } } +/// What kind of user a [`UserId`] is. #[derive(Debug, PartialEq, Eq)] -pub enum SessionType { +pub enum UserType { Agent, Account, Bot, } impl UserId { - pub fn session_type(&self) -> Option { + /// Retrieve the [`UserType`] of this user. + /// + /// This method can return [`None`] because user IDs used to have no + /// associated type. Such user IDs can still occur in old room logs, so + /// euphoxide supports them. + pub fn user_type(&self) -> Option { if self.0.starts_with("agent:") { - Some(SessionType::Agent) + Some(UserType::Agent) } else if self.0.starts_with("account:") { - Some(SessionType::Account) + Some(UserType::Account) } else if self.0.starts_with("bot:") { - Some(SessionType::Bot) + Some(UserType::Bot) } else { None } diff --git a/euphoxide/src/client.rs b/euphoxide/src/client.rs new file mode 100644 index 0000000..7ed3db4 --- /dev/null +++ b/euphoxide/src/client.rs @@ -0,0 +1,4 @@ +//! A connection from a client's perspective. + +pub mod conn; +pub mod state; diff --git a/euphoxide/src/client/conn.rs b/euphoxide/src/client/conn.rs new file mode 100644 index 0000000..a178b4c --- /dev/null +++ b/euphoxide/src/client/conn.rs @@ -0,0 +1,342 @@ +//! Client-specific connection with a more expressive API. + +use std::{future::Future, time::Duration}; + +use log::debug; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + client::IntoClientRequest, + http::{header, HeaderValue}, +}; + +use crate::{ + api::{Command, Data, LoginReply, ParsedPacket}, + conn::{Conn, ConnConfig, Side}, + replies::{self, PendingReply, Replies}, + Error, Result, +}; + +use super::state::State; + +enum ConnCommand { + SendCmd(Data, oneshot::Sender>>), + GetState(oneshot::Sender), +} + +/// Configuration options for a [`ClientConn`]. +#[derive(Debug, Clone)] +pub struct ClientConnConfig { + /// The domain where the server is hosted. + pub domain: String, + /// Whether the client should present itself as a human to the server. + /// + /// This should only be set if the client is directly acting on behalf of a + /// human, similar to the web client. + pub human: bool, + /// The size of the [`mpsc::channel`] for communication between + /// [`ClientConn`] and [`ClientConnHandle`]. + pub channel_bufsize: usize, + /// Timeout for opening a websocket connection. + pub connect_timeout: Duration, + /// Timeout for server replies when sending euphoria commands, i.e. packets + /// implementing [`Command`]. + pub command_timeout: Duration, + /// How long to wait in-between sending pings. + /// + /// See also [`ConnConfig::ping_interval`]. + pub ping_interval: Duration, +} + +impl Default for ClientConnConfig { + fn default() -> Self { + Self { + domain: "euphoria.leet.nu".to_string(), + human: false, + channel_bufsize: 1, + connect_timeout: Duration::from_secs(10), + command_timeout: Duration::from_secs(30), + ping_interval: Duration::from_secs(30), + } + } +} + +/// A client connection to an euphoria server. +/// +/// This struct is a wrapper around [`Conn`] with a more client-centric API. It +/// tracks the connection state, including room information sent by the server. +/// It also provides [`ClientConnHandle`], which can be used to asynchronously +/// send commands and await their replies. +pub struct ClientConn { + rx: mpsc::Receiver, + tx: mpsc::Sender, + + conn: Conn, + state: State, + + next_id: usize, + replies: Replies, +} + +impl ClientConn { + /// Retrieve the current [`State`] of the connection. + pub fn state(&self) -> &State { + &self.state + } + + /// Create a new handle for this connection. + pub fn handle(&self) -> ClientConnHandle { + ClientConnHandle { + tx: self.tx.clone(), + } + } + + /// Start closing the connection. + /// + /// To finish closing the connection gracefully, continue calling + /// [`Self::recv`] until it returns [`None`]. + pub async fn close(&mut self) -> Result<()> { + self.conn.close().await + } + + /// Receive a [`ParsedPacket`] over the connection. + /// + /// This method also maintains the connection by listening and responding to + /// pings as well as managing [`ClientConnHandle`]s. Thus, it must be called + /// regularly. + /// + /// Returns [`None`] if the connection is closed. + pub async fn recv(&mut self) -> Result> { + loop { + self.replies.purge(); + + // There's always at least one tx end (self.tx), so self.rx.recv() + // should never return None. + let packet = select! { + packet = self.conn.recv() => packet?, + Some(cmd) = self.rx.recv() => { + self.on_cmd(cmd).await; + continue; + }, + }; + + if let Some(packet) = &packet { + self.on_packet(packet).await?; + } + + break Ok(packet); + } + } + + /// Send a packet over the connection. + /// + /// A packet id is automatically generated and returned. When the server + /// replies to the packet, it will use this id as its [`ParsedPacket::id`]. + pub async fn send(&mut self, data: impl Into) -> Result { + let id = self.next_id.to_string(); + self.next_id += 1; + + self.conn + .send(ParsedPacket::from_data(Some(id.clone()), data.into())) + .await?; + + Ok(id) + } + + async fn on_packet(&mut self, packet: &ParsedPacket) -> Result<()> { + if let Ok(data) = &packet.content { + self.state.on_data(data); + + // The euphoria server doesn't always disconnect the client when it + // would make sense to do so or when the API specifies it should. + // This ensures we always disconnect when it makes sense to do so. + if matches!( + data, + Data::DisconnectEvent(_) + | Data::LoginEvent(_) + | Data::LogoutEvent(_) + | Data::LoginReply(LoginReply { success: true, .. }) + | Data::LogoutReply(_) + ) { + self.close().await?; + } + } + + if let Some(id) = &packet.id { + let id = id.clone(); + self.replies.complete(&id, packet.clone()); + } + + Ok(()) + } + + async fn on_cmd(&mut self, cmd: ConnCommand) { + match cmd { + ConnCommand::SendCmd(data, sender) => { + let result = self.send(data).await.map(|id| self.replies.wait_for(id)); + let _ = sender.send(result); + } + ConnCommand::GetState(sender) => { + let _ = sender.send(self.state.clone()); + } + } + } + + /// Connect to a room. + /// + /// See [`Self::connect_with_config`] for more details. + pub async fn connect( + room: &str, + cookies: Option, + ) -> Result<(Self, Vec)> { + Self::connect_with_config(room, cookies, &ClientConnConfig::default()).await + } + + /// Connect to a room with a specific configuration. + /// + /// Cookies to be sent to the server can be specified as a [`HeaderValue`] + /// in the format of a [`Cookie` request header][0]. If the connection + /// attempt was successful, cookies set by the server will be returned + /// alongside the connection itself as one [`HeaderValue`] per [`Set-Cookie` + /// response header][1]. + /// + /// The tasks of cookie parsing and storage are not handled by this library. + /// + /// [0]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cookie + /// [1]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Set-Cookie + pub async fn connect_with_config( + room: &str, + cookies: Option, + config: &ClientConnConfig, + ) -> Result<(Self, Vec)> { + // Prepare URL + let human = if config.human { "?h=1" } else { "" }; + let uri = format!("wss://{}/room/{room}/ws{human}", config.domain); + debug!("Connecting to {uri} with cookies: {cookies:?}"); + + // Prepare request + let mut request = uri.into_client_request().expect("valid request"); + if let Some(cookies) = cookies { + request.headers_mut().append(header::COOKIE, cookies); + } + + // Connect to server + let (ws, response) = tokio::time::timeout( + config.connect_timeout, + tokio_tungstenite::connect_async(request), + ) + .await + .map_err(|_| Error::ConnectionTimeout)??; + + // Extract response cookies + let (mut parts, _) = response.into_parts(); + let cookies_set = match parts.headers.entry(header::SET_COOKIE) { + header::Entry::Occupied(entry) => entry.remove_entry_mult().1.collect(), + header::Entry::Vacant(_) => vec![], + }; + debug!("Received cookies {cookies_set:?}"); + + // Prepare EuphConn + let conn_config = ConnConfig { + ping_interval: config.ping_interval, + }; + let conn = Conn::wrap_with_config(ws, Side::Client, conn_config); + + // Prepare client + let (tx, rx) = mpsc::channel(config.channel_bufsize); + let client = Self { + rx, + tx, + conn, + state: State::new(), + next_id: 0, + replies: Replies::new(config.command_timeout), + }; + + Ok((client, cookies_set)) + } +} + +/// Asynchronous access to a [`ClientConn`]. +/// +/// Handle methods are only processed while [`ClientConn::recv`] is being +/// called. They may return before they were processed by the associated +/// [`ClientConn`], or they may block until processed. Methods are processed in +/// the order they are called. +/// +/// The handle is cheap to clone. +#[derive(Debug, Clone)] +pub struct ClientConnHandle { + tx: mpsc::Sender, +} + +impl ClientConnHandle { + /// Send a command to the server. + /// + /// When awaited, returns either an error if something went wrong while + /// sending the command, or a second future with the server's reply (the + /// *reply future*). + /// + /// When awaited, the *reply future* returns either an error if something + /// was wrong with the reply, or the data returned by the server. The *reply + /// future* can be safely ignored and doesn't have to be awaited. + pub async fn send(&self, cmd: C) -> Result>> + where + C: Command + Into, + C::Reply: TryFrom, + { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(ConnCommand::SendCmd(cmd.into(), tx)) + .await + .map_err(|_| Error::ConnectionClosed)?; + + Ok(async { + let data = rx + .await + .map_err(|_| Error::ConnectionClosed)?? + .get() + .await + .map_err(|err| match err { + replies::Error::TimedOut => Error::CommandTimeout, + replies::Error::Canceled => Error::ConnectionClosed, + })? + .content + .map_err(Error::Euph)?; + + let ptype = data.packet_type(); + data.try_into() + .map_err(|_| Error::ReceivedUnexpectedPacket(ptype)) + }) + } + + /// Send a command to the server without waiting for a reply. + /// + /// This method is equivalent to calling and awaiting [`Self::send`] but + /// ignoring the *reply future*. The reason it exists is that clippy gets + /// really annoying when you try to ignore a future (which is usually the + /// right call). + pub async fn send_only(&self, cmd: C) -> Result<()> + where + C: Command + Into, + C::Reply: TryFrom, + { + let _ignore = self.send(cmd).await?; + Ok(()) + } + + /// Retrieve the current connection [`State`]. + pub async fn state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(ConnCommand::GetState(tx)) + .await + .map_err(|_| Error::ConnectionClosed)?; + + rx.await.map_err(|_| Error::ConnectionClosed) + } +} diff --git a/euphoxide/src/client/state.rs b/euphoxide/src/client/state.rs new file mode 100644 index 0000000..a90edde --- /dev/null +++ b/euphoxide/src/client/state.rs @@ -0,0 +1,307 @@ +//! Models the client's connection state. + +use std::collections::HashMap; + +use jiff::Timestamp; +use log::debug; + +use crate::api::{ + BounceEvent, Data, HelloEvent, NickEvent, PersonalAccountView, SessionId, SessionView, + SnapshotEvent, UserId, +}; + +/// Information about a session in the room. +/// +/// For quite a while before finally going down altogether, the euphoria.io +/// instance had an unreliable nick list: Listings returned by the server were +/// usually incomplete. Because of this, the bot library uses any observable +/// action by a session (including nick changes) to update the listing. Since +/// nick events don't include full session info though, the [`SessionInfo`] enum +/// can contain partial information. +/// +/// This level of paranioa probably isn't required any more now that the only +/// euphoria instance is working correctly. However, the code already works and +/// users who don't want to worry about it can just ignore partial session +/// infos. +#[derive(Debug, Clone)] +pub enum SessionInfo { + Full(SessionView), + Partial(NickEvent), +} + +impl SessionInfo { + /// Retrieve the user id of the session. + pub fn id(&self) -> &UserId { + match self { + Self::Full(sess) => &sess.id, + Self::Partial(nick) => &nick.id, + } + } + + /// Retrieve the session id of the session. + pub fn session_id(&self) -> &SessionId { + match self { + Self::Full(sess) => &sess.session_id, + Self::Partial(nick) => &nick.session_id, + } + } + + /// Retrieve the user name of the session. + pub fn name(&self) -> &str { + match self { + Self::Full(sess) => &sess.name, + Self::Partial(nick) => &nick.to, + } + } +} + +impl From for SessionInfo { + fn from(value: SessionView) -> Self { + Self::Full(value) + } +} + +impl From for SessionInfo { + fn from(value: NickEvent) -> Self { + Self::Partial(value) + } +} + +/// The state of the connection before the client has joined the room. +/// +/// Depending on the room, the client may need to authenticate or log in in +/// order to join. +#[derive(Debug, Clone)] +pub struct Joining { + /// Since when the connection has been in this state. + pub since: Timestamp, + /// A [`HelloEvent`], if one has been received. + /// + /// Contains information about the client's own session. + pub hello: Option, + /// A [`SnapshotEvent`], if one has been received. + pub snapshot: Option, + /// A [`BounceEvent`], if one has been received. + pub bounce: Option, +} + +impl Joining { + fn new() -> Self { + Self { + since: Timestamp::now(), + hello: None, + snapshot: None, + bounce: None, + } + } + + fn on_data(&mut self, data: &Data) { + match data { + Data::BounceEvent(p) => self.bounce = Some(p.clone()), + Data::HelloEvent(p) => self.hello = Some(p.clone()), + Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), + _ => {} + } + } + + fn to_joined(&self) -> Option { + let hello = self.hello.as_ref()?; + let snapshot = self.snapshot.as_ref()?; + + let mut session = hello.session.clone(); + + if let Some(nick) = &snapshot.nick { + session.name = nick.clone(); + } + + let listing = snapshot + .listing + .iter() + .cloned() + .map(|s| (s.session_id.clone(), SessionInfo::Full(s))) + .collect::>(); + + Some(Joined { + since: Timestamp::now(), + session, + account: hello.account.clone(), + listing, + }) + } +} + +/// The state of the connection after the client has successfully joined the +/// room. +/// +/// The client may need to set a nick in order to be able to send messages. +/// However, it can access the room history without nick. +#[derive(Debug, Clone)] +pub struct Joined { + /// Since when the connection has been in this state. + pub since: Timestamp, + /// The client's own session. + pub session: SessionView, + /// Account information, if the client is logged in. + pub account: Option, + /// All sessions currently connected to the room (except the client's own + /// session). + pub listing: HashMap, +} + +impl Joined { + fn on_data(&mut self, data: &Data) { + match data { + Data::JoinEvent(p) => { + debug!("Updating listing after join-event"); + self.listing + .insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone())); + } + Data::PartEvent(p) => { + debug!("Updating listing after part-event"); + self.listing.remove(&p.0.session_id); + } + Data::NetworkEvent(p) => { + if p.r#type == "partition" { + debug!("Updating listing after network-event with type partition"); + self.listing.retain(|_, s| match s { + SessionInfo::Full(s) => { + s.server_id != p.server_id && s.server_era != p.server_era + } + // We can't know if the session was disconnected by the + // partition or not, so we're erring on the side of + // caution and assuming they were kicked. If we're + // wrong, we'll re-add the session as soon as it + // performs another visible action. + // + // If we always kept such sessions, we might keep + // disconnected ones indefinitely, thereby keeping them + // from moving on, instead forever tethering them to the + // digital realm. + SessionInfo::Partial(_) => false, + }); + } + } + Data::SendEvent(p) => { + debug!("Updating listing after send-event"); + self.listing.insert( + p.0.sender.session_id.clone(), + SessionInfo::Full(p.0.sender.clone()), + ); + } + Data::NickEvent(p) => { + debug!("Updating listing after nick-event"); + self.listing + .entry(p.session_id.clone()) + .and_modify(|s| match s { + SessionInfo::Full(session) => session.name = p.to.clone(), + SessionInfo::Partial(_) => *s = SessionInfo::Partial(p.clone()), + }) + .or_insert_with(|| SessionInfo::Partial(p.clone())); + } + Data::NickReply(p) => { + debug!("Updating own session after nick-reply"); + assert_eq!(self.session.id, p.id); + self.session.name = p.to.clone(); + } + Data::WhoReply(p) => { + debug!("Updating listing after who-reply"); + self.listing.clear(); + for session in p.listing.clone() { + if session.session_id == self.session.session_id { + self.session = session; + } else { + self.listing + .insert(session.session_id.clone(), session.into()); + } + } + } + _ => {} + } + } +} + +/// The state of a connection to the server, from a client's perspective. +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum State { + /// The client has not joined the room yet. + Joining(Joining), + /// The client has successfully joined the room. + Joined(Joined), +} + +impl State { + /// Create a new state for a fresh connection. + /// + /// Assumes that no packets have been received yet. See also + /// [`Self::on_data`]. + pub fn new() -> Self { + Joining::new().into() + } + + /// If the state consists of a [`Joining`], return a reference to it. + pub fn as_joining(&self) -> Option<&Joining> { + match self { + Self::Joining(joining) => Some(joining), + Self::Joined(_) => None, + } + } + + /// If the state consists of a [`Joined`], return a reference to it. + pub fn as_joined(&self) -> Option<&Joined> { + match self { + Self::Joining(_) => None, + Self::Joined(joined) => Some(joined), + } + } + + /// If the state consists of a [`Joining`], return it. + pub fn into_joining(self) -> Option { + match self { + Self::Joining(joining) => Some(joining), + Self::Joined(_) => None, + } + } + + /// If the state consists of a [`Joined`], return it. + pub fn into_joined(self) -> Option { + match self { + Self::Joining(_) => None, + Self::Joined(joined) => Some(joined), + } + } + + /// Update the state with a packet received from the server. + /// + /// This method should be called whenever any packet is received from the + /// server. Skipping packets may cause the state to become inconsistent. + pub fn on_data(&mut self, data: &Data) { + match self { + Self::Joining(joining) => { + joining.on_data(data); + if let Some(joined) = joining.to_joined() { + *self = joined.into(); + } + } + Self::Joined(joined) => joined.on_data(data), + } + } +} + +impl Default for State { + fn default() -> Self { + Self::new() + } +} + +impl From for State { + fn from(value: Joining) -> Self { + Self::Joining(value) + } +} + +impl From for State { + fn from(value: Joined) -> Self { + Self::Joined(value) + } +} diff --git a/euphoxide/src/conn.rs b/euphoxide/src/conn.rs new file mode 100644 index 0000000..513711a --- /dev/null +++ b/euphoxide/src/conn.rs @@ -0,0 +1,312 @@ +//! Basic connection between client and server. + +use std::{fmt, time::Duration}; + +use futures_util::{SinkExt, StreamExt}; +use jiff::Timestamp; +use log::debug; +use tokio::{ + net::TcpStream, + select, + time::{self, Instant}, +}; +use tokio_tungstenite::{ + tungstenite::{client::IntoClientRequest, handshake::client::Response, Message}, + MaybeTlsStream, WebSocketStream, +}; + +use crate::{ + api::{Data, Packet, PacketType, ParsedPacket, Ping, PingEvent, PingReply, Time}, + Error, Result, +}; + +/// Which side of the connection we're on. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Side { + /// We're the client and are talking to a server. + Client, + /// We're the server and are talking to a client. + Server, +} + +/// Configuration options for a [`Conn`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ConnConfig { + /// How long to wait in-between sending pings. + /// + /// This includes both websocket and euphoria pings ([`Ping`] or + /// [`PingEvent`]). + pub ping_interval: Duration, +} + +impl Default for ConnConfig { + fn default() -> Self { + Self { + ping_interval: Duration::from_secs(30), + } + } +} + +/// A basic connection between a client and a server. +/// +/// The connection can be used both from a server's and from a client's +/// perspective. In both cases, it performs regular websocket *and* euphoria +/// pings and terminates the connection if the other side does not reply before +/// the next ping is sent. +pub struct Conn { + ws: WebSocketStream>, + side: Side, + config: ConnConfig, + + // The websocket server may send a pong frame with arbitrary payload + // unprompted at any time (see RFC 6455 5.5.3). Because of this, we can't + // just remember the last pong payload. + last_ping: Instant, + last_ws_ping_payload: Option>, + last_ws_ping_replied_to: bool, + last_euph_ping_payload: Option