diff --git a/Cargo.toml b/Cargo.toml index b79053d..e806c73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["euphoxide", "euphoxide-bot"] +members = ["euphoxide", "euphoxide-bot", "euphoxide-client"] [workspace.package] version = "0.5.1" @@ -22,6 +22,10 @@ unicode-normalization = "0.1.24" # For examples anyhow = "1.0.94" rustls = "0.23.19" +# In this workspace +euphoxide = { path = "./euphoxide" } +euphoxide-bot = { path = "./euphoxide-bot" } +euphoxide-client = { path = "./euphoxide-client" } [workspace.lints] rust.unsafe_code = { level = "forbid", priority = 1 } diff --git a/euphoxide-client/Cargo.toml b/euphoxide-client/Cargo.toml new file mode 100644 index 0000000..e148dc7 --- /dev/null +++ b/euphoxide-client/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "euphoxide-client" +version = { workspace = true } +edition = { workspace = true } + +[dependencies] +cookie = { workspace = true } +euphoxide = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } +tokio-tungstenite = { workspace = true } + +[lints] +workspace = true diff --git a/euphoxide-client/src/builder.rs b/euphoxide-client/src/builder.rs new file mode 100644 index 0000000..5146f83 --- /dev/null +++ b/euphoxide-client/src/builder.rs @@ -0,0 +1,40 @@ +use crate::ClientConfig; + +pub trait ClientBuilderBase<'a> { + type Base; +} + +pub struct ClientBuilder<'a, B: ClientBuilderBase<'a>> { + pub(crate) base: B::Base, + pub(crate) config: ClientConfig, +} + +impl<'a, B: ClientBuilderBase<'a>> ClientBuilder<'a, B> { + pub fn config(&self) -> &ClientConfig { + &self.config + } + + pub fn config_mut(&mut self) -> &mut ClientConfig { + &mut self.config + } + + pub fn with_human(mut self, human: bool) -> Self { + self.config.human = human; + self + } + + pub fn with_username(mut self, username: impl ToString) -> Self { + self.config.username = Some(username.to_string()); + self + } + + pub fn with_force_username(mut self, force_username: bool) -> Self { + self.config.force_username = force_username; + self + } + + pub fn with_password(mut self, password: impl ToString) -> Self { + self.config.password = Some(password.to_string()); + self + } +} diff --git a/euphoxide-client/src/config.rs b/euphoxide-client/src/config.rs new file mode 100644 index 0000000..ca1baea --- /dev/null +++ b/euphoxide-client/src/config.rs @@ -0,0 +1,71 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use cookie::CookieJar; +use euphoxide::client::conn::ClientConnConfig; + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ServerConfig { + pub client: ClientConnConfig, + pub cookies: Arc>, + pub join_attempts: usize, + pub reconnect_delay: Duration, + pub cmd_channel_bufsize: usize, +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + client: ClientConnConfig::default(), + cookies: Arc::new(Mutex::new(CookieJar::new())), + join_attempts: 5, + reconnect_delay: Duration::from_secs(30), + cmd_channel_bufsize: 1, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ClientConfig { + pub server: ServerConfig, + pub room: String, + pub human: bool, + pub username: Option, + pub force_username: bool, + pub password: Option, +} + +impl ClientConfig { + pub fn new(server: ServerConfig, room: String) -> Self { + Self { + server, + room, + human: false, + username: None, + force_username: false, + password: None, + } + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct MultiClientConfig { + pub server: ServerConfig, + pub cmd_channel_bufsize: usize, + pub event_channel_bufsize: usize, +} + +impl Default for MultiClientConfig { + fn default() -> Self { + Self { + server: ServerConfig::default(), + cmd_channel_bufsize: 1, + event_channel_bufsize: 10, + } + } +} diff --git a/euphoxide-client/src/lib.rs b/euphoxide-client/src/lib.rs new file mode 100644 index 0000000..375d6cc --- /dev/null +++ b/euphoxide-client/src/lib.rs @@ -0,0 +1,6 @@ +mod builder; +mod config; +mod multi; +mod single; + +pub use self::{builder::*, config::*, multi::*, single::*}; diff --git a/euphoxide-client/src/multi.rs b/euphoxide-client/src/multi.rs new file mode 100644 index 0000000..95ea996 --- /dev/null +++ b/euphoxide-client/src/multi.rs @@ -0,0 +1,228 @@ +use std::{collections::HashMap, sync::Arc}; + +use euphoxide::{ + api::ParsedPacket, + client::{conn::ClientConnHandle, state::State}, +}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; + +use crate::{ + Client, ClientBuilder, ClientBuilderBase, ClientConfig, ClientEvent, MultiClientConfig, +}; + +#[derive(Debug)] +pub enum MultiClientEvent { + Started { + client: Client, + }, + Connecting { + client: Client, + }, + Connected { + client: Client, + conn: ClientConnHandle, + state: State, + }, + Joined { + client: Client, + conn: ClientConnHandle, + state: State, + }, + Packet { + client: Client, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + client: Client, + }, + Stopped { + client: Client, + }, +} + +impl MultiClientEvent { + fn from_client_event(client: Client, event: ClientEvent) -> Self { + match event { + ClientEvent::Started { id: _ } => Self::Started { client }, + ClientEvent::Connecting { id: _ } => Self::Connecting { client }, + ClientEvent::Connected { id: _, conn, state } => Self::Connected { + client, + conn, + state, + }, + ClientEvent::Joined { id: _, conn, state } => Self::Joined { + client, + conn, + state, + }, + ClientEvent::Packet { + id: _, + conn, + state, + packet, + } => Self::Packet { + client, + conn, + state, + packet, + }, + ClientEvent::Disconnected { id: _ } => Self::Disconnected { client }, + ClientEvent::Stopped { id: _ } => Self::Stopped { client }, + } + } + + pub fn client(&self) -> &Client { + match self { + Self::Started { client } => client, + Self::Connecting { client, .. } => client, + Self::Connected { client, .. } => client, + Self::Joined { client, .. } => client, + Self::Packet { client, .. } => client, + Self::Disconnected { client } => client, + Self::Stopped { client } => client, + } + } +} + +#[allow(clippy::large_enum_variant)] +enum Command { + GetClients(oneshot::Sender>), + AddClient(ClientConfig, oneshot::Sender), +} + +struct MultiClientTask { + next_id: usize, + clients: HashMap, + + cmd_rx: mpsc::Receiver, + event_rx: mpsc::Receiver, + event_tx: mpsc::Sender, + out_tx: mpsc::Sender, +} + +impl MultiClientTask { + fn purge_clients(&mut self) { + self.clients.retain(|_, v| !v.stopped()); + } + + async fn on_event(&self, event: ClientEvent) { + if let Some(client) = self.clients.get(&event.id()) { + let event = MultiClientEvent::from_client_event(client.clone(), event); + let _ = self.out_tx.send(event).await; + } + } + + async fn on_cmd(&mut self, cmd: Command) { + match cmd { + Command::GetClients(tx) => { + self.purge_clients(); // Not necessary for correctness + let _ = tx.send(self.clients.values().cloned().collect()); + } + Command::AddClient(config, tx) => { + let id = self.next_id; + assert!(!self.clients.contains_key(&id)); + self.next_id += 1; + + let client = Client::new(id, config, self.event_tx.clone()); + self.clients.insert(id, client.clone()); + + let _ = tx.send(client); + } + } + } + + async fn run(mut self) { + loop { + // Prevent potential memory leak + self.purge_clients(); + + let received = select! { + r = self.event_rx.recv() => Ok(r), + r = self.cmd_rx.recv() => Err(r), + }; + + match received { + Ok(None) => break, + Ok(Some(event)) => self.on_event(event).await, + Err(None) => break, + Err(Some(cmd)) => self.on_cmd(cmd).await, + } + } + } +} + +#[derive(Clone)] +pub struct MultiClient { + config: Arc, + cmd_tx: mpsc::Sender, +} + +impl MultiClient { + pub fn new(event_tx: mpsc::Sender) -> Self { + Self::new_with_config(MultiClientConfig::default(), event_tx) + } + + pub fn new_with_config( + config: MultiClientConfig, + event_tx: mpsc::Sender, + ) -> Self { + let config = Arc::new(config); + let out_tx = event_tx; + + let (cmd_tx, cmd_rx) = mpsc::channel(config.cmd_channel_bufsize); + let (event_tx, event_rx) = mpsc::channel(config.event_channel_bufsize); + + let task = MultiClientTask { + next_id: 0, + clients: HashMap::new(), + cmd_rx, + event_rx, + event_tx, + out_tx, + }; + + tokio::task::spawn(task.run()); + + Self { config, cmd_tx } + } + + pub async fn get_clients(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::GetClients(tx)).await; + rx.await.expect("task should still be running") + } + + pub async fn add_client(&self, config: ClientConfig) -> Client { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::AddClient(config, tx)).await; + rx.await.expect("task should still be running") + } +} + +///////////// +// Builder // +///////////// + +impl<'a> ClientBuilderBase<'a> for MultiClient { + type Base = &'a Self; +} + +impl MultiClient { + pub fn client_builder(&self, room: impl ToString) -> ClientBuilder<'_, Self> { + ClientBuilder { + base: self, + config: ClientConfig::new(self.config.server.clone(), room.to_string()), + } + } +} + +impl ClientBuilder<'_, MultiClient> { + pub async fn build_and_add(self) -> Client { + self.base.add_client(self.config).await + } +} diff --git a/euphoxide-client/src/single.rs b/euphoxide-client/src/single.rs new file mode 100644 index 0000000..b6b54c6 --- /dev/null +++ b/euphoxide-client/src/single.rs @@ -0,0 +1,415 @@ +use std::{fmt, result, str::FromStr}; + +use cookie::Cookie; +use euphoxide::{ + api::{Auth, AuthOption, BounceEvent, Data, Nick, ParsedPacket}, + client::{ + conn::{ClientConn, ClientConnHandle}, + state::State, + }, +}; +use log::warn; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + self, + http::{HeaderValue, StatusCode}, +}; + +use crate::{ClientBuilder, ClientBuilderBase, ClientConfig, ServerConfig}; + +enum Error { + Stopped, + NoReferences, + AuthRequired, + InvalidPassword, + OutOfJoinAttempts, + Euphoxide(euphoxide::Error), +} + +impl Error { + fn is_fatal(&self) -> bool { + match self { + Self::Stopped => true, + Self::NoReferences => true, + Self::AuthRequired => true, + Self::InvalidPassword => true, + Self::OutOfJoinAttempts => true, + Self::Euphoxide(euphoxide::Error::Tungstenite(tungstenite::Error::Http(response))) => { + response.status() == StatusCode::NOT_FOUND + } + _ => false, + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Stopped => write!(f, "the instance was stopped manually"), + Self::NoReferences => write!(f, "all references to the instance were dropped"), + Self::AuthRequired => write!(f, "authentication required but no credentials found"), + Self::InvalidPassword => write!(f, "authentication required but password is invalid"), + Self::OutOfJoinAttempts => write!(f, "failed to join within attempt limit"), + Self::Euphoxide(error) => write!(f, "{error}"), + } + } +} + +impl From for Error { + fn from(value: euphoxide::Error) -> Self { + Self::Euphoxide(value) + } +} + +type Result = result::Result; + +enum Command { + GetConn(oneshot::Sender), + Stop, +} + +#[derive(Debug)] +pub enum ClientEvent { + Started { + id: usize, + }, + Connecting { + id: usize, + }, + Connected { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Joined { + id: usize, + conn: ClientConnHandle, + state: State, + }, + Packet { + id: usize, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + id: usize, + }, + Stopped { + id: usize, + }, +} + +impl ClientEvent { + pub fn id(&self) -> usize { + match self { + Self::Started { id } => *id, + Self::Connecting { id } => *id, + Self::Connected { id, .. } => *id, + Self::Joined { id, .. } => *id, + Self::Packet { id, .. } => *id, + Self::Disconnected { id } => *id, + Self::Stopped { id } => *id, + } + } +} + +struct ClientTask { + id: usize, + config: ClientConfig, + + cmd_rx: mpsc::Receiver, + event_tx: mpsc::Sender, + + attempts: usize, + never_joined: bool, +} + +impl ClientTask { + fn get_cookies(&self) -> Option { + self.config + .server + .cookies + .lock() + .unwrap() + .iter() + .map(|c| c.stripped().to_string()) + .collect::>() + .join("; ") + .try_into() + .ok() + } + + fn set_cookies(&mut self, cookies: &[HeaderValue]) { + let mut guard = self.config.server.cookies.lock().unwrap(); + for cookie in cookies { + if let Ok(cookie) = cookie.to_str() { + if let Ok(cookie) = Cookie::from_str(cookie) { + guard.add(cookie); + } + } + } + } + + async fn connect(&mut self) -> Result { + let (conn, cookies) = ClientConn::connect_with_config( + &self.config.room, + self.get_cookies(), + &self.config.server.client, + ) + .await?; + + self.set_cookies(&cookies); + + Ok(conn) + } + + async fn on_joined(&mut self, conn: &ClientConn) { + self.never_joined = false; + + let _ = self + .event_tx + .send(ClientEvent::Joined { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + } + + async fn on_packet(&mut self, conn: &mut ClientConn, packet: ParsedPacket) -> Result<()> { + let _ = self + .event_tx + .send(ClientEvent::Packet { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + packet: packet.clone(), + }) + .await; + + match packet.into_data()? { + // Attempting to authenticate + Data::BounceEvent(BounceEvent { + auth_options: Some(auth_options), + .. + }) if auth_options.contains(&AuthOption::Passcode) => { + if let Some(password) = &self.config.password { + conn.send(Auth { + r#type: AuthOption::Passcode, + passcode: Some(password.clone()), + }) + .await?; + } else { + return Err(Error::AuthRequired); + } + } + + // Auth attempt failed :( + Data::AuthReply(ev) if !ev.success => return Err(Error::InvalidPassword), + + // Just joined + Data::SnapshotEvent(ev) => { + if let Some(username) = &self.config.username { + if ev.nick.is_none() || self.config.force_username { + conn.send(Nick { + name: username.clone(), + }) + .await?; + } + } + + // Maybe we should only count this as joining if we successfully + // updated the nick instead of just sending a Nick command? And + // maybe we should ensure that we're in the State::Joined state? + // Both of these would probably complicate the code a lot. On + // the other hand, InstanceEvent::Joined::state would contain + // the actual nick after joining, which feels like the right + // thing to do™. Probably not worth the increase in code + // complexity though. + + self.on_joined(conn).await; + } + + _ => {} + } + + Ok(()) + } + + async fn on_cmd(&mut self, conn: &ClientConn, cmd: Command) -> Result<()> { + match cmd { + Command::GetConn(sender) => { + let _ = sender.send(conn.handle()); + Ok(()) + } + Command::Stop => Err(Error::Stopped), + } + } + + async fn run_once(&mut self) -> Result<()> { + // If we try to connect too many times without managing to join at least + // once, the room is probably not accessible for one reason or another + // and the instance should stop. + self.attempts += 1; + if self.never_joined && self.attempts > self.config.server.join_attempts { + return Err(Error::OutOfJoinAttempts); + } + + let _ = self + .event_tx + .send(ClientEvent::Connecting { id: self.id }) + .await; + + let mut conn = match self.connect().await { + Ok(conn) => conn, + Err(err) => { + // When we fail to connect, we want to wait a bit before + // reconnecting in order not to spam the server. However, when + // we are connected successfully and then disconnect for + // whatever reason, we want to try to reconnect immediately. We + // might, for example, be disconnected from the server because + // we just logged in. + tokio::time::sleep(self.config.server.reconnect_delay).await; + Err(err)? + } + }; + + let _ = self + .event_tx + .send(ClientEvent::Connected { + id: self.id, + conn: conn.handle(), + state: conn.state().clone(), + }) + .await; + + let result = loop { + let received = select! { + r = conn.recv() => Ok(r?), + r = self.cmd_rx.recv() => Err(r), + }; + + match received { + // We received a packet + Ok(None) => break Ok(()), // Connection closed + Ok(Some(packet)) => self.on_packet(&mut conn, packet).await?, + // We received a command + Err(None) => break Err(Error::NoReferences), + Err(Some(cmd)) => self.on_cmd(&conn, cmd).await?, + }; + }; + + let _ = self + .event_tx + .send(ClientEvent::Disconnected { id: self.id }) + .await; + + result + } + + async fn run(mut self) { + let _ = self + .event_tx + .send(ClientEvent::Started { id: self.id }) + .await; + + loop { + if let Err(err) = self.run_once().await { + warn!("instance {:?}: {err}", self.id); + if err.is_fatal() { + break; + } + } + } + + let _ = self + .event_tx + .send(ClientEvent::Stopped { id: self.id }) + .await; + } +} + +#[derive(Clone)] +pub struct Client { + id: usize, + cmd_tx: mpsc::Sender, +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Instance") + .field("id", &self.id) + .finish_non_exhaustive() + } +} + +impl Client { + pub fn new(id: usize, config: ClientConfig, event_tx: mpsc::Sender) -> Self { + let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); + + let task = ClientTask { + id, + config, + attempts: 0, + never_joined: false, + cmd_rx, + event_tx, + }; + + tokio::task::spawn(task.run()); + + Self { id, cmd_tx } + } + + pub fn id(&self) -> usize { + self.id + } + + pub fn stopped(&self) -> bool { + self.cmd_tx.is_closed() + } + + pub async fn stop(&self) { + let _ = self.cmd_tx.send(Command::Stop).await; + } + + pub async fn handle(&self) -> Option { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(Command::GetConn(tx)).await; + rx.await.ok() + } +} + +///////////// +// Builder // +///////////// + +impl ClientBuilderBase<'static> for Client { + type Base = (); +} + +impl Client { + pub fn builder(room: impl ToString) -> ClientBuilder<'static, Self> { + Self::builder_for_server(ServerConfig::default(), room) + } + + pub fn builder_for_server( + server: ServerConfig, + room: impl ToString, + ) -> ClientBuilder<'static, Self> { + ClientBuilder { + base: (), + config: ClientConfig::new(server, room.to_string()), + } + } +} + +impl ClientBuilder<'static, Client> { + pub fn build(self, id: usize, event_tx: mpsc::Sender) -> Client { + Client::new(id, self.config, event_tx) + } +}