diff --git a/.vscode/settings.json b/.vscode/settings.json index 7a89179..4ab04ef 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,8 +1,8 @@ { - "files.insertFinalNewline": true, - "rust-analyzer.cargo.features": "all", - "rust-analyzer.imports.granularity.enforce": true, - "rust-analyzer.imports.granularity.group": "module", - "rust-analyzer.imports.group.enable": true, - "evenBetterToml.formatter.columnWidth": 100, + "files.insertFinalNewline": true, + "rust-analyzer.cargo.features": "all", + "rust-analyzer.imports.granularity.enforce": true, + "rust-analyzer.imports.granularity.group": "crate", + "rust-analyzer.imports.group.enable": true, + "evenBetterToml.formatter.columnWidth": 100 } diff --git a/Cargo.toml b/Cargo.toml index 0ae6832..8487a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,48 +1,34 @@ -[package] -name = "euphoxide" +[workspace] +resolver = "2" +members = ["euphoxide", "euphoxide-bot", "euphoxide-client"] + +[workspace.package] version = "0.5.1" edition = "2021" +rust-version = "1.82" -[features] -bot = ["dep:async-trait", "dep:clap", "dep:cookie"] - -[dependencies] -async-trait = { version = "0.1.83", optional = true } +[workspace.dependencies] +async-trait = "0.1.83" caseless = "0.2.1" -cookie = { version = "0.18.1", optional = true } -futures-util = { version = "0.3.31", default-features = false, features = ["sink"] } -jiff = { version = "0.1.15", features = ["serde"] } +clap = { version = "4.5.23", default-features = false, features = ["std"] } +cookie = "0.18.1" +futures-util = "0.3.31" +jiff = { version = "0.1.15", default-features = false, features = ["std"] } log = "0.4.22" -serde = { version = "1.0.215", features = ["derive"] } +serde = "1.0.215" serde_json = "1.0.133" -tokio = { version = "1.42.0", features = ["time", "sync", "macros", "rt"] } -tokio-stream = "0.1.16" -tokio-tungstenite = { version = "0.24.0", features = ["rustls-tls-native-roots"] } +tokio = "1.42.0" +tokio-tungstenite = "0.24.0" unicode-normalization = "0.1.24" - -[dependencies.clap] -version = "4.5.22" -optional = true -default-features = false -features = ["std", "derive", "deprecated"] - -[dev-dependencies] # For example bot +# For examples +anyhow = "1.0.94" rustls = "0.23.19" -tokio = { version = "1.42.0", features = ["rt-multi-thread"] } +# In this workspace +euphoxide = { path = "./euphoxide" } +euphoxide-bot = { path = "./euphoxide-bot" } +euphoxide-client = { path = "./euphoxide-client" } -[[example]] -name = "testbot_instance" -required-features = ["bot"] - -[[example]] -name = "testbot_instances" -required-features = ["bot"] - -[[example]] -name = "testbot_commands" -required-features = ["bot"] - -[lints] +[workspace.lints] rust.unsafe_code = { level = "forbid", priority = 1 } # Lint groups rust.deprecated_safe = "warn" diff --git a/euphoxide-bot/Cargo.toml b/euphoxide-bot/Cargo.toml new file mode 100644 index 0000000..4ae61bf --- /dev/null +++ b/euphoxide-bot/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "euphoxide-bot" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } + +[features] +clap = ["dep:clap"] + +[dependencies] +async-trait = { workspace = true } +clap = { workspace = true, optional = true, features = ["derive"] } +cookie = { workspace = true } +euphoxide = { workspace = true } +euphoxide-client = { workspace = true } +jiff = { workspace = true } +log = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +tokio-tungstenite = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +rustls = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } + +[lints] +workspace = true diff --git a/euphoxide-bot/examples/examplebot.rs b/euphoxide-bot/examples/examplebot.rs new file mode 100644 index 0000000..4f6d02c --- /dev/null +++ b/euphoxide-bot/examples/examplebot.rs @@ -0,0 +1,93 @@ +use std::time::Duration; + +use euphoxide::api::Message; +use euphoxide_bot::{ + basic::FromHandler, + botrulez::{FullHelp, Ping, ShortHelp}, + clap::FromClapHandler, + CommandExt, Commands, Context, Propagate, +}; +use euphoxide_client::MultiClient; +use log::error; +use tokio::sync::mpsc; + +async fn pyramid(_arg: &str, msg: &Message, ctx: &Context) -> euphoxide::Result { + let mut parent = msg.id; + + for _ in 0..3 { + let first = ctx.reply(parent, "brick").await?; + ctx.reply_only(parent, "brick").await?; + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + ctx.reply_only(parent, "brick").await?; + Ok(Propagate::No) +} + +#[derive(clap::Parser)] +struct AddArgs { + lhs: i64, + rhs: i64, +} + +async fn add(args: AddArgs, msg: &Message, ctx: &Context) -> euphoxide::Result { + let result = args.lhs + args.rhs; + + ctx.reply_only(msg.id, format!("{} + {} = {result}", args.lhs, args.rhs)) + .await?; + + Ok(Propagate::No) +} + +#[tokio::main] +async fn main() { + let (event_tx, mut event_rx) = mpsc::channel(10); + + let commands = Commands::::new() + .then(Ping::default().general("ping").with_info_hidden()) + .then(Ping::default().specific("ping").with_info_hidden()) + .then( + ShortHelp::new("/me demonstrates how to use euphoxide") + .general("help") + .with_info_hidden(), + ) + .then( + FullHelp::new() + .with_after("Created using euphoxide.") + .specific("help") + .with_info_hidden(), + ) + .then( + FromHandler::new(pyramid) + .with_info() + .with_description("build a pyramid") + .general("pyramid"), + ) + .then( + FromClapHandler::new(add) + .clap() + .with_info() + .with_description("add two numbers") + .general("add"), + ) + .build(); + + let clients = MultiClient::new(event_tx); + + clients + .client_builder("test") + .with_username("examplebot") + .build_and_add() + .await; + + while let Some(event) = event_rx.recv().await { + let commands = commands.clone(); + let clients = clients.clone(); + tokio::task::spawn(async move { + if let Err(err) = commands.handle_event(clients, event).await { + error!("Oops: {err}") + } + }); + } +} diff --git a/euphoxide-bot/src/command.rs b/euphoxide-bot/src/command.rs new file mode 100644 index 0000000..c2a04dd --- /dev/null +++ b/euphoxide-bot/src/command.rs @@ -0,0 +1,257 @@ +pub mod bang; +pub mod basic; +pub mod botrulez; +#[cfg(feature = "clap")] +pub mod clap; + +use std::{future::Future, sync::Arc}; + +use async_trait::async_trait; +use euphoxide::{ + api::{self, Data, Message, MessageId, SendEvent, SendReply}, + client::{ + conn::ClientConnHandle, + state::{Joined, State}, + }, +}; +use euphoxide_client::{Client, MultiClient, MultiClientEvent}; + +use self::{ + bang::{General, Global, Specific}, + basic::{Prefixed, WithInfo}, +}; + +#[non_exhaustive] +pub struct Context { + pub commands: Arc>, + pub clients: MultiClient, + pub client: Client, + pub conn: ClientConnHandle, + pub joined: Joined, +} + +impl Context { + pub async fn send( + &self, + content: impl ToString, + ) -> euphoxide::Result>> { + self.conn + .send(api::Send { + content: content.to_string(), + parent: None, + }) + .await + } + + pub async fn send_only(&self, content: impl ToString) -> euphoxide::Result<()> { + let _ignore = self.send(content).await?; + Ok(()) + } + + pub async fn reply( + &self, + parent: MessageId, + content: impl ToString, + ) -> euphoxide::Result>> { + self.conn + .send(api::Send { + content: content.to_string(), + parent: Some(parent), + }) + .await + } + + pub async fn reply_only( + &self, + parent: MessageId, + content: impl ToString, + ) -> euphoxide::Result<()> { + let _ignore = self.reply(parent, content).await?; + Ok(()) + } +} + +#[derive(Default)] +pub struct Info { + pub trigger: Option, + pub description: Option, +} + +impl Info { + pub fn new() -> Self { + Self::default() + } + + pub fn with_trigger(mut self, trigger: impl ToString) -> Self { + self.trigger = Some(trigger.to_string()); + self + } + + pub fn with_description(mut self, description: impl ToString) -> Self { + self.description = Some(description.to_string()); + self + } + + pub fn prepend_trigger(&mut self, trigger: impl ToString) { + // TODO Use get_or_instert_default when updating MSRV + let cur_trigger = self.trigger.get_or_insert_with(String::new); + if !cur_trigger.is_empty() { + cur_trigger.insert(0, ' '); + } + cur_trigger.insert_str(0, &trigger.to_string()); + } + + pub fn with_prepended_trigger(mut self, trigger: impl ToString) -> Self { + self.prepend_trigger(trigger); + self + } +} + +/// Whether a message should propagate to subsequent commands. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Propagate { + No, + Yes, +} + +#[allow(unused_variables)] +#[async_trait] +pub trait Command { + fn info(&self, ctx: &Context) -> Info { + Info::default() + } + + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result; +} + +pub trait CommandExt: Sized { + fn with_info(self) -> WithInfo { + WithInfo::new(self) + } + + fn with_info_hidden(self) -> WithInfo { + WithInfo::hidden(self) + } + + fn prefixed(self, prefix: impl ToString) -> Prefixed { + Prefixed::new(prefix, self) + } + + fn global(self, name: impl ToString) -> Global { + Global::new(name, self) + } + + fn general(self, name: impl ToString) -> General { + General::new(name, self) + } + + fn specific(self, name: impl ToString) -> Specific { + Specific::new(name, self) + } + + #[cfg(feature = "clap")] + fn clap(self) -> clap::Clap { + clap::Clap(self) + } + + fn add_to(self, commands: &mut Commands) + where + Self: Command + Send + Sync + 'static, + { + commands.add(self); + } +} + +// Sadly this doesn't work: `impl> CommandExt for C {}` +// It leaves E unconstrained. Instead, we just implement CommandExt for all +// types. This is fine since it'll crash and burn once we try to use the created +// commands as actual commands. It also follows the spirit of adding trait +// constraints only where they are necessary. +impl CommandExt for C {} + +pub struct Commands { + commands: Vec + Sync + Send>>, +} + +impl Commands { + pub fn new() -> Self { + Self { commands: vec![] } + } + + pub fn add(&mut self, command: impl Command + Sync + Send + 'static) { + self.commands.push(Box::new(command)); + } + + pub fn then(mut self, command: impl Command + Sync + Send + 'static) -> Self { + self.add(command); + self + } + + pub fn build(self) -> Arc { + Arc::new(self) + } + + pub fn infos(&self, ctx: &Context) -> Vec { + self.commands.iter().map(|c| c.info(ctx)).collect() + } + + pub async fn handle_message( + self: Arc, + clients: MultiClient, + client: Client, + conn: ClientConnHandle, + joined: Joined, + msg: &Message, + ) -> Result { + let ctx = Context { + commands: self.clone(), + clients, + client, + conn, + joined, + }; + + for command in &self.commands { + let propagate = command.execute(&msg.content, msg, &ctx).await?; + if propagate == Propagate::No { + return Ok(Propagate::No); + } + } + + Ok(Propagate::Yes) + } + + pub async fn handle_event( + self: Arc, + clients: MultiClient, + event: MultiClientEvent, + ) -> Result { + let MultiClientEvent::Packet { + client, + conn, + state, + packet, + } = event + else { + return Ok(Propagate::Yes); + }; + + let Ok(Data::SendEvent(SendEvent(msg))) = &packet.content else { + return Ok(Propagate::Yes); + }; + + let State::Joined(joined) = state else { + return Ok(Propagate::Yes); + }; + + self.handle_message(clients, client, conn, joined, msg) + .await + } +} + +// Has fewer restrictions on generic types than #[derive(Default)]. +impl Default for Commands { + fn default() -> Self { + Self::new() + } +} diff --git a/src/bot/command/bang.rs b/euphoxide-bot/src/command/bang.rs similarity index 55% rename from src/bot/command/bang.rs rename to euphoxide-bot/src/command/bang.rs index a55d99d..1d433f5 100644 --- a/src/bot/command/bang.rs +++ b/euphoxide-bot/src/command/bang.rs @@ -1,9 +1,9 @@ +//! Euphoria-style `!foo` and `!foo @bar` command wrappers. + use async_trait::async_trait; +use euphoxide::{api::Message, nick}; -use crate::api::Message; -use crate::nick; - -use super::{Command, Context}; +use super::{Command, Context, Info, Propagate}; // TODO Don't ignore leading whitespace? // I'm not entirely happy with how commands handle whitespace, and on euphoria, @@ -24,9 +24,9 @@ pub fn parse_prefix_initiated<'a>(text: &'a str, prefix: &str) -> Option<(&'a st } pub struct Global { - prefix: String, - name: String, - inner: C, + pub prefix: String, + pub name: String, + pub inner: C, } impl Global { @@ -38,48 +38,40 @@ impl Global { } } - pub fn prefix(mut self, prefix: S) -> Self { + pub fn with_prefix(mut self, prefix: S) -> Self { self.prefix = prefix.to_string(); self } } #[async_trait] -impl Command for Global +impl Command for Global where - B: Send, - C: Command + Send + Sync, + C: Command + Sync, { - fn description(&self, ctx: &Context) -> Option { - let inner = self.inner.description(ctx)?; - Some(format!("{}{} - {inner}", self.prefix, self.name)) + fn info(&self, ctx: &Context) -> Info { + self.inner + .info(ctx) + .with_prepended_trigger(format!("{}{}", self.prefix, self.name)) } - async fn execute( - &self, - arg: &str, - msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result { - // TODO Replace with let-else - let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { - Some(parsed) => parsed, - None => return Ok(false), + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + let Some((name, rest)) = parse_prefix_initiated(arg, &self.prefix) else { + return Ok(Propagate::Yes); }; if name != self.name { - return Ok(false); + return Ok(Propagate::Yes); } - self.inner.execute(rest, msg, ctx, bot).await + self.inner.execute(rest, msg, ctx).await } } pub struct General { - prefix: String, - name: String, - inner: C, + pub prefix: String, + pub name: String, + pub inner: C, } impl General { @@ -91,55 +83,47 @@ impl General { } } - pub fn prefix(mut self, prefix: S) -> Self { + pub fn with_prefix(mut self, prefix: S) -> Self { self.prefix = prefix.to_string(); self } } #[async_trait] -impl Command for General +impl Command for General where - B: Send, - C: Command + Send + Sync, + C: Command + Sync, { - fn description(&self, ctx: &Context) -> Option { - let inner = self.inner.description(ctx)?; - Some(format!("{}{} - {inner}", self.prefix, self.name)) + fn info(&self, ctx: &Context) -> Info { + self.inner + .info(ctx) + .with_prepended_trigger(format!("{}{}", self.prefix, self.name)) } - async fn execute( - &self, - arg: &str, - msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result { - // TODO Replace with let-else - let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { - Some(parsed) => parsed, - None => return Ok(false), + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + let Some((name, rest)) = parse_prefix_initiated(arg, &self.prefix) else { + return Ok(Propagate::Yes); }; if name != self.name { - return Ok(false); + return Ok(Propagate::Yes); } if parse_prefix_initiated(rest, "@").is_some() { // The command looks like a specific command. If we treated it like // a general command match, we would interpret other bots' specific // commands as general commands. - return Ok(false); + return Ok(Propagate::Yes); } - self.inner.execute(rest, msg, ctx, bot).await + self.inner.execute(rest, msg, ctx).await } } pub struct Specific { - prefix: String, - name: String, - inner: C, + pub prefix: String, + pub name: String, + pub inner: C, } impl Specific { @@ -151,52 +135,42 @@ impl Specific { } } - pub fn prefix(mut self, prefix: S) -> Self { + pub fn with_prefix(mut self, prefix: S) -> Self { self.prefix = prefix.to_string(); self } } #[async_trait] -impl Command for Specific +impl Command for Specific where - B: Send, - C: Command + Send + Sync, + C: Command + Sync, { - fn description(&self, ctx: &Context) -> Option { - let inner = self.inner.description(ctx)?; + fn info(&self, ctx: &Context) -> Info { let nick = nick::mention(&ctx.joined.session.name); - Some(format!("{}{} @{nick} - {inner}", self.prefix, self.name)) + self.inner + .info(ctx) + .with_prepended_trigger(format!("{}{} @{nick}", self.prefix, self.name)) } - async fn execute( - &self, - arg: &str, - msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result { - // TODO Replace with let-else - let (name, rest) = match parse_prefix_initiated(arg, &self.prefix) { - Some(parsed) => parsed, - None => return Ok(false), + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + let Some((name, rest)) = parse_prefix_initiated(arg, &self.prefix) else { + return Ok(Propagate::Yes); }; if name != self.name { - return Ok(false); + return Ok(Propagate::Yes); } - // TODO Replace with let-else - let (nick, rest) = match parse_prefix_initiated(rest, "@") { - Some(parsed) => parsed, - None => return Ok(false), + let Some((nick, rest)) = parse_prefix_initiated(rest, "@") else { + return Ok(Propagate::Yes); }; if nick::normalize(nick) != nick::normalize(&ctx.joined.session.name) { - return Ok(false); + return Ok(Propagate::Yes); } - self.inner.execute(rest, msg, ctx, bot).await + self.inner.execute(rest, msg, ctx).await } } diff --git a/euphoxide-bot/src/command/basic.rs b/euphoxide-bot/src/command/basic.rs new file mode 100644 index 0000000..c60f2cf --- /dev/null +++ b/euphoxide-bot/src/command/basic.rs @@ -0,0 +1,141 @@ +//! Basic command wrappers. + +use std::future::Future; + +use async_trait::async_trait; +use euphoxide::api::Message; + +use super::{Command, Context, Info, Propagate}; + +/// Rewrite or hide command info. +pub struct WithInfo { + pub inner: C, + pub trigger: Option>, + pub description: Option>, +} + +impl WithInfo { + pub fn new(inner: C) -> Self { + Self { + inner, + trigger: None, + description: None, + } + } + + pub fn hidden(inner: C) -> Self { + Self::new(inner) + .with_trigger_hidden() + .with_description_hidden() + } + + pub fn with_trigger(mut self, trigger: impl ToString) -> Self { + self.trigger = Some(Some(trigger.to_string())); + self + } + + pub fn with_trigger_hidden(mut self) -> Self { + self.trigger = Some(None); + self + } + + pub fn with_description(mut self, description: impl ToString) -> Self { + self.description = Some(Some(description.to_string())); + self + } + + pub fn with_description_hidden(mut self) -> Self { + self.description = Some(None); + self + } +} + +#[async_trait] +impl Command for WithInfo +where + C: Command + Sync, +{ + fn info(&self, ctx: &Context) -> Info { + let info = self.inner.info(ctx); + Info { + trigger: self.trigger.clone().unwrap_or(info.trigger), + description: self.description.clone().unwrap_or(info.description), + } + } + + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + self.inner.execute(arg, msg, ctx).await + } +} + +pub struct Prefixed { + pub prefix: String, + pub inner: C, +} + +impl Prefixed { + pub fn new(prefix: S, inner: C) -> Self { + Self { + prefix: prefix.to_string(), + inner, + } + } +} + +#[async_trait] +impl Command for Prefixed +where + C: Command + Sync, +{ + fn info(&self, ctx: &Context) -> Info { + self.inner.info(ctx).with_prepended_trigger(&self.prefix) + } + + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + if let Some(rest) = arg.trim_start().strip_prefix(&self.prefix) { + self.inner.execute(rest, msg, ctx).await + } else { + Ok(Propagate::Yes) + } + } +} + +// Black type magic, thanks a lot to https://github.com/kpreid and the +// async_fn_traits crate! + +// TODO Simplify all this once AsyncFn becomes stable + +pub trait HandlerFn<'a0, 'a1, 'a2, E>: + Fn(&'a0 str, &'a1 Message, &'a2 Context) -> Self::Future +where + E: 'a2, +{ + type Future: Future> + Send; +} + +impl<'a0, 'a1, 'a2, E, F, Fut> HandlerFn<'a0, 'a1, 'a2, E> for F +where + E: 'a2, + F: Fn(&'a0 str, &'a1 Message, &'a2 Context) -> Fut + ?Sized, + Fut: Future> + Send, +{ + type Future = Fut; +} + +pub struct FromHandler(pub F); + +impl FromHandler { + pub fn new(f: F) -> Self { + Self(f) + } +} + +#[async_trait] +impl Command for FromHandler +where + F: for<'a0, 'a1, 'a2> HandlerFn<'a0, 'a1, 'a2, E> + Sync, +{ + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + (self.0)(arg, msg, ctx).await + } +} diff --git a/euphoxide-bot/src/command/botrulez.rs b/euphoxide-bot/src/command/botrulez.rs new file mode 100644 index 0000000..94e7875 --- /dev/null +++ b/euphoxide-bot/src/command/botrulez.rs @@ -0,0 +1,8 @@ +//! The main [botrulez](https://github.com/jedevc/botrulez) commands. + +mod full_help; +mod ping; +mod short_help; +mod uptime; + +pub use self::{full_help::*, ping::*, short_help::*, uptime::*}; diff --git a/euphoxide-bot/src/command/botrulez/full_help.rs b/euphoxide-bot/src/command/botrulez/full_help.rs new file mode 100644 index 0000000..aee9ac4 --- /dev/null +++ b/euphoxide-bot/src/command/botrulez/full_help.rs @@ -0,0 +1,98 @@ +use async_trait::async_trait; +#[cfg(feature = "clap")] +use clap::Parser; +use euphoxide::api::Message; + +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; + +#[derive(Default)] +pub struct FullHelp { + pub before: String, + pub after: String, +} + +impl FullHelp { + pub fn new() -> Self { + Self::default() + } + + pub fn with_before(mut self, before: impl ToString) -> Self { + self.before = before.to_string(); + self + } + + pub fn with_after(mut self, after: impl ToString) -> Self { + self.after = after.to_string(); + self + } + + fn formulate_reply(&self, ctx: &Context) -> String { + let mut result = String::new(); + + if !self.before.is_empty() { + result.push_str(&self.before); + result.push('\n'); + } + + for info in ctx.commands.infos(ctx) { + if let Some(trigger) = &info.trigger { + result.push_str(trigger); + if let Some(description) = &info.description { + result.push_str(" - "); + result.push_str(description); + } + result.push('\n'); + } + } + + if !self.after.is_empty() { + result.push_str(&self.after); + result.push('\n'); + } + + result + } +} + +#[async_trait] +impl Command for FullHelp +where + E: From, +{ + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + if arg.trim().is_empty() { + let reply = self.formulate_reply(ctx); + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) + } else { + Ok(Propagate::Yes) + } + } +} + +/// Show full bot help. +#[cfg(feature = "clap")] +#[derive(Parser)] +pub struct FullHelpArgs {} + +#[cfg(feature = "clap")] +#[async_trait] +impl ClapCommand for FullHelp +where + E: From, +{ + type Args = FullHelpArgs; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + ) -> Result { + let reply = self.formulate_reply(ctx); + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) + } +} diff --git a/euphoxide-bot/src/command/botrulez/ping.rs b/euphoxide-bot/src/command/botrulez/ping.rs new file mode 100644 index 0000000..6511565 --- /dev/null +++ b/euphoxide-bot/src/command/botrulez/ping.rs @@ -0,0 +1,61 @@ +use async_trait::async_trait; +#[cfg(feature = "clap")] +use clap::Parser; +use euphoxide::api::Message; + +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; + +pub struct Ping(pub String); + +impl Ping { + pub fn new(reply: S) -> Self { + Self(reply.to_string()) + } +} + +impl Default for Ping { + fn default() -> Self { + Self::new("Pong!") + } +} + +#[async_trait] +impl Command for Ping +where + E: From, +{ + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + if arg.trim().is_empty() { + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) + } else { + Ok(Propagate::Yes) + } + } +} + +/// Trigger a short reply. +#[cfg(feature = "clap")] +#[derive(Parser)] +pub struct PingArgs {} + +#[cfg(feature = "clap")] +#[async_trait] +impl ClapCommand for Ping +where + E: From, +{ + type Args = PingArgs; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + ) -> Result { + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) + } +} diff --git a/euphoxide-bot/src/command/botrulez/short_help.rs b/euphoxide-bot/src/command/botrulez/short_help.rs new file mode 100644 index 0000000..8b1c9b8 --- /dev/null +++ b/euphoxide-bot/src/command/botrulez/short_help.rs @@ -0,0 +1,55 @@ +use async_trait::async_trait; +#[cfg(feature = "clap")] +use clap::Parser; +use euphoxide::api::Message; + +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; + +pub struct ShortHelp(pub String); + +impl ShortHelp { + pub fn new(text: S) -> Self { + Self(text.to_string()) + } +} + +#[async_trait] +impl Command for ShortHelp +where + E: From, +{ + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { + if arg.trim().is_empty() { + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) + } else { + Ok(Propagate::Yes) + } + } +} + +/// Show short bot help. +#[cfg(feature = "clap")] +#[derive(Parser)] +pub struct ShortHelpArgs {} + +#[cfg(feature = "clap")] +#[async_trait] +impl ClapCommand for ShortHelp +where + E: From, +{ + type Args = ShortHelpArgs; + + async fn execute( + &self, + _args: Self::Args, + msg: &Message, + ctx: &Context, + ) -> Result { + ctx.reply_only(msg.id, &self.0).await?; + Ok(Propagate::No) + } +} diff --git a/src/bot/botrulez/uptime.rs b/euphoxide-bot/src/command/botrulez/uptime.rs similarity index 60% rename from src/bot/botrulez/uptime.rs rename to euphoxide-bot/src/command/botrulez/uptime.rs index d8b1d0d..40f6cc6 100644 --- a/src/bot/botrulez/uptime.rs +++ b/euphoxide-bot/src/command/botrulez/uptime.rs @@ -1,10 +1,12 @@ use async_trait::async_trait; +#[cfg(feature = "clap")] use clap::Parser; +use euphoxide::api::Message; use jiff::{Span, Timestamp, Unit}; -use crate::api::Message; -use crate::bot::command::{ClapCommand, Command, Context}; -use crate::conn; +#[cfg(feature = "clap")] +use crate::command::clap::ClapCommand; +use crate::command::{Command, Context, Propagate}; pub fn format_time(t: Timestamp) -> String { t.strftime("%Y-%m-%d %H:%M:%S UTC").to_string() @@ -57,8 +59,8 @@ pub trait HasStartTime { } impl Uptime { - fn formulate_reply(&self, ctx: &Context, bot: &B, connected: bool) -> String { - let start = bot.start_time(); + fn formulate_reply(&self, ctx: &Context, joined: bool, connected: bool) -> String { + let start = ctx.clients.start_time(); let now = Timestamp::now(); let mut reply = format!( @@ -67,6 +69,15 @@ impl Uptime { format_relative_time(start - now), ); + if joined { + let since = ctx.client.start_time(); + reply.push_str(&format!( + ", present since {} ({})", + format_time(since), + format_relative_time(since - now), + )); + } + if connected { let since = ctx.joined.since; reply.push_str(&format!( @@ -81,53 +92,49 @@ impl Uptime { } #[async_trait] -impl Command for Uptime +impl Command for Uptime where - B: HasStartTime + Send, - E: From, + E: From, { - async fn execute( - &self, - arg: &str, - msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result { + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { if arg.trim().is_empty() { - let reply = self.formulate_reply(ctx, bot, false); - ctx.reply(msg.id, reply).await?; - Ok(true) + let reply = self.formulate_reply(ctx, false, false); + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) } else { - Ok(false) + Ok(Propagate::Yes) } } } /// Show how long the bot has been online. +#[cfg(feature = "clap")] #[derive(Parser)] -pub struct Args { +pub struct UptimeArgs { + /// Show how long the bot has been in this room. + #[arg(long, short)] + pub present: bool, /// Show how long the bot has been connected without interruption. #[arg(long, short)] pub connected: bool, } +#[cfg(feature = "clap")] #[async_trait] -impl ClapCommand for Uptime +impl ClapCommand for Uptime where - B: HasStartTime + Send, - E: From, + E: From, { - type Args = Args; + type Args = UptimeArgs; async fn execute( &self, args: Self::Args, msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result { - let reply = self.formulate_reply(ctx, bot, args.connected); - ctx.reply(msg.id, reply).await?; - Ok(true) + ctx: &Context, + ) -> Result { + let reply = self.formulate_reply(ctx, args.present, args.connected); + ctx.reply_only(msg.id, reply).await?; + Ok(Propagate::No) } } diff --git a/src/bot/command/clap.rs b/euphoxide-bot/src/command/clap.rs similarity index 66% rename from src/bot/command/clap.rs rename to euphoxide-bot/src/command/clap.rs index a22b49a..c0bc71b 100644 --- a/src/bot/command/clap.rs +++ b/euphoxide-bot/src/command/clap.rs @@ -1,22 +1,23 @@ +//! [`clap`]-based commands. + +use std::{future::Future, marker::PhantomData}; + use async_trait::async_trait; use clap::{CommandFactory, Parser}; +use euphoxide::api::Message; -use crate::api::Message; -use crate::conn; - -use super::{Command, Context}; +use super::{Command, Context, Info, Propagate}; #[async_trait] -pub trait ClapCommand { +pub trait ClapCommand { type Args; async fn execute( &self, args: Self::Args, msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result; + ctx: &Context, + ) -> Result; } /// Parse bash-like quoted arguments separated by whitespace. @@ -99,29 +100,25 @@ fn parse_quoted_args(text: &str) -> Result, &'static str> { pub struct Clap(pub C); #[async_trait] -impl Command for Clap +impl Command for Clap where - B: Send, - E: From, - C: ClapCommand + Send + Sync, + E: From, + C: ClapCommand + Sync, C::Args: Parser + Send, { - fn description(&self, _ctx: &Context) -> Option { - C::Args::command().get_about().map(|s| format!("{s}")) + fn info(&self, _ctx: &Context) -> Info { + Info { + description: C::Args::command().get_about().map(|s| s.to_string()), + ..Info::default() + } } - async fn execute( - &self, - arg: &str, - msg: &Message, - ctx: &Context, - bot: &mut B, - ) -> Result { + async fn execute(&self, arg: &str, msg: &Message, ctx: &Context) -> Result { let mut args = match parse_quoted_args(arg) { Ok(args) => args, Err(err) => { - ctx.reply(msg.id, err).await?; - return Ok(true); + ctx.reply_only(msg.id, err).await?; + return Ok(Propagate::No); } }; @@ -132,12 +129,70 @@ where let args = match C::Args::try_parse_from(args) { Ok(args) => args, Err(err) => { - ctx.reply(msg.id, format!("{}", err.render())).await?; - return Ok(true); + ctx.reply_only(msg.id, format!("{}", err.render())).await?; + return Ok(Propagate::No); } }; - self.0.execute(args, msg, ctx, bot).await + self.0.execute(args, msg, ctx).await + } +} + +// TODO Simplify all this once AsyncFn becomes stable + +pub trait ClapHandlerFn<'a0, 'a1, A, E>: + Fn(A, &'a0 Message, &'a1 Context) -> Self::Future +where + E: 'a1, +{ + type Future: Future> + Send; +} + +impl<'a0, 'a1, A, E, F, Fut> ClapHandlerFn<'a0, 'a1, A, E> for F +where + E: 'a1, + F: Fn(A, &'a0 Message, &'a1 Context) -> Fut + ?Sized, + Fut: Future> + Send, +{ + type Future = Fut; +} + +pub struct FromClapHandler { + _a: PhantomData, + pub handler: F, +} + +impl FromClapHandler { + // Artificially constrained so we don't accidentally choose an incorrect A. + // Relying on type inference of A can result in unknown type errors even + // though we know what A should be based on F. + pub fn new<'a0, 'a1, E, Fut>(handler: F) -> Self + where + F: Fn(A, &'a0 Message, &'a1 Context) -> Fut, + E: 'a1, + { + Self { + _a: PhantomData, + handler, + } + } +} + +#[async_trait] +impl ClapCommand for FromClapHandler +where + F: for<'a0, 'a1> ClapHandlerFn<'a0, 'a1, A, E> + Sync, + A: Send + Sync + 'static, +{ + type Args = A; + + async fn execute( + &self, + args: Self::Args, + msg: &Message, + ctx: &Context, + ) -> Result { + (self.handler)(args, msg, ctx).await } } diff --git a/euphoxide-bot/src/lib.rs b/euphoxide-bot/src/lib.rs new file mode 100644 index 0000000..b76141f --- /dev/null +++ b/euphoxide-bot/src/lib.rs @@ -0,0 +1,3 @@ +mod command; + +pub use self::command::*; diff --git a/euphoxide-client/Cargo.toml b/euphoxide-client/Cargo.toml new file mode 100644 index 0000000..f476e1d --- /dev/null +++ b/euphoxide-client/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "euphoxide-client" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +cookie = { workspace = true } +euphoxide = { workspace = true } +jiff = { workspace = true } +log = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +tokio-tungstenite = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +rustls = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } + +[lints] +workspace = true diff --git a/euphoxide-client/examples/examplebot_multi.rs b/euphoxide-client/examples/examplebot_multi.rs new file mode 100644 index 0000000..a382333 --- /dev/null +++ b/euphoxide-client/examples/examplebot_multi.rs @@ -0,0 +1,103 @@ +use std::time::Duration; + +use euphoxide::{ + api::{Data, Message, Nick, Send}, + client::conn::ClientConnHandle, +}; +use euphoxide_client::{MultiClient, MultiClientEvent}; +use tokio::sync::mpsc; + +async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> { + conn.send_only(Nick { + name: "examplebot".to_string(), + }) + .await?; + + Ok(()) +} + +async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + conn.send_only(Send { + content: "Pong!".to_string(), + parent: Some(msg.id), + }) + .await?; + + Ok(()) +} + +async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + let mut parent = msg.id; + + for _ in 0..3 { + let first = conn + .send(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + Ok(()) +} + +async fn on_data(conn: ClientConnHandle, data: Data) { + let result = match data { + Data::SnapshotEvent(_) => set_nick(&conn).await, + Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await, + Data::SendEvent(event) if event.0.content == "!pyramid" => { + send_pyramid(&conn, event.0).await + } + _ => Ok(()), + }; + + if let Err(err) = result { + println!("Error while responding: {err}"); + } +} + +async fn run() -> anyhow::Result<()> { + let (event_tx, mut event_rx) = mpsc::channel(10); + + // Don't drop the client or it will stop running. + let clients = MultiClient::new(event_tx); + + clients + .client_builder("test") + .with_username("examplebot") + .build_and_add() + .await; + + while let Some(event) = event_rx.recv().await { + if let MultiClientEvent::Packet { conn, packet, .. } = event { + let data = packet.into_data()?; + tokio::task::spawn(on_data(conn, data)); + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide-client/examples/examplebot_single.rs b/euphoxide-client/examples/examplebot_single.rs new file mode 100644 index 0000000..cccb701 --- /dev/null +++ b/euphoxide-client/examples/examplebot_single.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use euphoxide::{ + api::{Data, Message, Nick, Send}, + client::conn::ClientConnHandle, +}; +use euphoxide_client::{Client, ClientEvent}; +use tokio::sync::mpsc; + +async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> { + conn.send_only(Nick { + name: "examplebot".to_string(), + }) + .await?; + + Ok(()) +} + +async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + conn.send_only(Send { + content: "Pong!".to_string(), + parent: Some(msg.id), + }) + .await?; + + Ok(()) +} + +async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + let mut parent = msg.id; + + for _ in 0..3 { + let first = conn + .send(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + Ok(()) +} + +async fn on_data(conn: ClientConnHandle, data: Data) { + let result = match data { + Data::SnapshotEvent(_) => set_nick(&conn).await, + Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await, + Data::SendEvent(event) if event.0.content == "!pyramid" => { + send_pyramid(&conn, event.0).await + } + _ => Ok(()), + }; + + if let Err(err) = result { + println!("Error while responding: {err}"); + } +} + +async fn run() -> anyhow::Result<()> { + let (event_tx, mut event_rx) = mpsc::channel(10); + + // Don't drop the client or it will stop running. + let _client = Client::builder("test") + .with_username("examplebot") + .build(0, event_tx); + + while let Some(event) = event_rx.recv().await { + if let ClientEvent::Packet { conn, packet, .. } = event { + let data = packet.into_data()?; + tokio::task::spawn(on_data(conn, data)); + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide-client/src/builder.rs b/euphoxide-client/src/builder.rs new file mode 100644 index 0000000..5146f83 --- /dev/null +++ b/euphoxide-client/src/builder.rs @@ -0,0 +1,40 @@ +use crate::ClientConfig; + +pub trait ClientBuilderBase<'a> { + type Base; +} + +pub struct ClientBuilder<'a, B: ClientBuilderBase<'a>> { + pub(crate) base: B::Base, + pub(crate) config: ClientConfig, +} + +impl<'a, B: ClientBuilderBase<'a>> ClientBuilder<'a, B> { + pub fn config(&self) -> &ClientConfig { + &self.config + } + + pub fn config_mut(&mut self) -> &mut ClientConfig { + &mut self.config + } + + pub fn with_human(mut self, human: bool) -> Self { + self.config.human = human; + self + } + + pub fn with_username(mut self, username: impl ToString) -> Self { + self.config.username = Some(username.to_string()); + self + } + + pub fn with_force_username(mut self, force_username: bool) -> Self { + self.config.force_username = force_username; + self + } + + pub fn with_password(mut self, password: impl ToString) -> Self { + self.config.password = Some(password.to_string()); + self + } +} diff --git a/euphoxide-client/src/config.rs b/euphoxide-client/src/config.rs new file mode 100644 index 0000000..ca1baea --- /dev/null +++ b/euphoxide-client/src/config.rs @@ -0,0 +1,71 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use cookie::CookieJar; +use euphoxide::client::conn::ClientConnConfig; + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ServerConfig { + pub client: ClientConnConfig, + pub cookies: Arc>, + pub join_attempts: usize, + pub reconnect_delay: Duration, + pub cmd_channel_bufsize: usize, +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + client: ClientConnConfig::default(), + cookies: Arc::new(Mutex::new(CookieJar::new())), + join_attempts: 5, + reconnect_delay: Duration::from_secs(30), + cmd_channel_bufsize: 1, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ClientConfig { + pub server: ServerConfig, + pub room: String, + pub human: bool, + pub username: Option, + pub force_username: bool, + pub password: Option, +} + +impl ClientConfig { + pub fn new(server: ServerConfig, room: String) -> Self { + Self { + server, + room, + human: false, + username: None, + force_username: false, + password: None, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct MultiClientConfig { + pub server: ServerConfig, + pub cmd_channel_bufsize: usize, + pub event_channel_bufsize: usize, +} + +impl Default for MultiClientConfig { + fn default() -> Self { + Self { + server: ServerConfig::default(), + cmd_channel_bufsize: 1, + event_channel_bufsize: 10, + } + } +} diff --git a/euphoxide-client/src/lib.rs b/euphoxide-client/src/lib.rs new file mode 100644 index 0000000..375d6cc --- /dev/null +++ b/euphoxide-client/src/lib.rs @@ -0,0 +1,6 @@ +mod builder; +mod config; +mod multi; +mod single; + +pub use self::{builder::*, config::*, multi::*, single::*}; diff --git a/euphoxide-client/src/multi.rs b/euphoxide-client/src/multi.rs new file mode 100644 index 0000000..dbdbc6c --- /dev/null +++ b/euphoxide-client/src/multi.rs @@ -0,0 +1,244 @@ +use std::{collections::HashMap, sync::Arc}; + +use euphoxide::{ + api::ParsedPacket, + client::{conn::ClientConnHandle, state::State}, +}; +use jiff::Timestamp; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; + +use crate::{ + Client, ClientBuilder, ClientBuilderBase, ClientConfig, ClientEvent, MultiClientConfig, +}; + +#[derive(Debug)] +pub enum MultiClientEvent { + Started { + client: Client, + }, + Connecting { + client: Client, + }, + Connected { + client: Client, + conn: ClientConnHandle, + state: State, + }, + Joined { + client: Client, + conn: ClientConnHandle, + state: State, + }, + Packet { + client: Client, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + client: Client, + }, + Stopped { + client: Client, + }, +} + +impl MultiClientEvent { + fn from_client_event(client: Client, event: ClientEvent) -> Self { + match event { + ClientEvent::Started { id: _ } => Self::Started { client }, + ClientEvent::Connecting { id: _ } => Self::Connecting { client }, + ClientEvent::Connected { id: _, conn, state } => Self::Connected { + client, + conn, + state, + }, + ClientEvent::Joined { id: _, conn, state } => Self::Joined { + client, + conn, + state, + }, + ClientEvent::Packet { + id: _, + conn, + state, + packet, + } => Self::Packet { + client, + conn, + state, + packet, + }, + ClientEvent::Disconnected { id: _ } => Self::Disconnected { client }, + ClientEvent::Stopped { id: _ } => Self::Stopped { client }, + } + } + + pub fn client(&self) -> &Client { + match self { + Self::Started { client } => client, + Self::Connecting { client, .. } => client, + Self::Connected { client, .. } => client, + Self::Joined { client, .. } => client, + Self::Packet { client, .. } => client, + Self::Disconnected { client } => client, + Self::Stopped { client } => client, + } + } +} + +#[allow(clippy::large_enum_variant)] +enum Command { + GetClients(oneshot::Sender>), + AddClient(ClientConfig, oneshot::Sender), +} + +struct MultiClientTask { + next_id: usize, + clients: HashMap, + + cmd_rx: mpsc::Receiver, + event_rx: mpsc::Receiver, + event_tx: mpsc::Sender, + out_tx: mpsc::Sender, +} + +impl MultiClientTask { + fn purge_clients(&mut self) { + self.clients.retain(|_, v| !v.stopped()); + } + + async fn on_event(&self, event: ClientEvent) { + if let Some(client) = self.clients.get(&event.id()) { + let event = MultiClientEvent::from_client_event(client.clone(), event); + let _ = self.out_tx.send(event).await; + } + } + + async fn on_cmd(&mut self, cmd: Command) { + match cmd { + Command::GetClients(tx) => { + self.purge_clients(); // Not necessary for correctness + let _ = tx.send(self.clients.values().cloned().collect()); + } + Command::AddClient(config, tx) => { + let id = self.next_id; + assert!(!self.clients.contains_key(&id)); + self.next_id += 1; + + let client = Client::new(id, config, self.event_tx.clone()); + self.clients.insert(id, client.clone()); + + let _ = tx.send(client); + } + } + } + + async fn run(mut self) { + loop { + // Prevent potential memory leak + self.purge_clients(); + + let received = select! { + r = self.event_rx.recv() => Ok(r), + r = self.cmd_rx.recv() => Err(r), + }; + + match received { + Ok(None) => break, + Ok(Some(event)) => self.on_event(event).await, + Err(None) => break, + Err(Some(cmd)) => self.on_cmd(cmd).await, + } + } + } +} + +#[derive(Clone)] +pub struct MultiClient { + config: Arc, + cmd_tx: mpsc::Sender, + start_time: Timestamp, +} + +impl MultiClient { + pub fn new(event_tx: mpsc::Sender) -> Self { + Self::new_with_config(MultiClientConfig::default(), event_tx) + } + + pub fn new_with_config( + config: MultiClientConfig, + event_tx: mpsc::Sender, + ) -> Self { + let start_time = Timestamp::now(); + + let config = Arc::new(config); + let out_tx = event_tx; + + let (cmd_tx, cmd_rx) = mpsc::channel(config.cmd_channel_bufsize); + let (event_tx, event_rx) = mpsc::channel(config.event_channel_bufsize); + + let task = MultiClientTask { + next_id: 0, + clients: HashMap::new(), + cmd_rx, + event_rx, + event_tx, + out_tx, + }; + + tokio::task::spawn(task.run()); + + Self { + config, + cmd_tx, + start_time, + } + } + + pub fn config(&self) -> &MultiClientConfig { + &self.config + } + + pub fn start_time(&self) -> Timestamp { + self.start_time + } + + pub async fn get_clients(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::GetClients(tx)).await; + rx.await.expect("task should still be running") + } + + pub async fn add_client(&self, config: ClientConfig) -> Client { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::AddClient(config, tx)).await; + rx.await.expect("task should still be running") + } +} + +///////////// +// Builder // +///////////// + +impl<'a> ClientBuilderBase<'a> for MultiClient { + type Base = &'a Self; +} + +impl MultiClient { + pub fn client_builder(&self, room: impl ToString) -> ClientBuilder<'_, Self> { + ClientBuilder { + base: self, + config: ClientConfig::new(self.config.server.clone(), room.to_string()), + } + } +} + +impl ClientBuilder<'_, MultiClient> { + pub async fn build_and_add(self) -> Client { + self.base.add_client(self.config).await + } +} diff --git a/euphoxide-client/src/single.rs b/euphoxide-client/src/single.rs new file mode 100644 index 0000000..c2a3cc4 --- /dev/null +++ b/euphoxide-client/src/single.rs @@ -0,0 +1,435 @@ +use std::{fmt, result, str::FromStr, sync::Arc}; + +use cookie::Cookie; +use euphoxide::{ + api::{Auth, AuthOption, BounceEvent, Data, Nick, ParsedPacket}, + client::{ + conn::{ClientConn, ClientConnHandle}, + state::State, + }, +}; +use jiff::Timestamp; +use log::warn; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + self, + http::{HeaderValue, StatusCode}, +}; + +use crate::{ClientBuilder, ClientBuilderBase, ClientConfig, ServerConfig}; + +enum Error { + Stopped, + NoReferences, + AuthRequired, + InvalidPassword, + OutOfJoinAttempts, + Euphoxide(euphoxide::Error), +} + +impl Error { + fn is_fatal(&self) -> bool { + match self { + Self::Stopped => true, + Self::NoReferences => true, + Self::AuthRequired => true, + Self::InvalidPassword => true, + Self::OutOfJoinAttempts => true, + Self::Euphoxide(euphoxide::Error::Tungstenite(tungstenite::Error::Http(response))) => { + response.status() == StatusCode::NOT_FOUND + } + _ => false, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Stopped => write!(f, "the instance was stopped manually"), + Self::NoReferences => write!(f, "all references to the instance were dropped"), + Self::AuthRequired => write!(f, "authentication required but no credentials found"), + Self::InvalidPassword => write!(f, "authentication required but password is invalid"), + Self::OutOfJoinAttempts => write!(f, "failed to join within attempt limit"), + Self::Euphoxide(error) => write!(f, "{error}"), + } + } +} + +impl From for Error { + fn from(value: euphoxide::Error) -> Self { + Self::Euphoxide(value) + } +} + +type Result = result::Result; + +enum Command { + GetConn(oneshot::Sender), + Stop, +} + +#[derive(Debug)] +pub enum ClientEvent { + Started { + id: usize, + }, + Connecting { + id: usize, + }, + Connected { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Joined { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Packet { + id: usize, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + id: usize, + }, + Stopped { + id: usize, + }, +} + +impl ClientEvent { + pub fn id(&self) -> usize { + match self { + Self::Started { id } => *id, + Self::Connecting { id } => *id, + Self::Connected { id, .. } => *id, + Self::Joined { id, .. } => *id, + Self::Packet { id, .. } => *id, + Self::Disconnected { id } => *id, + Self::Stopped { id } => *id, + } + } +} + +struct ClientTask { + id: usize, + config: Arc, + + cmd_rx: mpsc::Receiver, + event_tx: mpsc::Sender, + + attempts: usize, + never_joined: bool, +} + +impl ClientTask { + fn get_cookies(&self) -> Option { + self.config + .server + .cookies + .lock() + .unwrap() + .iter() + .map(|c| c.stripped().to_string()) + .collect::>() + .join("; ") + .try_into() + .ok() + } + + fn set_cookies(&mut self, cookies: &[HeaderValue]) { + let mut guard = self.config.server.cookies.lock().unwrap(); + for cookie in cookies { + if let Ok(cookie) = cookie.to_str() { + if let Ok(cookie) = Cookie::from_str(cookie) { + guard.add(cookie); + } + } + } + } + + async fn connect(&mut self) -> Result { + let (conn, cookies) = ClientConn::connect_with_config( + &self.config.room, + self.get_cookies(), + &self.config.server.client, + ) + .await?; + + self.set_cookies(&cookies); + + Ok(conn) + } + + async fn on_joined(&mut self, conn: &ClientConn) { + self.never_joined = false; + + let _ = self + .event_tx + .send(ClientEvent::Joined { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + } + + async fn on_packet(&mut self, conn: &mut ClientConn, packet: ParsedPacket) -> Result<()> { + let _ = self + .event_tx + .send(ClientEvent::Packet { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + packet: packet.clone(), + }) + .await; + + match packet.into_data()? { + // Attempting to authenticate + Data::BounceEvent(BounceEvent { + auth_options: Some(auth_options), + .. + }) if auth_options.contains(&AuthOption::Passcode) => { + if let Some(password) = &self.config.password { + conn.send(Auth { + r#type: AuthOption::Passcode, + passcode: Some(password.clone()), + }) + .await?; + } else { + return Err(Error::AuthRequired); + } + } + + // Auth attempt failed :( + Data::AuthReply(ev) if !ev.success => return Err(Error::InvalidPassword), + + // Just joined + Data::SnapshotEvent(ev) => { + if let Some(username) = &self.config.username { + if ev.nick.is_none() || self.config.force_username { + conn.send(Nick { + name: username.clone(), + }) + .await?; + } + } + + // Maybe we should only count this as joining if we successfully + // updated the nick instead of just sending a Nick command? And + // maybe we should ensure that we're in the State::Joined state? + // Both of these would probably complicate the code a lot. On + // the other hand, InstanceEvent::Joined::state would contain + // the actual nick after joining, which feels like the right + // thing to do™. Probably not worth the increase in code + // complexity though. + + self.on_joined(conn).await; + } + + _ => {} + } + + Ok(()) + } + + async fn on_cmd(&mut self, conn: &ClientConn, cmd: Command) -> Result<()> { + match cmd { + Command::GetConn(sender) => { + let _ = sender.send(conn.handle()); + Ok(()) + } + Command::Stop => Err(Error::Stopped), + } + } + + async fn run_once(&mut self) -> Result<()> { + // If we try to connect too many times without managing to join at least + // once, the room is probably not accessible for one reason or another + // and the instance should stop. + self.attempts += 1; + if self.never_joined && self.attempts > self.config.server.join_attempts { + return Err(Error::OutOfJoinAttempts); + } + + let _ = self + .event_tx + .send(ClientEvent::Connecting { id: self.id }) + .await; + + let mut conn = match self.connect().await { + Ok(conn) => conn, + Err(err) => { + // When we fail to connect, we want to wait a bit before + // reconnecting in order not to spam the server. However, when + // we are connected successfully and then disconnect for + // whatever reason, we want to try to reconnect immediately. We + // might, for example, be disconnected from the server because + // we just logged in. + tokio::time::sleep(self.config.server.reconnect_delay).await; + Err(err)? + } + }; + + let _ = self + .event_tx + .send(ClientEvent::Connected { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + + let result = loop { + let received = select! { + r = conn.recv() => Ok(r?), + r = self.cmd_rx.recv() => Err(r), + }; + + match received { + // We received a packet + Ok(None) => break Ok(()), // Connection closed + Ok(Some(packet)) => self.on_packet(&mut conn, packet).await?, + // We received a command + Err(None) => break Err(Error::NoReferences), + Err(Some(cmd)) => self.on_cmd(&conn, cmd).await?, + }; + }; + + let _ = self + .event_tx + .send(ClientEvent::Disconnected { id: self.id }) + .await; + + result + } + + async fn run(mut self) { + let _ = self + .event_tx + .send(ClientEvent::Started { id: self.id }) + .await; + + loop { + if let Err(err) = self.run_once().await { + warn!("instance {:?}: {err}", self.id); + if err.is_fatal() { + break; + } + } + } + + let _ = self + .event_tx + .send(ClientEvent::Stopped { id: self.id }) + .await; + } +} + +#[derive(Clone)] +pub struct Client { + id: usize, + config: Arc, + cmd_tx: mpsc::Sender, + start_time: Timestamp, +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Instance") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl Client { + pub fn new(id: usize, config: ClientConfig, event_tx: mpsc::Sender) -> Self { + let start_time = Timestamp::now(); + + let config = Arc::new(config); + + let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); + + let task = ClientTask { + id, + config: config.clone(), + attempts: 0, + never_joined: false, + cmd_rx, + event_tx, + }; + + tokio::task::spawn(task.run()); + + Self { + id, + config, + cmd_tx, + start_time, + } + } + + pub fn id(&self) -> usize { + self.id + } + + pub fn config(&self) -> &ClientConfig { + &self.config + } + + pub fn start_time(&self) -> Timestamp { + self.start_time + } + + pub fn stopped(&self) -> bool { + self.cmd_tx.is_closed() + } + + pub async fn stop(&self) { + let _ = self.cmd_tx.send(Command::Stop).await; + } + + pub async fn handle(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::GetConn(tx)).await; + rx.await.ok() + } +} + +///////////// +// Builder // +///////////// + +impl ClientBuilderBase<'static> for Client { + type Base = (); +} + +impl Client { + pub fn builder(room: impl ToString) -> ClientBuilder<'static, Self> { + Self::builder_for_server(ServerConfig::default(), room) + } + + pub fn builder_for_server( + server: ServerConfig, + room: impl ToString, + ) -> ClientBuilder<'static, Self> { + ClientBuilder { + base: (), + config: ClientConfig::new(server, room.to_string()), + } + } +} + +impl ClientBuilder<'static, Client> { + pub fn build(self, id: usize, event_tx: mpsc::Sender) -> Client { + Client::new(id, self.config, event_tx) + } +} diff --git a/euphoxide/Cargo.toml b/euphoxide/Cargo.toml new file mode 100644 index 0000000..bd66d50 --- /dev/null +++ b/euphoxide/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "euphoxide" +edition = { workspace = true } +version = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +caseless = { workspace = true } +futures-util = { workspace = true } +jiff = { workspace = true } +log = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["macros", "sync", "time"] } +tokio-tungstenite = { workspace = true } +unicode-normalization = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +rustls = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } + +[lints] +workspace = true diff --git a/euphoxide/examples/examplebot.rs b/euphoxide/examples/examplebot.rs new file mode 100644 index 0000000..19fe95e --- /dev/null +++ b/euphoxide/examples/examplebot.rs @@ -0,0 +1,90 @@ +use std::time::Duration; + +use euphoxide::{ + api::{Data, Message, Nick, Send}, + client::conn::{ClientConn, ClientConnHandle}, +}; + +async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> { + conn.send_only(Nick { + name: "examplebot".to_string(), + }) + .await?; + + Ok(()) +} + +async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + conn.send_only(Send { + content: "Pong!".to_string(), + parent: Some(msg.id), + }) + .await?; + + Ok(()) +} + +async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + let mut parent = msg.id; + + for _ in 0..3 { + let first = conn + .send(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + Ok(()) +} + +async fn on_data(conn: ClientConnHandle, data: Data) { + let result = match data { + Data::SnapshotEvent(_) => set_nick(&conn).await, + Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await, + Data::SendEvent(event) if event.0.content == "!pyramid" => { + send_pyramid(&conn, event.0).await + } + _ => Ok(()), + }; + + if let Err(err) = result { + println!("Error while responding: {err}"); + } +} + +async fn run() -> anyhow::Result<()> { + let (mut conn, _) = ClientConn::connect("test", None).await?; + + while let Some(packet) = conn.recv().await? { + let data = packet.into_data()?; + tokio::task::spawn(on_data(conn.handle(), data)); + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide/src/api.rs b/euphoxide/src/api.rs new file mode 100644 index 0000000..23617f3 --- /dev/null +++ b/euphoxide/src/api.rs @@ -0,0 +1,12 @@ +//! Models the [euphoria.leet.nu API][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api + +pub mod account_cmds; +pub mod events; +pub mod packets; +pub mod room_cmds; +pub mod session_cmds; +pub mod types; + +pub use self::{account_cmds::*, events::*, packets::*, room_cmds::*, session_cmds::*, types::*}; diff --git a/src/api/account_cmds.rs b/euphoxide/src/api/account_cmds.rs similarity index 91% rename from src/api/account_cmds.rs rename to euphoxide/src/api/account_cmds.rs index 01ef7f0..b9ed570 100644 --- a/src/api/account_cmds.rs +++ b/euphoxide/src/api/account_cmds.rs @@ -1,8 +1,10 @@ -//! Account commands. +//! Models [account commands][0] and their replies. //! //! These commands enable a client to register, associate, and dissociate with //! an account. An account allows an identity to be shared across browsers and //! devices, and is a prerequisite for room management +//! +//! [0]: https://euphoria.leet.nu/heim/api#account-commands use serde::{Deserialize, Serialize}; @@ -11,6 +13,8 @@ use super::AccountId; /// Change the primary email address associated with the signed in account. /// /// The email address may need to be verified before the change is fully applied. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangeEmail { /// The new primary email address for the account. @@ -32,6 +36,8 @@ pub struct ChangeEmailReply { } /// Change the name associated with the signed in account. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangeName { /// The name to associate with the account. @@ -46,6 +52,8 @@ pub struct ChangeNameReply { } /// Change the password of the signed in account. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangePassword { /// The current (and soon-to-be former) password. @@ -65,6 +73,8 @@ pub struct ChangePasswordReply {} /// If the login succeeds, the client should expect to receive a /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged in session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Login { /// The namespace of a personal identifier. @@ -98,6 +108,8 @@ pub struct LoginReply { /// If the logout is successful, the client should expect to receive a /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged out session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Logout {} @@ -113,6 +125,8 @@ pub struct LogoutReply {} /// [`DisconnectEvent`](super::DisconnectEvent) shortly after. The next /// connection the client makes will be a logged in session using the new /// account. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegisterAccount { /// The namespace of a personal identifier. @@ -145,6 +159,8 @@ pub struct RegisterAccountReply { /// /// An error will be returned if the account has no unverified email addresses /// associated with it. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResendVerificationEmail {} @@ -156,6 +172,8 @@ pub struct ResendVerificationEmailReply {} /// /// An email will be sent to the owner of the given personal identifier, with /// instructions and a confirmation code for resetting the password. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResetPassword { pub namespace: String, diff --git a/src/api/events.rs b/euphoxide/src/api/events.rs similarity index 87% rename from src/api/events.rs rename to euphoxide/src/api/events.rs index 8abe04d..b46dcb7 100644 --- a/src/api/events.rs +++ b/euphoxide/src/api/events.rs @@ -1,4 +1,6 @@ -//! Asynchronous events. +//! Models [asynchronous events][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api#asynchronous-events use serde::{Deserialize, Serialize}; @@ -8,6 +10,8 @@ use super::{ }; /// Indicates that access to a room is denied. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BounceEvent { /// The reason why access was denied. @@ -25,16 +29,37 @@ pub struct BounceEvent { /// /// If the disconnect reason is `authentication changed`, the client should /// immediately reconnect. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DisconnectEvent { /// The reason for disconnection. pub reason: String, } +/// Indicates that a message in the room has been modified or deleted. +/// +/// If the client offers a user interface and the indicated message is currently +/// displayed, it should update its display accordingly. +/// +/// The event packet includes a snapshot of the message post-edit. +/// +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EditMessageEvent { + /// The id of the edit. + pub edit_id: Snowflake, + /// The snapshot of the message post-edit. + #[serde(flatten)] + pub message: Message, +} + /// Sent by the server to the client when a session is started. /// /// It includes information about the client's authentication and associated /// identity. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HelloEvent { /// The id of the agent or account logged into this session. @@ -55,11 +80,15 @@ pub struct HelloEvent { } /// Indicates a session just joined the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JoinEvent(pub SessionView); /// Sent to all sessions of an agent when that agent is logged in (except for /// the session that issued the login command). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LoginEvent { pub account_id: AccountId, @@ -67,6 +96,8 @@ pub struct LoginEvent { /// Sent to all sessions of an agent when that agent is logged out (except for /// the session that issued the logout command). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LogoutEvent {} @@ -75,6 +106,8 @@ pub struct LogoutEvent {} /// /// If the network event type is `partition`, then this should be treated as a /// [`PartEvent`] for all sessions connected to the same server id/era combo. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NetworkEvent { /// The type of network event; for now, always `partition`. @@ -86,6 +119,8 @@ pub struct NetworkEvent { } /// Announces a nick change by another session in the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NickEvent { /// The id of the session this name applies to. @@ -98,22 +133,9 @@ pub struct NickEvent { pub to: String, } -/// Indicates that a message in the room has been modified or deleted. -/// -/// If the client offers a user interface and the indicated message is currently -/// displayed, it should update its display accordingly. -/// -/// The event packet includes a snapshot of the message post-edit. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EditMessageEvent { - /// The id of the edit. - pub edit_id: Snowflake, - /// The snapshot of the message post-edit. - #[serde(flatten)] - pub message: Message, -} - /// Indicates a session just disconnected from the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PartEvent(pub SessionView); @@ -121,6 +143,8 @@ pub struct PartEvent(pub SessionView); /// /// The client should send back a ping-reply with the same value for the time /// field as soon as possible (or risk disconnection). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PingEvent { /// A unix timestamp according to the server's clock. @@ -131,6 +155,8 @@ pub struct PingEvent { } /// Informs the client that another user wants to chat with them privately. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PmInitiateEvent { /// The id of the user inviting the client to chat privately. @@ -144,12 +170,16 @@ pub struct PmInitiateEvent { } /// Indicates a message received by the room from another session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SendEvent(pub Message); /// Indicates that a session has successfully joined a room. /// /// It also offers a snapshot of the room’s state and recent history. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SnapshotEvent { /// The id of the agent or account logged into this session. @@ -158,7 +188,7 @@ pub struct SnapshotEvent { pub session_id: SessionId, /// The server’s version identifier. pub version: String, - /// The list of all other sessions joined to the room (excluding this + /// The list of all other sessions joined to the room (excluding our /// session). pub listing: Vec, /// The most recent messages posted to the room (currently up to 100). diff --git a/euphoxide/src/api/packets.rs b/euphoxide/src/api/packets.rs new file mode 100644 index 0000000..6326a4c --- /dev/null +++ b/euphoxide/src/api/packets.rs @@ -0,0 +1,319 @@ +//! Models the [packets][0] sent between the server and client. +//! +//! [0]: https://euphoria.leet.nu/heim/api#packets + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::Error; + +use super::PacketType; + +/// A "raw" packet. +/// +/// This packet closely matches the [packet representation defined in the +/// API][0]. It can contain arbitrary data in the form of a JSON [`Value`]. It +/// can also contain both data and an error at the same time. +/// +/// In order to interpret this packet, you probably want to convert it to a +/// [`ParsedPacket`] using [`ParsedPacket::from_packet`]. +/// +/// [0]: https://euphoria.leet.nu/heim/api#packets +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Packet { + /// Client-generated id for associating replies with commands. + pub id: Option, + /// The type of the command, reply, or event. + pub r#type: PacketType, + /// The payload of the command, reply, or event. + pub data: Option, + /// This field appears in replies if a command fails. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + /// This field appears in replies to warn the client that it may be + /// flooding. + /// + /// The client should slow down its command rate. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub throttled: bool, + /// If throttled is true, this field describes why. + #[serde(skip_serializing_if = "Option::is_none")] + pub throttled_reason: Option, +} + +/// Models the relationship between command and reply types. +/// +/// This trait is useful for type-safe command-reply APIs. +pub trait Command { + /// The type of reply one can expect from the server when sending this + /// command. + type Reply; +} + +macro_rules! packets { + ( $( $mod:ident::$name:ident, )*) => { + /// A big enum containing most types of packet data. + #[derive(Debug, Clone)] + #[non_exhaustive] + pub enum Data { + $( $name(super::$mod::$name), )* + /// A valid type of packet data that this library does not model as + /// a struct. + Unimplemented(PacketType, Value), + } + + impl Data { + /// Interpret a JSON [`Value`] as packet data of a specific [`PacketType`]. + /// + /// This method may fail if the data is invalid. + pub fn from_value(ptype: PacketType, value: Value) -> serde_json::Result { + Ok(match ptype { + $( PacketType::$name => Self::$name(serde_json::from_value(value)?), )* + _ => Self::Unimplemented(ptype, value), + }) + } + + /// Convert the packet data into a JSON [`Value`]. + /// + /// This method may fail if the data fails to serialize. + pub fn into_value(self) -> serde_json::Result { + Ok(match self { + $( Self::$name(p) => serde_json::to_value(p)?, )* + Self::Unimplemented(_, value) => value, + }) + } + + /// The [`PacketType`] of this packet data. + pub fn packet_type(&self) -> PacketType { + match self { + $( Self::$name(_) => PacketType::$name, )* + Self::Unimplemented(ptype, _) => *ptype, + } + } + } + + $( + impl From for Data { + fn from(p: super::$mod::$name) -> Self { + Self::$name(p) + } + } + + impl TryFrom for super::$mod::$name{ + type Error = (); + + fn try_from(value: Data) -> Result { + match value { + Data::$name(p) => Ok(p), + _ => Err(()) + } + } + } + )* + }; +} + +macro_rules! commands { + ( $( $cmd:ident => $rpl:ident, )* ) => { + $( + impl Command for super::$cmd { + type Reply = super::$rpl; + } + )* + }; +} + +packets! { + // Events + events::BounceEvent, + events::DisconnectEvent, + events::EditMessageEvent, + events::HelloEvent, + events::JoinEvent, + events::LoginEvent, + events::LogoutEvent, + events::NetworkEvent, + events::NickEvent, + events::PartEvent, + events::PingEvent, + events::PmInitiateEvent, + events::SendEvent, + events::SnapshotEvent, + // Session commands + session_cmds::Auth, + session_cmds::AuthReply, + session_cmds::Ping, + session_cmds::PingReply, + // Chat room commands + room_cmds::GetMessage, + room_cmds::GetMessageReply, + room_cmds::Log, + room_cmds::LogReply, + room_cmds::Nick, + room_cmds::NickReply, + room_cmds::PmInitiate, + room_cmds::PmInitiateReply, + room_cmds::Send, + room_cmds::SendReply, + room_cmds::Who, + room_cmds::WhoReply, + // Account commands + account_cmds::ChangeEmail, + account_cmds::ChangeEmailReply, + account_cmds::ChangeName, + account_cmds::ChangeNameReply, + account_cmds::ChangePassword, + account_cmds::ChangePasswordReply, + account_cmds::Login, + account_cmds::LoginReply, + account_cmds::Logout, + account_cmds::LogoutReply, + account_cmds::RegisterAccount, + account_cmds::RegisterAccountReply, + account_cmds::ResendVerificationEmail, + account_cmds::ResendVerificationEmailReply, + account_cmds::ResetPassword, + account_cmds::ResetPasswordReply, +} + +commands! { + // Session commands + Auth => AuthReply, + Ping => PingReply, + // Chat room commands + GetMessage => GetMessageReply, + Log => LogReply, + Nick => NickReply, + PmInitiate => PmInitiateReply, + Send => SendReply, + Who => WhoReply, + // Account commands + ChangeEmail => ChangeEmailReply, + ChangeName => ChangeNameReply, + ChangePassword => ChangePasswordReply, + Login => LoginReply, + Logout => LogoutReply, + RegisterAccount => RegisterAccountReply, + ResendVerificationEmail => ResendVerificationEmailReply, + ResetPassword => ResetPasswordReply, +} + +/// A fully parsed and interpreted packet. +/// +/// Compared to [`Packet`], this packet's representation more closely matches +/// the actual use of packets. +#[derive(Debug, Clone)] +pub struct ParsedPacket { + /// Client-generated id for associating replies with commands. + pub id: Option, + /// The type of the command, reply, or event. + pub r#type: PacketType, + /// The payload of the command, reply, or event, or an error message if the + /// command failed. + pub content: Result, + /// A warning to the client that it may be flooding. + /// + /// The client should slow down its command rate. + pub throttled: Option, +} + +impl ParsedPacket { + /// Convert a [`Data`]-compatible value into a [`ParsedPacket`]. + pub fn from_data(id: Option, data: impl Into) -> Self { + let data = data.into(); + Self { + id, + r#type: data.packet_type(), + content: Ok(data), + throttled: None, + } + } + + pub fn into_data(self) -> crate::Result { + self.content.map_err(Error::Euph) + } + + /// Convert a [`Packet`] into a [`ParsedPacket`]. + /// + /// This method may fail if the packet data is invalid. + pub fn from_packet(packet: Packet) -> serde_json::Result { + let id = packet.id; + let r#type = packet.r#type; + + let content = if let Some(error) = packet.error { + Err(error) + } else { + let data = packet.data.unwrap_or_default(); + Ok(Data::from_value(r#type, data)?) + }; + + let throttled = if packet.throttled { + let reason = packet + .throttled_reason + .unwrap_or_else(|| "no reason given".to_string()); + Some(reason) + } else { + None + }; + + Ok(Self { + id, + r#type, + content, + throttled, + }) + } + + /// Convert a [`ParsedPacket`] into a [`Packet`]. + /// + /// This method may fail if the packet data fails to serialize. + pub fn into_packet(self) -> serde_json::Result { + let id = self.id; + let r#type = self.r#type; + let throttled = self.throttled.is_some(); + let throttled_reason = self.throttled; + + Ok(match self.content { + Ok(data) => Packet { + id, + r#type, + data: Some(data.into_value()?), + error: None, + throttled, + throttled_reason, + }, + Err(error) => Packet { + id, + r#type, + data: None, + error: Some(error), + throttled, + throttled_reason, + }, + }) + } +} + +impl TryFrom for ParsedPacket { + type Error = serde_json::Error; + + fn try_from(value: Packet) -> Result { + Self::from_packet(value) + } +} + +impl TryFrom for Packet { + type Error = serde_json::Error; + + fn try_from(value: ParsedPacket) -> Result { + value.into_packet() + } +} + +impl TryFrom for Data { + type Error = Error; + + fn try_from(value: ParsedPacket) -> Result { + value.into_data() + } +} diff --git a/src/api/room_cmds.rs b/euphoxide/src/api/room_cmds.rs similarity index 88% rename from src/api/room_cmds.rs rename to euphoxide/src/api/room_cmds.rs index 0a2d553..f961af0 100644 --- a/src/api/room_cmds.rs +++ b/euphoxide/src/api/room_cmds.rs @@ -1,13 +1,17 @@ -//! Chat room commands. +//! Models [chat room commands][0] and their replies. //! //! These commands are available to the client once a session successfully joins //! a room. +//! +//! [0]: https://euphoria.leet.nu/heim/api#chat-room-commands use serde::{Deserialize, Serialize}; use super::{Message, MessageId, PmId, SessionId, SessionView, UserId}; /// Retrieve the full content of a single message in the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct GetMessage { /// The id of the message to retrieve. @@ -23,6 +27,8 @@ pub struct GetMessageReply(pub Message); /// This can be used to supplement the log provided by /// [`SnapshotEvent`](super::SnapshotEvent) (for example, when scrolling back /// further in history). +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Log { /// Maximum number of messages to return (up to 1000). @@ -44,6 +50,8 @@ pub struct LogReply { /// /// This name applies to all messages sent during this session, until the nick /// command is called again. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Nick { /// The requested name (maximum length 36 bytes). @@ -68,6 +76,8 @@ pub struct NickReply { /// Constructs a virtual room for private messaging between the client and the /// given [`UserId`]. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PmInitiate { /// The id of the user to invite to chat privately. @@ -94,6 +104,8 @@ pub struct PmInitiateReply { /// The caller of this command will not receive the corresponding /// [`SendEvent`](super::SendEvent), but will receive the same information in /// the [`SendReply`]. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Send { /// The content of the message (client-defined). @@ -109,12 +121,14 @@ pub struct Send { pub struct SendReply(pub Message); /// Request a list of sessions currently joined in the room. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Who {} /// Lists the sessions currently joined in the room. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WhoReply { - /// A list of session views. + /// A list of session views (including our session). pub listing: Vec, } diff --git a/src/api/session_cmds.rs b/euphoxide/src/api/session_cmds.rs similarity index 86% rename from src/api/session_cmds.rs rename to euphoxide/src/api/session_cmds.rs index 4dab0d4..e60a76c 100644 --- a/src/api/session_cmds.rs +++ b/euphoxide/src/api/session_cmds.rs @@ -1,7 +1,9 @@ -//! Session commands. +//! Models [session commands][0] and their replies. //! //! Session management commands are involved in the initial handshake and //! maintenance of a session. +//! +//! [0]: https://euphoria.leet.nu/heim/api#session-commands use serde::{Deserialize, Serialize}; @@ -11,6 +13,8 @@ use super::{AuthOption, Time}; /// /// This should be sent in response to a [`BounceEvent`](super::BounceEvent) at /// the beginning of a session. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Auth { /// The method of authentication. @@ -32,6 +36,8 @@ pub struct AuthReply { /// /// The server will send back a [`PingReply`] with the same timestamp as soon as /// possible. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Ping { /// An arbitrary value, intended to be a unix timestamp. diff --git a/src/api/types.rs b/euphoxide/src/api/types.rs similarity index 91% rename from src/api/types.rs rename to euphoxide/src/api/types.rs index b1408a8..d7121f8 100644 --- a/src/api/types.rs +++ b/euphoxide/src/api/types.rs @@ -1,20 +1,16 @@ -//! Field types. +//! Models the [field types][0]. +//! +//! [0]: https://euphoria.leet.nu/heim/api#field-types -// TODO Add newtype wrappers for different kinds of IDs? - -// Serde's derive macros generate this warning and I can't turn it off locally, -// so I'm turning it off for the entire module. -#![allow(clippy::use_self)] - -use std::num::ParseIntError; -use std::str::FromStr; -use std::{error, fmt}; +use std::{error, fmt, num::ParseIntError, str::FromStr}; use jiff::Timestamp; use serde::{de, ser, Deserialize, Serialize}; use serde_json::Value; /// Describes an account and its preferred name. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AccountView { /// The id of the account. @@ -24,7 +20,9 @@ pub struct AccountView { } /// Mode of authentication. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +/// +/// +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub enum AuthOption { /// Authentication with a passcode, where a key is derived from the passcode @@ -36,6 +34,8 @@ pub enum AuthOption { /// /// It corresponds to a chat message, or a post, or any broadcasted event in a /// room that should appear in the log. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { /// The id of the message (unique within a room). @@ -72,6 +72,8 @@ pub struct Message { /// The type of a packet. /// /// Not all of these types have their corresponding data modeled as a struct. +/// +/// #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub enum PacketType { @@ -250,6 +252,8 @@ impl fmt::Display for PacketType { } /// Describes an account to its owner. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PersonalAccountView { /// The id of the account. @@ -261,6 +265,8 @@ pub struct PersonalAccountView { } /// Describes a session and its identity. +/// +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SessionView { /// The id of an agent or account (or bot). @@ -290,6 +296,8 @@ pub struct SessionView { /// A 13-character string, usually used as aunique identifier for some type of object. /// /// It is the base-36 encoding of an unsigned, 64-bit integer. +/// +/// #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct Snowflake(pub u64); @@ -307,7 +315,7 @@ impl Snowflake { /// representation of message ids to suddenly use the upper parts of the /// range, and since message ids mostly consist of a timestamp, this /// approach should last until at least 2075. - pub const MAX: Self = Snowflake(i64::MAX as u64); + pub const MAX: Self = Self(i64::MAX as u64); } impl fmt::Display for Snowflake { @@ -324,6 +332,7 @@ impl fmt::Display for Snowflake { } } +/// An error that occurred while parsing a [`Snowflake`]. #[derive(Debug)] pub enum ParseSnowflakeError { InvalidLength(usize), @@ -365,7 +374,7 @@ impl FromStr for Snowflake { return Err(ParseSnowflakeError::InvalidLength(s.len())); } let n = u64::from_str_radix(s, 36)?; - Ok(Snowflake(n)) + Ok(Self(n)) } } @@ -402,6 +411,8 @@ impl<'de> Deserialize<'de> for Snowflake { /// Time is specified as a signed 64-bit integer, giving the number of seconds /// since the Unix Epoch. +/// +/// #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct Time(pub i64); @@ -426,6 +437,8 @@ impl Time { /// /// It is possible for this value to have no prefix and colon, and there is no /// fixed format for the unique value. +/// +/// #[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct UserId(pub String); @@ -435,21 +448,27 @@ impl fmt::Display for UserId { } } +/// What kind of user a [`UserId`] is. #[derive(Debug, PartialEq, Eq)] -pub enum SessionType { +pub enum UserType { Agent, Account, Bot, } impl UserId { - pub fn session_type(&self) -> Option { + /// Retrieve the [`UserType`] of this user. + /// + /// This method can return [`None`] because user IDs used to have no + /// associated type. Such user IDs can still occur in old room logs, so + /// euphoxide supports them. + pub fn user_type(&self) -> Option { if self.0.starts_with("agent:") { - Some(SessionType::Agent) + Some(UserType::Agent) } else if self.0.starts_with("account:") { - Some(SessionType::Account) + Some(UserType::Account) } else if self.0.starts_with("bot:") { - Some(SessionType::Bot) + Some(UserType::Bot) } else { None } diff --git a/euphoxide/src/client.rs b/euphoxide/src/client.rs new file mode 100644 index 0000000..7ed3db4 --- /dev/null +++ b/euphoxide/src/client.rs @@ -0,0 +1,4 @@ +//! A connection from a client's perspective. + +pub mod conn; +pub mod state; diff --git a/euphoxide/src/client/conn.rs b/euphoxide/src/client/conn.rs new file mode 100644 index 0000000..2ee0905 --- /dev/null +++ b/euphoxide/src/client/conn.rs @@ -0,0 +1,343 @@ +//! Client-specific connection with a more expressive API. + +use std::{future::Future, time::Duration}; + +use log::debug; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + client::IntoClientRequest, + http::{header, HeaderValue}, +}; + +use crate::{ + api::{Command, Data, LoginReply, ParsedPacket}, + conn::{Conn, ConnConfig, Side}, + replies::{self, PendingReply, Replies}, + Error, Result, +}; + +use super::state::State; + +enum ConnCommand { + SendCmd(Data, oneshot::Sender>>), + GetState(oneshot::Sender), +} + +/// Configuration options for a [`ClientConn`]. +#[derive(Debug, Clone)] +pub struct ClientConnConfig { + /// The domain where the server is hosted. + pub domain: String, + /// Whether the client should present itself as a human to the server. + /// + /// This should only be set if the client is directly acting on behalf of a + /// human, similar to the web client. + pub human: bool, + /// The size of the [`mpsc::channel`] for communication between + /// [`ClientConn`] and [`ClientConnHandle`]. + pub channel_bufsize: usize, + /// Timeout for opening a websocket connection. + pub connect_timeout: Duration, + /// Timeout for server replies when sending euphoria commands, i.e. packets + /// implementing [`Command`]. + pub command_timeout: Duration, + /// How long to wait in-between sending pings. + /// + /// See also [`ConnConfig::ping_interval`]. + pub ping_interval: Duration, +} + +impl Default for ClientConnConfig { + fn default() -> Self { + Self { + domain: "euphoria.leet.nu".to_string(), + human: false, + channel_bufsize: 10, + connect_timeout: Duration::from_secs(10), + command_timeout: Duration::from_secs(30), + ping_interval: Duration::from_secs(30), + } + } +} + +/// A client connection to an euphoria server. +/// +/// This struct is a wrapper around [`Conn`] with a more client-centric API. It +/// tracks the connection state, including room information sent by the server. +/// It also provides [`ClientConnHandle`], which can be used to asynchronously +/// send commands and await their replies. +pub struct ClientConn { + rx: mpsc::Receiver, + tx: mpsc::Sender, + + conn: Conn, + state: State, + + last_id: usize, + replies: Replies, +} + +impl ClientConn { + /// Retrieve the current [`State`] of the connection. + pub fn state(&self) -> &State { + &self.state + } + + /// Create a new handle for this connection. + pub fn handle(&self) -> ClientConnHandle { + ClientConnHandle { + tx: self.tx.clone(), + } + } + + /// Start closing the connection. + /// + /// To finish closing the connection gracefully, continue calling + /// [`Self::recv`] until it returns [`None`]. + pub async fn close(&mut self) -> Result<()> { + self.conn.close().await + } + + /// Receive a [`ParsedPacket`] over the connection. + /// + /// This method also maintains the connection by listening and responding to + /// pings as well as managing [`ClientConnHandle`]s. Thus, it must be called + /// regularly. + /// + /// Returns [`None`] if the connection is closed. + pub async fn recv(&mut self) -> Result> { + loop { + self.replies.purge(); + + // There's always at least one tx end (self.tx), so self.rx.recv() + // should never return None. + let packet = select! { + packet = self.conn.recv() => packet?, + Some(cmd) = self.rx.recv() => { + self.on_cmd(cmd).await; + continue; + }, + }; + + if let Some(packet) = &packet { + self.on_packet(packet).await?; + } + + break Ok(packet); + } + } + + /// Send a packet over the connection. + /// + /// A packet id is automatically generated and returned. When the server + /// replies to the packet, it will use this id as its [`ParsedPacket::id`]. + pub async fn send(&mut self, data: impl Into) -> Result { + // Overkill of universe-heat-death-like proportions + self.last_id = self.last_id.wrapping_add(1); + let id = self.last_id.to_string(); + + self.conn + .send(ParsedPacket::from_data(Some(id.clone()), data.into())) + .await?; + + Ok(id) + } + + async fn on_packet(&mut self, packet: &ParsedPacket) -> Result<()> { + if let Ok(data) = &packet.content { + self.state.on_data(data); + + // The euphoria server doesn't always disconnect the client when it + // would make sense to do so or when the API specifies it should. + // This ensures we always disconnect when it makes sense to do so. + if matches!( + data, + Data::DisconnectEvent(_) + | Data::LoginEvent(_) + | Data::LogoutEvent(_) + | Data::LoginReply(LoginReply { success: true, .. }) + | Data::LogoutReply(_) + ) { + self.close().await?; + } + } + + if let Some(id) = &packet.id { + let id = id.clone(); + self.replies.complete(&id, packet.clone()); + } + + Ok(()) + } + + async fn on_cmd(&mut self, cmd: ConnCommand) { + match cmd { + ConnCommand::SendCmd(data, sender) => { + let result = self.send(data).await.map(|id| self.replies.wait_for(id)); + let _ = sender.send(result); + } + ConnCommand::GetState(sender) => { + let _ = sender.send(self.state.clone()); + } + } + } + + /// Connect to a room. + /// + /// See [`Self::connect_with_config`] for more details. + pub async fn connect( + room: &str, + cookies: Option, + ) -> Result<(Self, Vec)> { + Self::connect_with_config(room, cookies, &ClientConnConfig::default()).await + } + + /// Connect to a room with a specific configuration. + /// + /// Cookies to be sent to the server can be specified as a [`HeaderValue`] + /// in the format of a [`Cookie` request header][0]. If the connection + /// attempt was successful, cookies set by the server will be returned + /// alongside the connection itself as one [`HeaderValue`] per [`Set-Cookie` + /// response header][1]. + /// + /// The tasks of cookie parsing and storage are not handled by this library. + /// + /// [0]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cookie + /// [1]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Set-Cookie + pub async fn connect_with_config( + room: &str, + cookies: Option, + config: &ClientConnConfig, + ) -> Result<(Self, Vec)> { + // Prepare URL + let human = if config.human { "?h=1" } else { "" }; + let uri = format!("wss://{}/room/{room}/ws{human}", config.domain); + debug!("Connecting to {uri} with cookies: {cookies:?}"); + + // Prepare request + let mut request = uri.into_client_request().expect("valid request"); + if let Some(cookies) = cookies { + request.headers_mut().append(header::COOKIE, cookies); + } + + // Connect to server + let (ws, response) = tokio::time::timeout( + config.connect_timeout, + tokio_tungstenite::connect_async(request), + ) + .await + .map_err(|_| Error::ConnectionTimeout)??; + + // Extract response cookies + let (mut parts, _) = response.into_parts(); + let cookies_set = match parts.headers.entry(header::SET_COOKIE) { + header::Entry::Occupied(entry) => entry.remove_entry_mult().1.collect(), + header::Entry::Vacant(_) => vec![], + }; + debug!("Received cookies {cookies_set:?}"); + + // Prepare EuphConn + let conn_config = ConnConfig { + ping_interval: config.ping_interval, + }; + let conn = Conn::wrap_with_config(ws, Side::Client, conn_config); + + // Prepare client + let (tx, rx) = mpsc::channel(config.channel_bufsize); + let client = Self { + rx, + tx, + conn, + state: State::new(), + last_id: 0, + replies: Replies::new(config.command_timeout), + }; + + Ok((client, cookies_set)) + } +} + +/// Asynchronous access to a [`ClientConn`]. +/// +/// Handle methods are only processed while [`ClientConn::recv`] is being +/// called. They may return before they were processed by the associated +/// [`ClientConn`], or they may block until processed. Methods are processed in +/// the order they are called. +/// +/// The handle is cheap to clone. +#[derive(Debug, Clone)] +pub struct ClientConnHandle { + tx: mpsc::Sender, +} + +impl ClientConnHandle { + /// Send a command to the server. + /// + /// When awaited, returns either an error if something went wrong while + /// sending the command, or a second future with the server's reply (the + /// *reply future*). + /// + /// When awaited, the *reply future* returns either an error if something + /// was wrong with the reply, or the data returned by the server. The *reply + /// future* can be safely ignored and doesn't have to be awaited. + pub async fn send(&self, cmd: C) -> Result>> + where + C: Command + Into, + C::Reply: TryFrom, + { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(ConnCommand::SendCmd(cmd.into(), tx)) + .await + .map_err(|_| Error::ConnectionClosed)?; + + Ok(async { + let data = rx + .await + .map_err(|_| Error::ConnectionClosed)?? + .get() + .await + .map_err(|err| match err { + replies::Error::TimedOut => Error::CommandTimeout, + replies::Error::Canceled => Error::ConnectionClosed, + })? + .content + .map_err(Error::Euph)?; + + let ptype = data.packet_type(); + data.try_into() + .map_err(|_| Error::ReceivedUnexpectedPacket(ptype)) + }) + } + + /// Send a command to the server without waiting for a reply. + /// + /// This method is equivalent to calling and awaiting [`Self::send`] but + /// ignoring the *reply future*. The reason it exists is that clippy gets + /// really annoying when you try to ignore a future (which is usually the + /// right call). + pub async fn send_only(&self, cmd: C) -> Result<()> + where + C: Command + Into, + C::Reply: TryFrom, + { + let _ignore = self.send(cmd).await?; + Ok(()) + } + + /// Retrieve the current connection [`State`]. + pub async fn state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(ConnCommand::GetState(tx)) + .await + .map_err(|_| Error::ConnectionClosed)?; + + rx.await.map_err(|_| Error::ConnectionClosed) + } +} diff --git a/euphoxide/src/client/state.rs b/euphoxide/src/client/state.rs new file mode 100644 index 0000000..a90edde --- /dev/null +++ b/euphoxide/src/client/state.rs @@ -0,0 +1,307 @@ +//! Models the client's connection state. + +use std::collections::HashMap; + +use jiff::Timestamp; +use log::debug; + +use crate::api::{ + BounceEvent, Data, HelloEvent, NickEvent, PersonalAccountView, SessionId, SessionView, + SnapshotEvent, UserId, +}; + +/// Information about a session in the room. +/// +/// For quite a while before finally going down altogether, the euphoria.io +/// instance had an unreliable nick list: Listings returned by the server were +/// usually incomplete. Because of this, the bot library uses any observable +/// action by a session (including nick changes) to update the listing. Since +/// nick events don't include full session info though, the [`SessionInfo`] enum +/// can contain partial information. +/// +/// This level of paranioa probably isn't required any more now that the only +/// euphoria instance is working correctly. However, the code already works and +/// users who don't want to worry about it can just ignore partial session +/// infos. +#[derive(Debug, Clone)] +pub enum SessionInfo { + Full(SessionView), + Partial(NickEvent), +} + +impl SessionInfo { + /// Retrieve the user id of the session. + pub fn id(&self) -> &UserId { + match self { + Self::Full(sess) => &sess.id, + Self::Partial(nick) => &nick.id, + } + } + + /// Retrieve the session id of the session. + pub fn session_id(&self) -> &SessionId { + match self { + Self::Full(sess) => &sess.session_id, + Self::Partial(nick) => &nick.session_id, + } + } + + /// Retrieve the user name of the session. + pub fn name(&self) -> &str { + match self { + Self::Full(sess) => &sess.name, + Self::Partial(nick) => &nick.to, + } + } +} + +impl From for SessionInfo { + fn from(value: SessionView) -> Self { + Self::Full(value) + } +} + +impl From for SessionInfo { + fn from(value: NickEvent) -> Self { + Self::Partial(value) + } +} + +/// The state of the connection before the client has joined the room. +/// +/// Depending on the room, the client may need to authenticate or log in in +/// order to join. +#[derive(Debug, Clone)] +pub struct Joining { + /// Since when the connection has been in this state. + pub since: Timestamp, + /// A [`HelloEvent`], if one has been received. + /// + /// Contains information about the client's own session. + pub hello: Option, + /// A [`SnapshotEvent`], if one has been received. + pub snapshot: Option, + /// A [`BounceEvent`], if one has been received. + pub bounce: Option, +} + +impl Joining { + fn new() -> Self { + Self { + since: Timestamp::now(), + hello: None, + snapshot: None, + bounce: None, + } + } + + fn on_data(&mut self, data: &Data) { + match data { + Data::BounceEvent(p) => self.bounce = Some(p.clone()), + Data::HelloEvent(p) => self.hello = Some(p.clone()), + Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), + _ => {} + } + } + + fn to_joined(&self) -> Option { + let hello = self.hello.as_ref()?; + let snapshot = self.snapshot.as_ref()?; + + let mut session = hello.session.clone(); + + if let Some(nick) = &snapshot.nick { + session.name = nick.clone(); + } + + let listing = snapshot + .listing + .iter() + .cloned() + .map(|s| (s.session_id.clone(), SessionInfo::Full(s))) + .collect::>(); + + Some(Joined { + since: Timestamp::now(), + session, + account: hello.account.clone(), + listing, + }) + } +} + +/// The state of the connection after the client has successfully joined the +/// room. +/// +/// The client may need to set a nick in order to be able to send messages. +/// However, it can access the room history without nick. +#[derive(Debug, Clone)] +pub struct Joined { + /// Since when the connection has been in this state. + pub since: Timestamp, + /// The client's own session. + pub session: SessionView, + /// Account information, if the client is logged in. + pub account: Option, + /// All sessions currently connected to the room (except the client's own + /// session). + pub listing: HashMap, +} + +impl Joined { + fn on_data(&mut self, data: &Data) { + match data { + Data::JoinEvent(p) => { + debug!("Updating listing after join-event"); + self.listing + .insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone())); + } + Data::PartEvent(p) => { + debug!("Updating listing after part-event"); + self.listing.remove(&p.0.session_id); + } + Data::NetworkEvent(p) => { + if p.r#type == "partition" { + debug!("Updating listing after network-event with type partition"); + self.listing.retain(|_, s| match s { + SessionInfo::Full(s) => { + s.server_id != p.server_id && s.server_era != p.server_era + } + // We can't know if the session was disconnected by the + // partition or not, so we're erring on the side of + // caution and assuming they were kicked. If we're + // wrong, we'll re-add the session as soon as it + // performs another visible action. + // + // If we always kept such sessions, we might keep + // disconnected ones indefinitely, thereby keeping them + // from moving on, instead forever tethering them to the + // digital realm. + SessionInfo::Partial(_) => false, + }); + } + } + Data::SendEvent(p) => { + debug!("Updating listing after send-event"); + self.listing.insert( + p.0.sender.session_id.clone(), + SessionInfo::Full(p.0.sender.clone()), + ); + } + Data::NickEvent(p) => { + debug!("Updating listing after nick-event"); + self.listing + .entry(p.session_id.clone()) + .and_modify(|s| match s { + SessionInfo::Full(session) => session.name = p.to.clone(), + SessionInfo::Partial(_) => *s = SessionInfo::Partial(p.clone()), + }) + .or_insert_with(|| SessionInfo::Partial(p.clone())); + } + Data::NickReply(p) => { + debug!("Updating own session after nick-reply"); + assert_eq!(self.session.id, p.id); + self.session.name = p.to.clone(); + } + Data::WhoReply(p) => { + debug!("Updating listing after who-reply"); + self.listing.clear(); + for session in p.listing.clone() { + if session.session_id == self.session.session_id { + self.session = session; + } else { + self.listing + .insert(session.session_id.clone(), session.into()); + } + } + } + _ => {} + } + } +} + +/// The state of a connection to the server, from a client's perspective. +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum State { + /// The client has not joined the room yet. + Joining(Joining), + /// The client has successfully joined the room. + Joined(Joined), +} + +impl State { + /// Create a new state for a fresh connection. + /// + /// Assumes that no packets have been received yet. See also + /// [`Self::on_data`]. + pub fn new() -> Self { + Joining::new().into() + } + + /// If the state consists of a [`Joining`], return a reference to it. + pub fn as_joining(&self) -> Option<&Joining> { + match self { + Self::Joining(joining) => Some(joining), + Self::Joined(_) => None, + } + } + + /// If the state consists of a [`Joined`], return a reference to it. + pub fn as_joined(&self) -> Option<&Joined> { + match self { + Self::Joining(_) => None, + Self::Joined(joined) => Some(joined), + } + } + + /// If the state consists of a [`Joining`], return it. + pub fn into_joining(self) -> Option { + match self { + Self::Joining(joining) => Some(joining), + Self::Joined(_) => None, + } + } + + /// If the state consists of a [`Joined`], return it. + pub fn into_joined(self) -> Option { + match self { + Self::Joining(_) => None, + Self::Joined(joined) => Some(joined), + } + } + + /// Update the state with a packet received from the server. + /// + /// This method should be called whenever any packet is received from the + /// server. Skipping packets may cause the state to become inconsistent. + pub fn on_data(&mut self, data: &Data) { + match self { + Self::Joining(joining) => { + joining.on_data(data); + if let Some(joined) = joining.to_joined() { + *self = joined.into(); + } + } + Self::Joined(joined) => joined.on_data(data), + } + } +} + +impl Default for State { + fn default() -> Self { + Self::new() + } +} + +impl From for State { + fn from(value: Joining) -> Self { + Self::Joining(value) + } +} + +impl From for State { + fn from(value: Joined) -> Self { + Self::Joined(value) + } +} diff --git a/euphoxide/src/conn.rs b/euphoxide/src/conn.rs new file mode 100644 index 0000000..513711a --- /dev/null +++ b/euphoxide/src/conn.rs @@ -0,0 +1,312 @@ +//! Basic connection between client and server. + +use std::{fmt, time::Duration}; + +use futures_util::{SinkExt, StreamExt}; +use jiff::Timestamp; +use log::debug; +use tokio::{ + net::TcpStream, + select, + time::{self, Instant}, +}; +use tokio_tungstenite::{ + tungstenite::{client::IntoClientRequest, handshake::client::Response, Message}, + MaybeTlsStream, WebSocketStream, +}; + +use crate::{ + api::{Data, Packet, PacketType, ParsedPacket, Ping, PingEvent, PingReply, Time}, + Error, Result, +}; + +/// Which side of the connection we're on. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Side { + /// We're the client and are talking to a server. + Client, + /// We're the server and are talking to a client. + Server, +} + +/// Configuration options for a [`Conn`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ConnConfig { + /// How long to wait in-between sending pings. + /// + /// This includes both websocket and euphoria pings ([`Ping`] or + /// [`PingEvent`]). + pub ping_interval: Duration, +} + +impl Default for ConnConfig { + fn default() -> Self { + Self { + ping_interval: Duration::from_secs(30), + } + } +} + +/// A basic connection between a client and a server. +/// +/// The connection can be used both from a server's and from a client's +/// perspective. In both cases, it performs regular websocket *and* euphoria +/// pings and terminates the connection if the other side does not reply before +/// the next ping is sent. +pub struct Conn { + ws: WebSocketStream>, + side: Side, + config: ConnConfig, + + // The websocket server may send a pong frame with arbitrary payload + // unprompted at any time (see RFC 6455 5.5.3). Because of this, we can't + // just remember the last pong payload. + last_ping: Instant, + last_ws_ping_payload: Option>, + last_ws_ping_replied_to: bool, + last_euph_ping_payload: Option