From 8377695529eb34db2da9df1a38fe2d093c37b873 Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 27 Dec 2024 14:42:35 +0100 Subject: [PATCH] Remove instance id generics Instead, all instance ids are now usize (like message ids). This allows me to enforce the fact that no two instances of a Bot must have the same id by generating the ids in the Bot. Reusing the same id for multiple instances that send their events to the same place can lead to race conditions depending on how events are handled. For example, the old instance might still be shutting down while the new instance is already connected to a room, leading to an InstanceEvent::Stopped from the old instance that seemingly applies to the new instance. --- euphoxide-bot/examples/examplebot.rs | 2 +- euphoxide-bot/examples/examplebot_instance.rs | 2 +- euphoxide-bot/src/bot.rs | 65 +++++++-------- euphoxide-bot/src/instance.rs | 82 ++++++++----------- 4 files changed, 68 insertions(+), 83 deletions(-) diff --git a/euphoxide-bot/examples/examplebot.rs b/euphoxide-bot/examples/examplebot.rs index bdc4864..292993e 100644 --- a/euphoxide-bot/examples/examplebot.rs +++ b/euphoxide-bot/examples/examplebot.rs @@ -77,7 +77,7 @@ async fn run() -> anyhow::Result<()> { .instance("test") .with_username("examplebot"); - bot.add_instance((), config); + bot.add_instance(config); while let Some(event) = bot.recv().await { if let BotEvent::Packet { conn, packet, .. } = event { diff --git a/euphoxide-bot/examples/examplebot_instance.rs b/euphoxide-bot/examples/examplebot_instance.rs index bf58734..59a52a6 100644 --- a/euphoxide-bot/examples/examplebot_instance.rs +++ b/euphoxide-bot/examples/examplebot_instance.rs @@ -77,7 +77,7 @@ async fn run() -> anyhow::Result<()> { .with_username("examplebot"); let (event_tx, mut event_rx) = mpsc::channel(10); - let _instance = Instance::new((), config, event_tx); // Don't drop or instance stops + let _instance = Instance::new(0, config, event_tx); // Don't drop or instance stops while let Some(event) = event_rx.recv().await { if let InstanceEvent::Packet { conn, packet, .. } = event { diff --git a/euphoxide-bot/src/bot.rs b/euphoxide-bot/src/bot.rs index 67bc8d6..aba1ec9 100644 --- a/euphoxide-bot/src/bot.rs +++ b/euphoxide-bot/src/bot.rs @@ -1,6 +1,5 @@ use std::{ collections::HashMap, - fmt, hash, sync::{Arc, RwLock}, }; @@ -13,39 +12,39 @@ use tokio::sync::mpsc; use crate::{BotConfig, Instance, InstanceConfig, InstanceEvent}; #[derive(Debug)] -pub enum BotEvent { +pub enum BotEvent { Started { - instance: Instance, + instance: Instance, }, Connecting { - instance: Instance, + instance: Instance, }, Connected { - instance: Instance, + instance: Instance, conn: ClientConnHandle, state: State, }, Joined { - instance: Instance, + instance: Instance, conn: ClientConnHandle, state: State, }, Packet { - instance: Instance, + instance: Instance, conn: ClientConnHandle, state: State, packet: ParsedPacket, }, Disconnected { - instance: Instance, + instance: Instance, }, Stopped { - instance: Instance, + instance: Instance, }, } -impl BotEvent { - fn from_instance_event(instance: Instance, event: InstanceEvent) -> Self { +impl BotEvent { + fn from_instance_event(instance: Instance, event: InstanceEvent) -> Self { match event { InstanceEvent::Started { id: _ } => Self::Started { instance }, InstanceEvent::Connecting { id: _ } => Self::Connecting { instance }, @@ -75,7 +74,7 @@ impl BotEvent { } } - pub fn instance(&self) -> &Instance { + pub fn instance(&self) -> &Instance { match self { Self::Started { instance } => instance, Self::Connecting { instance, .. } => instance, @@ -88,14 +87,15 @@ impl BotEvent { } } -pub struct Bot { +pub struct Bot { config: BotConfig, - instances: Arc>>>, - event_tx: mpsc::Sender>, - event_rx: mpsc::Receiver>, + next_id: usize, + instances: Arc>>, + event_tx: mpsc::Sender, + event_rx: mpsc::Receiver, } -impl Bot { +impl Bot { pub fn new() -> Self { Self::new_with_config(BotConfig::default()) } @@ -104,6 +104,7 @@ impl Bot { let (event_tx, event_rx) = mpsc::channel(10); Self { config, + next_id: 0, instances: Arc::new(RwLock::new(HashMap::new())), event_tx, event_rx, @@ -115,30 +116,24 @@ impl Bot { guard.retain(|_, v| !v.stopped()); } - pub fn get_instances(&self) -> Vec> - where - I: Clone, - { + pub fn get_instances(&self) -> Vec { self.instances.read().unwrap().values().cloned().collect() } - pub fn add_instance(&self, id: I, config: InstanceConfig) - where - I: Clone + fmt::Debug + Send + 'static + Eq + hash::Hash, - { + pub fn add_instance(&mut self, config: InstanceConfig) -> Instance { + let id = self.next_id; + self.next_id += 1; + let mut guard = self.instances.write().unwrap(); + assert!(!guard.contains_key(&id)); - if guard.contains_key(&id) { - return; - } + let instance = Instance::new(id, config, self.event_tx.clone()); + guard.insert(id, instance.clone()); - guard.insert(id.clone(), Instance::new(id, config, self.event_tx.clone())); + instance } - pub async fn recv(&mut self) -> Option> - where - I: Clone + Eq + hash::Hash, - { + pub async fn recv(&mut self) -> Option { // We hold exactly one sender. If no other senders exist, then all // instances are dead and we'll never receive any more events unless we // return and allow the user to add more instances again. @@ -159,7 +154,7 @@ impl Bot { // own one sender, this can't happen. let event = event.expect("event channel should never close since we own a sender"); - if let Some(instance) = self.instances.read().unwrap().get(event.id()) { + if let Some(instance) = self.instances.read().unwrap().get(&event.id()) { return Some(BotEvent::from_instance_event(instance.clone(), event)); } } @@ -168,7 +163,7 @@ impl Bot { } } -impl Default for Bot { +impl Default for Bot { fn default() -> Self { Self::new() } diff --git a/euphoxide-bot/src/instance.rs b/euphoxide-bot/src/instance.rs index 962df18..a82ad57 100644 --- a/euphoxide-bot/src/instance.rs +++ b/euphoxide-bot/src/instance.rs @@ -72,63 +72,63 @@ enum Command { } #[derive(Debug)] -pub enum InstanceEvent { +pub enum InstanceEvent { Started { - id: I, + id: usize, }, Connecting { - id: I, + id: usize, }, Connected { - id: I, + id: usize, conn: ClientConnHandle, state: State, }, Joined { - id: I, + id: usize, conn: ClientConnHandle, state: State, }, Packet { - id: I, + id: usize, conn: ClientConnHandle, state: State, packet: ParsedPacket, }, Disconnected { - id: I, + id: usize, }, Stopped { - id: I, + id: usize, }, } -impl InstanceEvent { - pub fn id(&self) -> &I { +impl InstanceEvent { + pub fn id(&self) -> usize { match self { - Self::Started { id } => id, - Self::Connecting { id } => id, - Self::Connected { id, .. } => id, - Self::Joined { id, .. } => id, - Self::Packet { id, .. } => id, - Self::Disconnected { id } => id, - Self::Stopped { id } => id, + Self::Started { id } => *id, + Self::Connecting { id } => *id, + Self::Connected { id, .. } => *id, + Self::Joined { id, .. } => *id, + Self::Packet { id, .. } => *id, + Self::Disconnected { id } => *id, + Self::Stopped { id } => *id, } } } -struct InstanceTask { - id: I, +struct InstanceTask { + id: usize, config: InstanceConfig, cmd_rx: mpsc::Receiver, - event_tx: mpsc::Sender>, + event_tx: mpsc::Sender, attempts: usize, never_joined: bool, } -impl InstanceTask { +impl InstanceTask { fn get_cookies(&self) -> Option { self.config .server @@ -173,7 +173,7 @@ impl InstanceTask { let _ = self .event_tx .send(InstanceEvent::Joined { - id: self.id.clone(), + id: self.id, conn: conn.handle(), state: conn.state().clone(), }) @@ -184,7 +184,7 @@ impl InstanceTask { let _ = self .event_tx .send(InstanceEvent::Packet { - id: self.id.clone(), + id: self.id, conn: conn.handle(), state: conn.state().clone(), packet: packet.clone(), @@ -261,9 +261,7 @@ impl InstanceTask { let _ = self .event_tx - .send(InstanceEvent::Connecting { - id: self.id.clone(), - }) + .send(InstanceEvent::Connecting { id: self.id }) .await; let mut conn = match self.connect().await { @@ -283,7 +281,7 @@ impl InstanceTask { let _ = self .event_tx .send(InstanceEvent::Connected { - id: self.id.clone(), + id: self.id, conn: conn.handle(), state: conn.state().clone(), }) @@ -307,9 +305,7 @@ impl InstanceTask { let _ = self .event_tx - .send(InstanceEvent::Disconnected { - id: self.id.clone(), - }) + .send(InstanceEvent::Disconnected { id: self.id }) .await; result @@ -318,9 +314,7 @@ impl InstanceTask { async fn run(mut self) { let _ = self .event_tx - .send(InstanceEvent::Started { - id: self.id.clone(), - }) + .send(InstanceEvent::Started { id: self.id }) .await; loop { @@ -334,20 +328,18 @@ impl InstanceTask { let _ = self .event_tx - .send(InstanceEvent::Stopped { - id: self.id.clone(), - }) + .send(InstanceEvent::Stopped { id: self.id }) .await; } } #[derive(Clone)] -pub struct Instance { - id: I, +pub struct Instance { + id: usize, cmd_tx: mpsc::Sender, } -impl fmt::Debug for Instance { +impl fmt::Debug for Instance { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Instance") .field("id", &self.id) @@ -355,12 +347,12 @@ impl fmt::Debug for Instance { } } -impl Instance { - pub fn new(id: I, config: InstanceConfig, event_tx: mpsc::Sender>) -> Self { +impl Instance { + pub fn new(id: usize, config: InstanceConfig, event_tx: mpsc::Sender) -> Self { let (cmd_tx, cmd_rx) = mpsc::channel(config.server.cmd_channel_bufsize); let task = InstanceTask { - id: id.clone(), + id, config, attempts: 0, never_joined: false, @@ -372,11 +364,9 @@ impl Instance { Self { id, cmd_tx } } -} -impl Instance { - pub fn id(&self) -> &I { - &self.id + pub fn id(&self) -> usize { + self.id } pub fn stopped(&self) -> bool {