Add Bot for managing multiple instances

This commit is contained in:
Joscha 2024-12-11 23:25:14 +01:00
parent 0fdb77fbd3
commit 36f6dc1587
5 changed files with 319 additions and 17 deletions

View file

@ -0,0 +1,99 @@
use std::time::Duration;
use euphoxide::{
api::{Data, Message, Nick, Send},
client::conn::ClientConnHandle,
};
use euphoxide_bot::{Bot, BotEvent, ServerConfig};
async fn set_nick(conn: &ClientConnHandle) -> anyhow::Result<()> {
conn.send_only(Nick {
name: "examplebot".to_string(),
})
.await?;
Ok(())
}
async fn send_pong(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> {
conn.send_only(Send {
content: "Pong!".to_string(),
parent: Some(msg.id),
})
.await?;
Ok(())
}
async fn send_pyramid(conn: &ClientConnHandle, msg: Message) -> anyhow::Result<()> {
let mut parent = msg.id;
for _ in 0..3 {
let first = conn
.send(Send {
content: "brick".to_string(),
parent: Some(parent),
})
.await?;
conn.send_only(Send {
content: "brick".to_string(),
parent: Some(parent),
})
.await?;
parent = first.await?.0.id;
tokio::time::sleep(Duration::from_secs(1)).await;
}
conn.send_only(Send {
content: "brick".to_string(),
parent: Some(parent),
})
.await?;
Ok(())
}
async fn on_data(conn: ClientConnHandle, data: Data) {
let result = match data {
Data::SnapshotEvent(_) => set_nick(&conn).await,
Data::SendEvent(event) if event.0.content == "!ping" => send_pong(&conn, event.0).await,
Data::SendEvent(event) if event.0.content == "!pyramid" => {
send_pyramid(&conn, event.0).await
}
_ => Ok(()),
};
if let Err(err) = result {
println!("Error while responding: {err}");
}
}
async fn run() -> anyhow::Result<()> {
let mut bot = Bot::new();
let config = ServerConfig::default()
.instance("test")
.with_username("examplebot");
bot.add_instance((), config);
while let Some(event) = bot.recv().await {
if let BotEvent::Packet { conn, packet, .. } = event {
let data = packet.into_data()?;
tokio::task::spawn(on_data(conn, data));
}
}
Ok(())
}
#[tokio::main]
async fn main() {
loop {
if let Err(err) = run().await {
println!("Error while running: {err}");
}
}
}

175
euphoxide-bot/src/bot.rs Normal file
View file

@ -0,0 +1,175 @@
use std::{
collections::HashMap,
fmt, hash,
sync::{Arc, RwLock},
};
use euphoxide::{
api::ParsedPacket,
client::{conn::ClientConnHandle, state::State},
};
use tokio::sync::mpsc;
use crate::{BotConfig, Instance, InstanceConfig, InstanceEvent};
#[derive(Debug)]
pub enum BotEvent<I> {
Started {
instance: Instance<I>,
},
Connecting {
instance: Instance<I>,
},
Connected {
instance: Instance<I>,
conn: ClientConnHandle,
state: State,
},
Joined {
instance: Instance<I>,
conn: ClientConnHandle,
state: State,
},
Packet {
instance: Instance<I>,
conn: ClientConnHandle,
state: State,
packet: ParsedPacket,
},
Disconnected {
instance: Instance<I>,
},
Stopped {
instance: Instance<I>,
},
}
impl<I> BotEvent<I> {
fn from_instance_event(instance: Instance<I>, event: InstanceEvent<I>) -> Self {
match event {
InstanceEvent::Started { id: _ } => Self::Started { instance },
InstanceEvent::Connecting { id: _ } => Self::Connecting { instance },
InstanceEvent::Connected { id: _, conn, state } => Self::Connected {
instance,
conn,
state,
},
InstanceEvent::Joined { id: _, conn, state } => Self::Joined {
instance,
conn,
state,
},
InstanceEvent::Packet {
id: _,
conn,
state,
packet,
} => Self::Packet {
instance,
conn,
state,
packet,
},
InstanceEvent::Disconnected { id: _ } => Self::Disconnected { instance },
InstanceEvent::Stopped { id: _ } => Self::Stopped { instance },
}
}
pub fn instance(&self) -> &Instance<I> {
match self {
Self::Started { instance } => instance,
Self::Connecting { instance, .. } => instance,
Self::Connected { instance, .. } => instance,
Self::Joined { instance, .. } => instance,
Self::Packet { instance, .. } => instance,
Self::Disconnected { instance } => instance,
Self::Stopped { instance } => instance,
}
}
}
pub struct Bot<I> {
config: BotConfig,
instances: Arc<RwLock<HashMap<I, Instance<I>>>>,
event_tx: mpsc::Sender<InstanceEvent<I>>,
event_rx: mpsc::Receiver<InstanceEvent<I>>,
}
impl<I> Bot<I> {
pub fn new() -> Self {
Self::new_with_config(BotConfig::default())
}
pub fn new_with_config(config: BotConfig) -> Self {
let (event_tx, event_rx) = mpsc::channel(10);
Self {
config,
instances: Arc::new(RwLock::new(HashMap::new())),
event_tx,
event_rx,
}
}
fn purge_instances(&self) {
let mut guard = self.instances.write().unwrap();
guard.retain(|_, v| !v.stopped());
}
pub fn get_instances(&self) -> Vec<Instance<I>>
where
I: Clone,
{
self.instances.read().unwrap().values().cloned().collect()
}
pub fn add_instance(&self, id: I, config: InstanceConfig)
where
I: Clone + fmt::Debug + Send + 'static + Eq + hash::Hash,
{
let mut guard = self.instances.write().unwrap();
if guard.contains_key(&id) {
return;
}
guard.insert(id.clone(), Instance::new(id, config, self.event_tx.clone()));
}
pub async fn recv(&mut self) -> Option<BotEvent<I>>
where
I: Clone + Eq + hash::Hash,
{
// We hold exactly one sender. If no other senders exist, then all
// instances are dead and we'll never receive any more events unless we
// return and allow the user to add more instances again.
while self.event_rx.sender_strong_count() > 1 {
// Prevent potential memory leak
self.purge_instances();
let Ok(event) =
tokio::time::timeout(self.config.event_timeout, self.event_rx.recv()).await
else {
// We need to re-check the sender count occasionally. It's
// possible that there are still instances that just haven't
// sent an event in a while, so we can't just return here.
continue;
};
// This only returns None if no senders remain, and since we always
// own one sender, this can't happen.
let event = event.expect("event channel should never close since we own a sender");
if let Some(instance) = self.instances.read().unwrap().get(event.id()) {
return Some(BotEvent::from_instance_event(instance.clone(), event));
}
}
None
}
}
impl<I> Default for Bot<I> {
fn default() -> Self {
Self::new()
}
}

View file

@ -13,7 +13,19 @@ pub struct ServerConfig {
pub join_attempts: usize, pub join_attempts: usize,
pub reconnect_delay: Duration, pub reconnect_delay: Duration,
pub cmd_channel_bufsize: usize, pub cmd_channel_bufsize: usize,
pub event_channel_bufsize: usize, }
impl ServerConfig {
pub fn instance(self, room: impl ToString) -> InstanceConfig {
InstanceConfig {
server: self,
room: room.to_string(),
human: false,
username: None,
force_username: false,
password: None,
}
}
} }
impl Default for ServerConfig { impl Default for ServerConfig {
@ -24,7 +36,6 @@ impl Default for ServerConfig {
join_attempts: 5, join_attempts: 5,
reconnect_delay: Duration::from_secs(30), reconnect_delay: Duration::from_secs(30),
cmd_channel_bufsize: 1, cmd_channel_bufsize: 1,
event_channel_bufsize: 10,
} }
} }
} }
@ -40,17 +51,6 @@ pub struct InstanceConfig {
} }
impl InstanceConfig { impl InstanceConfig {
pub fn new(room: impl ToString) -> Self {
Self {
server: ServerConfig::default(),
room: room.to_string(),
human: false,
username: None,
force_username: false,
password: None,
}
}
pub fn with_username(mut self, username: impl ToString) -> Self { pub fn with_username(mut self, username: impl ToString) -> Self {
self.username = Some(username.to_string()); self.username = Some(username.to_string());
self self
@ -66,3 +66,17 @@ impl InstanceConfig {
self self
} }
} }
pub struct BotConfig {
pub event_timeout: Duration,
pub event_channel_bufsize: usize,
}
impl Default for BotConfig {
fn default() -> Self {
Self {
event_timeout: Duration::from_secs(1),
event_channel_bufsize: 10,
}
}
}

View file

@ -103,6 +103,20 @@ pub enum InstanceEvent<I> {
}, },
} }
impl<I> InstanceEvent<I> {
pub fn id(&self) -> &I {
match self {
Self::Started { id } => id,
Self::Connecting { id } => id,
Self::Connected { id, .. } => id,
Self::Joined { id, .. } => id,
Self::Packet { id, .. } => id,
Self::Disconnected { id } => id,
Self::Stopped { id } => id,
}
}
}
struct InstanceTask<I> { struct InstanceTask<I> {
id: I, id: I,
config: InstanceConfig, config: InstanceConfig,
@ -342,9 +356,8 @@ impl<I: fmt::Debug> fmt::Debug for Instance<I> {
} }
impl<I: Clone + fmt::Debug + Send + 'static> Instance<I> { impl<I: Clone + fmt::Debug + Send + 'static> Instance<I> {
pub fn new(id: I, config: InstanceConfig) -> (Self, mpsc::Receiver<InstanceEvent<I>>) { pub fn new(id: I, config: InstanceConfig, event_tx: mpsc::Sender<InstanceEvent<I>>) -> Self {
let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize);
let (event_tx, event_rx) = mpsc::channel(config.server.event_channel_bufsize);
let task = InstanceTask { let task = InstanceTask {
id: id.clone(), id: id.clone(),
@ -357,7 +370,7 @@ impl<I: Clone + fmt::Debug + Send + 'static> Instance<I> {
tokio::task::spawn(task.run()); tokio::task::spawn(task.run());
(Self { id, cmd_tx }, event_rx) Self { id, cmd_tx }
} }
} }

View file

@ -1,4 +1,5 @@
mod bot;
mod config; mod config;
mod instance; mod instance;
pub use crate::{config::*, instance::*}; pub use crate::{bot::*, config::*, instance::*};