From ba820f39ea591c26ffe4013e9d8b7f8cc91014af Mon Sep 17 00:00:00 2001 From: Joscha Date: Wed, 11 Dec 2024 01:01:11 +0100 Subject: [PATCH] Add client instance --- Cargo.toml | 3 +- euphoxide-bot/Cargo.toml | 14 ++ euphoxide-bot/src/instance.rs | 447 ++++++++++++++++++++++++++++++++++ euphoxide-bot/src/lib.rs | 1 + euphoxide/src/api/types.rs | 2 +- 5 files changed, 465 insertions(+), 2 deletions(-) create mode 100644 euphoxide-bot/Cargo.toml create mode 100644 euphoxide-bot/src/instance.rs create mode 100644 euphoxide-bot/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1be606a..97995e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["euphoxide"] +members = ["euphoxide", "euphoxide-bot"] [workspace.package] version = "0.5.1" @@ -8,6 +8,7 @@ edition = "2021" [workspace.dependencies] caseless = "0.2.1" +cookie = "0.18.1" futures-util = "0.3.31" jiff = { version = "0.1.15", default-features = false, features = ["std"] } log = "0.4.22" diff --git a/euphoxide-bot/Cargo.toml b/euphoxide-bot/Cargo.toml new file mode 100644 index 0000000..988a1d7 --- /dev/null +++ b/euphoxide-bot/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "euphoxide-bot" +version = { workspace = true } +edition = { workspace = true } + +[dependencies] +cookie = { workspace = true } +euphoxide = { path = "../euphoxide" } +log = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +tokio-tungstenite = { workspace = true } + +[lints] +workspace = true diff --git a/euphoxide-bot/src/instance.rs b/euphoxide-bot/src/instance.rs new file mode 100644 index 0000000..d08b5bd --- /dev/null +++ b/euphoxide-bot/src/instance.rs @@ -0,0 +1,447 @@ +use std::{ + fmt, result, + str::FromStr, + sync::{Arc, Mutex}, + time::Duration, +}; + +use cookie::{Cookie, CookieJar}; +use euphoxide::{ + api::{Auth, AuthOption, BounceEvent, Data, Nick, ParsedPacket}, + client::{ + conn::{ClientConn, ClientConnConfig, ClientConnHandle}, + state::State, + }, +}; +use log::warn; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + self, + http::{HeaderValue, StatusCode}, +}; + +enum Error { + Stopped, + NoReferences, + AuthRequired, + InvalidPassword, + OutOfJoinAttempts, + Euphoxide(euphoxide::Error), +} + +impl Error { + fn is_fatal(&self) -> bool { + match self { + Self::Stopped => true, + Self::NoReferences => true, + Self::AuthRequired => true, + Self::InvalidPassword => true, + Self::OutOfJoinAttempts => true, + Self::Euphoxide(euphoxide::Error::Tungstenite(tungstenite::Error::Http(response))) => { + response.status() == StatusCode::NOT_FOUND + } + _ => false, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Stopped => write!(f, "the instance was stopped manually"), + Self::NoReferences => write!(f, "all references to the instance were dropped"), + Self::AuthRequired => write!(f, "authentication required but no credentials found"), + Self::InvalidPassword => write!(f, "authentication required but password is invalid"), + Self::OutOfJoinAttempts => write!(f, "failed to join within attempt limit"), + Self::Euphoxide(error) => write!(f, "{error}"), + } + } +} + +impl From for Error { + fn from(value: euphoxide::Error) -> Self { + Self::Euphoxide(value) + } +} + +type Result = result::Result; + +enum Command { + GetConn(oneshot::Sender), + Stop, +} + +#[derive(Debug)] +pub enum Event { + Started { + id: usize, + }, + Connecting { + id: usize, + }, + Connected { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Joined { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Packet { + id: usize, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + id: usize, + }, + Stopped { + id: usize, + }, +} + +impl Event { + pub fn id(&self) -> usize { + match self { + Self::Started { id } => *id, + Self::Connecting { id } => *id, + Self::Connected { id, .. } => *id, + Self::Joined { id, .. } => *id, + Self::Packet { id, .. } => *id, + Self::Disconnected { id } => *id, + Self::Stopped { id } => *id, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ServerConfig { + pub client: ClientConnConfig, + pub cookies: Arc>, + pub join_attempts: usize, + pub reconnect_delay: Duration, + pub cmd_channel_bufsize: usize, +} + +impl ServerConfig { + pub fn instance(self, room: impl ToString) -> InstanceConfig { + InstanceConfig { + server: self, + room: room.to_string(), + human: false, + username: None, + force_username: false, + password: None, + } + } +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + client: ClientConnConfig::default(), + cookies: Arc::new(Mutex::new(CookieJar::new())), + join_attempts: 5, + reconnect_delay: Duration::from_secs(30), + cmd_channel_bufsize: 1, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct InstanceConfig { + pub server: ServerConfig, + pub room: String, + pub human: bool, + pub username: Option, + pub force_username: bool, + pub password: Option, +} + +impl InstanceConfig { + pub fn with_human(mut self, human: bool) -> Self { + self.human = human; + self + } + + pub fn with_username(mut self, username: impl ToString) -> Self { + self.username = Some(username.to_string()); + self + } + + pub fn with_force_username(mut self, enabled: bool) -> Self { + self.force_username = enabled; + self + } + + pub fn with_password(mut self, password: impl ToString) -> Self { + self.password = Some(password.to_string()); + self + } +} + +struct InstanceTask { + id: usize, + config: InstanceConfig, + + cmd_rx: mpsc::Receiver, + event_tx: mpsc::Sender, + + attempts: usize, + never_joined: bool, +} + +impl InstanceTask { + fn get_cookies(&self) -> Option { + self.config + .server + .cookies + .lock() + .unwrap() + .iter() + .map(|c| c.stripped().to_string()) + .collect::>() + .join("; ") + .try_into() + .ok() + } + + fn set_cookies(&mut self, cookies: &[HeaderValue]) { + let mut guard = self.config.server.cookies.lock().unwrap(); + for cookie in cookies { + if let Ok(cookie) = cookie.to_str() { + if let Ok(cookie) = Cookie::from_str(cookie) { + guard.add(cookie); + } + } + } + } + + async fn connect(&mut self) -> Result { + let (conn, cookies) = ClientConn::connect_with_config( + &self.config.room, + self.get_cookies(), + &self.config.server.client, + ) + .await?; + + self.set_cookies(&cookies); + + Ok(conn) + } + + async fn on_joined(&mut self, conn: &ClientConn) { + self.never_joined = false; + + let _ = self + .event_tx + .send(Event::Joined { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + } + + async fn on_packet(&mut self, conn: &mut ClientConn, packet: ParsedPacket) -> Result<()> { + let _ = self + .event_tx + .send(Event::Packet { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + packet: packet.clone(), + }) + .await; + + match packet.into_data()? { + // Attempting to authenticate + Data::BounceEvent(BounceEvent { + auth_options: Some(auth_options), + .. + }) if auth_options.contains(&AuthOption::Passcode) => { + if let Some(password) = &self.config.password { + conn.send(Auth { + r#type: AuthOption::Passcode, + passcode: Some(password.clone()), + }) + .await?; + } else { + return Err(Error::AuthRequired); + } + } + + // Auth attempt failed :( + Data::AuthReply(ev) if !ev.success => return Err(Error::InvalidPassword), + + // Just joined + Data::SnapshotEvent(ev) => { + if let Some(username) = &self.config.username { + if ev.nick.is_none() || self.config.force_username { + conn.send(Nick { + name: username.clone(), + }) + .await?; + } + } + + // Maybe we should only count this as joining if we successfully + // updated the nick instead of just sending a Nick command? And + // maybe we should ensure that we're in the State::Joined state? + // Both of these would probably complicate the code a lot. On + // the other hand, InstanceEvent::Joined::state would contain + // the actual nick after joining, which feels like the right + // thing to do™. Probably not worth the increase in code + // complexity though. + + self.on_joined(conn).await; + } + + _ => {} + } + + Ok(()) + } + + async fn on_cmd(&mut self, conn: &ClientConn, cmd: Command) -> Result<()> { + match cmd { + Command::GetConn(sender) => { + let _ = sender.send(conn.handle()); + Ok(()) + } + Command::Stop => Err(Error::Stopped), + } + } + + async fn run_once(&mut self) -> Result<()> { + // If we try to connect too many times without managing to join at least + // once, the room is probably not accessible for one reason or another + // and the instance should stop. + self.attempts += 1; + if self.never_joined && self.attempts > self.config.server.join_attempts { + return Err(Error::OutOfJoinAttempts); + } + + let _ = self.event_tx.send(Event::Connecting { id: self.id }).await; + + let mut conn = match self.connect().await { + Ok(conn) => conn, + Err(err) => { + // When we fail to connect, we want to wait a bit before + // reconnecting in order not to spam the server. However, when + // we are connected successfully and then disconnect for + // whatever reason, we want to try to reconnect immediately. We + // might, for example, be disconnected from the server because + // we just logged in. + tokio::time::sleep(self.config.server.reconnect_delay).await; + Err(err)? + } + }; + + let _ = self + .event_tx + .send(Event::Connected { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + + let result = loop { + let received = select! { + r = conn.recv() => Ok(r?), + r = self.cmd_rx.recv() => Err(r), + }; + + match received { + // We received a packet + Ok(None) => break Ok(()), // Connection closed + Ok(Some(packet)) => self.on_packet(&mut conn, packet).await?, + // We received a command + Err(None) => break Err(Error::NoReferences), + Err(Some(cmd)) => self.on_cmd(&conn, cmd).await?, + }; + }; + + let _ = self + .event_tx + .send(Event::Disconnected { id: self.id }) + .await; + + result + } + + async fn run(mut self) { + let _ = self.event_tx.send(Event::Started { id: self.id }).await; + + loop { + if let Err(err) = self.run_once().await { + warn!("instance {:?}: {err}", self.id); + if err.is_fatal() { + break; + } + } + } + + let _ = self.event_tx.send(Event::Stopped { id: self.id }).await; + } +} + +#[derive(Clone)] +pub struct Instance { + id: usize, + cmd_tx: mpsc::Sender, +} + +impl fmt::Debug for Instance { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Instance") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl Instance { + pub fn new(id: usize, config: InstanceConfig, event_tx: mpsc::Sender) -> Self { + let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); + + let task = InstanceTask { + id, + config, + attempts: 0, + never_joined: false, + cmd_rx, + event_tx, + }; + + tokio::task::spawn(task.run()); + + Self { id, cmd_tx } + } + + pub fn id(&self) -> usize { + self.id + } + + pub fn stopped(&self) -> bool { + self.cmd_tx.is_closed() + } + + pub async fn stop(&self) { + let _ = self.cmd_tx.send(Command::Stop).await; + } + + pub async fn handle(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::GetConn(tx)).await; + rx.await.ok() + } +} diff --git a/euphoxide-bot/src/lib.rs b/euphoxide-bot/src/lib.rs new file mode 100644 index 0000000..1d5ea99 --- /dev/null +++ b/euphoxide-bot/src/lib.rs @@ -0,0 +1 @@ +pub mod instance; diff --git a/euphoxide/src/api/types.rs b/euphoxide/src/api/types.rs index af5426b..d7121f8 100644 --- a/euphoxide/src/api/types.rs +++ b/euphoxide/src/api/types.rs @@ -22,7 +22,7 @@ pub struct AccountView { /// Mode of authentication. /// /// -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub enum AuthOption { /// Authentication with a passcode, where a key is derived from the passcode