diff --git a/euphoxide-bot/examples/examplebot.rs b/euphoxide-bot/examples/examplebot.rs new file mode 100644 index 0000000..bdc4864 --- /dev/null +++ b/euphoxide-bot/examples/examplebot.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use euphoxide::{ + api::{Data, Message, Nick, Send}, + client::conn::ClientConnHandle, +}; +use euphoxide_bot::{Bot, BotEvent, ServerConfig}; + +async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> { + conn.send_only(Nick { + name: "examplebot".to_string(), + }) + .await?; + + Ok(()) +} + +async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + conn.send_only(Send { + content: "Pong!".to_string(), + parent: Some(msg.id), + }) + .await?; + + Ok(()) +} + +async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> { + let mut parent = msg.id; + + for _ in 0..3 { + let first = conn + .send(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + parent = first.await?.0.id; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + conn.send_only(Send { + content: "brick".to_string(), + parent: Some(parent), + }) + .await?; + + Ok(()) +} + +async fn on_data(conn: ClientConnHandle, data: Data) { + let result = match data { + Data::SnapshotEvent(_) => set_nick(&conn).await, + Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await, + Data::SendEvent(event) if event.0.content == "!pyramid" => { + send_pyramid(&conn, event.0).await + } + _ => Ok(()), + }; + + if let Err(err) = result { + println!("Error while responding: {err}"); + } +} + +async fn run() -> anyhow::Result<()> { + let mut bot = Bot::new(); + + let config = ServerConfig::default() + .instance("test") + .with_username("examplebot"); + + bot.add_instance((), config); + + while let Some(event) = bot.recv().await { + if let BotEvent::Packet { conn, packet, .. } = event { + let data = packet.into_data()?; + tokio::task::spawn(on_data(conn, data)); + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + loop { + if let Err(err) = run().await { + println!("Error while running: {err}"); + } + } +} diff --git a/euphoxide-bot/src/bot.rs b/euphoxide-bot/src/bot.rs new file mode 100644 index 0000000..67bc8d6 --- /dev/null +++ b/euphoxide-bot/src/bot.rs @@ -0,0 +1,175 @@ +use std::{ + collections::HashMap, + fmt, hash, + sync::{Arc, RwLock}, +}; + +use euphoxide::{ + api::ParsedPacket, + client::{conn::ClientConnHandle, state::State}, +}; +use tokio::sync::mpsc; + +use crate::{BotConfig, Instance, InstanceConfig, InstanceEvent}; + +#[derive(Debug)] +pub enum BotEvent { + Started { + instance: Instance, + }, + Connecting { + instance: Instance, + }, + Connected { + instance: Instance, + conn: ClientConnHandle, + state: State, + }, + Joined { + instance: Instance, + conn: ClientConnHandle, + state: State, + }, + Packet { + instance: Instance, + conn: ClientConnHandle, + state: State, + packet: ParsedPacket, + }, + Disconnected { + instance: Instance, + }, + Stopped { + instance: Instance, + }, +} + +impl BotEvent { + fn from_instance_event(instance: Instance, event: InstanceEvent) -> Self { + match event { + InstanceEvent::Started { id: _ } => Self::Started { instance }, + InstanceEvent::Connecting { id: _ } => Self::Connecting { instance }, + InstanceEvent::Connected { id: _, conn, state } => Self::Connected { + instance, + conn, + state, + }, + InstanceEvent::Joined { id: _, conn, state } => Self::Joined { + instance, + conn, + state, + }, + InstanceEvent::Packet { + id: _, + conn, + state, + packet, + } => Self::Packet { + instance, + conn, + state, + packet, + }, + InstanceEvent::Disconnected { id: _ } => Self::Disconnected { instance }, + InstanceEvent::Stopped { id: _ } => Self::Stopped { instance }, + } + } + + pub fn instance(&self) -> &Instance { + match self { + Self::Started { instance } => instance, + Self::Connecting { instance, .. } => instance, + Self::Connected { instance, .. } => instance, + Self::Joined { instance, .. } => instance, + Self::Packet { instance, .. } => instance, + Self::Disconnected { instance } => instance, + Self::Stopped { instance } => instance, + } + } +} + +pub struct Bot { + config: BotConfig, + instances: Arc>>>, + event_tx: mpsc::Sender>, + event_rx: mpsc::Receiver>, +} + +impl Bot { + pub fn new() -> Self { + Self::new_with_config(BotConfig::default()) + } + + pub fn new_with_config(config: BotConfig) -> Self { + let (event_tx, event_rx) = mpsc::channel(10); + Self { + config, + instances: Arc::new(RwLock::new(HashMap::new())), + event_tx, + event_rx, + } + } + + fn purge_instances(&self) { + let mut guard = self.instances.write().unwrap(); + guard.retain(|_, v| !v.stopped()); + } + + pub fn get_instances(&self) -> Vec> + where + I: Clone, + { + self.instances.read().unwrap().values().cloned().collect() + } + + pub fn add_instance(&self, id: I, config: InstanceConfig) + where + I: Clone + fmt::Debug + Send + 'static + Eq + hash::Hash, + { + let mut guard = self.instances.write().unwrap(); + + if guard.contains_key(&id) { + return; + } + + guard.insert(id.clone(), Instance::new(id, config, self.event_tx.clone())); + } + + pub async fn recv(&mut self) -> Option> + where + I: Clone + Eq + hash::Hash, + { + // We hold exactly one sender. If no other senders exist, then all + // instances are dead and we'll never receive any more events unless we + // return and allow the user to add more instances again. + while self.event_rx.sender_strong_count() > 1 { + // Prevent potential memory leak + self.purge_instances(); + + let Ok(event) = + tokio::time::timeout(self.config.event_timeout, self.event_rx.recv()).await + else { + // We need to re-check the sender count occasionally. It's + // possible that there are still instances that just haven't + // sent an event in a while, so we can't just return here. + continue; + }; + + // This only returns None if no senders remain, and since we always + // own one sender, this can't happen. + let event = event.expect("event channel should never close since we own a sender"); + + if let Some(instance) = self.instances.read().unwrap().get(event.id()) { + return Some(BotEvent::from_instance_event(instance.clone(), event)); + } + } + + None + } +} + +impl Default for Bot { + fn default() -> Self { + Self::new() + } +} diff --git a/euphoxide-bot/src/config.rs b/euphoxide-bot/src/config.rs index 40b508d..18880b0 100644 --- a/euphoxide-bot/src/config.rs +++ b/euphoxide-bot/src/config.rs @@ -13,7 +13,19 @@ pub struct ServerConfig { pub join_attempts: usize, pub reconnect_delay: Duration, pub cmd_channel_bufsize: usize, - pub event_channel_bufsize: usize, +} + +impl ServerConfig { + pub fn instance(self, room: impl ToString) -> InstanceConfig { + InstanceConfig { + server: self, + room: room.to_string(), + human: false, + username: None, + force_username: false, + password: None, + } + } } impl Default for ServerConfig { @@ -24,7 +36,6 @@ impl Default for ServerConfig { join_attempts: 5, reconnect_delay: Duration::from_secs(30), cmd_channel_bufsize: 1, - event_channel_bufsize: 10, } } } @@ -40,17 +51,6 @@ pub struct InstanceConfig { } impl InstanceConfig { - pub fn new(room: impl ToString) -> Self { - Self { - server: ServerConfig::default(), - room: room.to_string(), - human: false, - username: None, - force_username: false, - password: None, - } - } - pub fn with_username(mut self, username: impl ToString) -> Self { self.username = Some(username.to_string()); self @@ -66,3 +66,17 @@ impl InstanceConfig { self } } + +pub struct BotConfig { + pub event_timeout: Duration, + pub event_channel_bufsize: usize, +} + +impl Default for BotConfig { + fn default() -> Self { + Self { + event_timeout: Duration::from_secs(1), + event_channel_bufsize: 10, + } + } +} diff --git a/euphoxide-bot/src/instance.rs b/euphoxide-bot/src/instance.rs index 0c7bd93..962df18 100644 --- a/euphoxide-bot/src/instance.rs +++ b/euphoxide-bot/src/instance.rs @@ -103,6 +103,20 @@ pub enum InstanceEvent { }, } +impl InstanceEvent { + pub fn id(&self) -> &I { + match self { + Self::Started { id } => id, + Self::Connecting { id } => id, + Self::Connected { id, .. } => id, + Self::Joined { id, .. } => id, + Self::Packet { id, .. } => id, + Self::Disconnected { id } => id, + Self::Stopped { id } => id, + } + } +} + struct InstanceTask { id: I, config: InstanceConfig, @@ -342,9 +356,8 @@ impl fmt::Debug for Instance { } impl Instance { - pub fn new(id: I, config: InstanceConfig) -> (Self, mpsc::Receiver>) { + pub fn new(id: I, config: InstanceConfig, event_tx: mpsc::Sender>) -> Self { let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); - let (event_tx, event_rx) = mpsc::channel(config.server.event_channel_bufsize); let task = InstanceTask { id: id.clone(), @@ -357,7 +370,7 @@ impl Instance { tokio::task::spawn(task.run()); - (Self { id, cmd_tx }, event_rx) + Self { id, cmd_tx } } } diff --git a/euphoxide-bot/src/lib.rs b/euphoxide-bot/src/lib.rs index fc1ba4e..1a02484 100644 --- a/euphoxide-bot/src/lib.rs +++ b/euphoxide-bot/src/lib.rs @@ -1,4 +1,5 @@ +mod bot; mod config; mod instance; -pub use crate::{config::*, instance::*}; +pub use crate::{bot::*, config::*, instance::*};