From 57351f65be009dfdb90595c96b6324cbbcf3506b Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 24 Jun 2022 15:33:45 +0200 Subject: [PATCH] Update vault on send events and replies --- src/euph/conn.rs | 45 ++++++++++++++--------- src/euph/room.rs | 25 +++++++++++-- src/vault/euph.rs | 94 +++++++++++++++++++++++++++-------------------- 3 files changed, 102 insertions(+), 62 deletions(-) diff --git a/src/euph/conn.rs b/src/euph/conn.rs index 213b3a6..7465ef0 100644 --- a/src/euph/conn.rs +++ b/src/euph/conn.rs @@ -69,13 +69,21 @@ pub struct Joining { } impl Joining { - fn on_data(&mut self, data: Data) { + fn on_data(&mut self, data: &Data) -> anyhow::Result<()> { match data { - Data::BounceEvent(p) => self.bounce = Some(p), - Data::HelloEvent(p) => self.hello = Some(p), - Data::SnapshotEvent(p) => self.snapshot = Some(p), + Data::BounceEvent(p) => self.bounce = Some(p.clone()), + Data::HelloEvent(p) => self.hello = Some(p.clone()), + Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), + d @ (Data::JoinEvent(_) + | Data::NetworkEvent(_) + | Data::NickEvent(_) + | Data::EditMessageEvent(_) + | Data::PartEvent(_) + | Data::PmInitiateEvent(_) + | Data::SendEvent(_)) => bail!("unexpected {}", d.packet_type()), _ => {} } + Ok(()) } fn joined(&self) -> Option { @@ -105,13 +113,14 @@ pub struct Joined { } impl Joined { - fn on_data(&mut self, data: Data) { + fn on_data(&mut self, data: &Data) { match data { Data::JoinEvent(p) => { - self.listing.insert(p.0.id.clone(), p.0); + self.listing.insert(p.0.id.clone(), p.0.clone()); } Data::SendEvent(p) => { - self.listing.insert(p.0.sender.id.clone(), p.0.sender); + self.listing + .insert(p.0.sender.id.clone(), p.0.sender.clone()); } Data::PartEvent(p) => { self.listing.remove(&p.0.id); @@ -125,12 +134,12 @@ impl Joined { } Data::NickEvent(p) => { if let Some(session) = self.listing.get_mut(&p.id) { - session.name = p.to; + session.name = p.to.clone(); } } Data::NickReply(p) => { assert_eq!(self.session.id, p.id); - self.session.name = p.to; + self.session.name = p.to.clone(); } // The who reply is broken and can't be trusted right now, so we'll // not even look at it. @@ -253,13 +262,6 @@ impl State { self.replies.complete(id, packet.content.clone()); } - // Shovel events and successful replies into self.packet_tx. Assumes - // that no even ever errors and that erroring replies are not - // interesting. - if let Ok(data) = &packet.content { - self.packet_tx.send(data.clone())?; - } - // Play a game of table tennis match &packet.content { Ok(Data::PingReply(p)) => self.last_euph_pong = p.time, @@ -272,10 +274,10 @@ impl State { } // Update internal state - if let Ok(data) = packet.content { + if let Ok(data) = &packet.content { match &mut self.status { Status::Joining(joining) => { - joining.on_data(data); + joining.on_data(data)?; if let Some(joined) = joining.joined() { self.status = Status::Joined(joined); } @@ -284,6 +286,13 @@ impl State { } } + // Shovel events and successful replies into self.packet_tx. Assumes + // that no even ever errors and that erroring replies are not + // interesting. + if let Ok(data) = packet.content { + self.packet_tx.send(data)?; + } + Ok(()) } diff --git a/src/euph/room.rs b/src/euph/room.rs index 6443db0..4c27caf 100644 --- a/src/euph/room.rs +++ b/src/euph/room.rs @@ -10,7 +10,7 @@ use tokio_tungstenite::tungstenite; use crate::ui::UiEvent; use crate::vault::EuphVault; -use super::api::Data; +use super::api::{Data, Snowflake}; use super::conn::{self, ConnRx, ConnTx, Status}; #[derive(Debug, thiserror::Error)] @@ -33,6 +33,7 @@ struct State { vault: EuphVault, ui_event_tx: mpsc::UnboundedSender, conn_tx: Option, + last_msg_id: Option, } impl State { @@ -93,7 +94,10 @@ impl State { while let Some(event) = event_rx.recv().await { match event { Event::Connected(conn_tx) => self.conn_tx = Some(conn_tx), - Event::Disconnected => self.conn_tx = None, + Event::Disconnected => { + self.conn_tx = None; + self.last_msg_id = None; + } Event::Data(data) => self.on_data(data).await?, Event::Status(reply_tx) => self.on_status(reply_tx).await, } @@ -101,7 +105,7 @@ impl State { Ok(()) } - async fn on_data(&self, data: Data) -> anyhow::Result<()> { + async fn on_data(&mut self, data: Data) -> anyhow::Result<()> { match data { Data::BounceEvent(_) => { error!("e&{}: auth not implemented", self.name); @@ -135,9 +139,15 @@ impl State { self.name, d.from_nick, d.from_room ); } - Data::SendEvent(_) => {} + Data::SendEvent(d) => { + let id = d.0.id; + self.vault.add_message(d.0, self.last_msg_id); + self.last_msg_id = Some(id); + let _ = self.ui_event_tx.send(UiEvent::Redraw); + } Data::SnapshotEvent(d) => { info!("e&{}: successfully joined", self.name); + self.last_msg_id = d.log.last().map(|m| m.id); self.vault.add_messages(d.log, None); let _ = self.ui_event_tx.send(UiEvent::Redraw); } @@ -145,6 +155,12 @@ impl State { self.vault.add_messages(d.log, d.before); let _ = self.ui_event_tx.send(UiEvent::Redraw); } + Data::SendReply(d) => { + let id = d.0.id; + self.vault.add_message(d.0, self.last_msg_id); + self.last_msg_id = Some(id); + let _ = self.ui_event_tx.send(UiEvent::Redraw); + } _ => {} } Ok(()) @@ -181,6 +197,7 @@ impl Room { vault, ui_event_tx, conn_tx: None, + last_msg_id: None, }; task::spawn(state.run(canary_rx, event_tx.clone(), event_rx)); diff --git a/src/vault/euph.rs b/src/vault/euph.rs index 6d7588e..80a78a7 100644 --- a/src/vault/euph.rs +++ b/src/vault/euph.rs @@ -249,11 +249,50 @@ impl EuphRequest { } } + fn insert_msgs(tx: &Transaction, room: &str, msgs: Vec) -> rusqlite::Result<()> { + let mut stmt = tx.prepare(" + INSERT OR REPLACE INTO euph_msgs ( + room, id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated, + user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address + ) + VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ")?; + + for msg in msgs { + stmt.execute(params![ + room, + msg.id, + msg.parent, + msg.previous_edit_id, + msg.time, + msg.content, + msg.encryption_key_id, + msg.edited, + msg.deleted, + msg.truncated, + msg.sender.id.0, + msg.sender.name, + msg.sender.server_id, + msg.sender.server_era, + msg.sender.session_id, + msg.sender.is_staff, + msg.sender.is_manager, + msg.sender.client_address, + msg.sender.real_client_address, + ])?; + } + + Ok(()) + } + fn add_span( tx: &Transaction, room: &str, - first_msg_id: Option, - last_msg_id: Option, + start: Option, + end: Option, ) -> rusqlite::Result<()> { // Retrieve all spans for the room let mut spans = tx @@ -272,7 +311,7 @@ impl EuphRequest { .collect::, _>>()?; // Add new span and sort spans lexicographically - spans.push((first_msg_id, last_msg_id)); + spans.push((start, end)); spans.sort_unstable(); // Combine overlapping spans (including newly added span) @@ -324,12 +363,19 @@ impl EuphRequest { } fn add_msg( - conn: &Connection, + conn: &mut Connection, room: String, msg: Message, prev_msg: Option, ) -> rusqlite::Result<()> { - todo!() + let tx = conn.transaction()?; + + let end = msg.id; + Self::insert_msgs(&tx, &room, vec![msg])?; + Self::add_span(&tx, &room, prev_msg, Some(end))?; + + tx.commit()?; + Ok(()) } fn add_msgs( @@ -346,42 +392,10 @@ impl EuphRequest { let first_msg_id = msgs.first().unwrap().id; let last_msg_id = msgs.last().unwrap().id; - let mut stmt = tx.prepare(" - INSERT OR REPLACE INTO euph_msgs ( - room, id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated, - user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address - ) - VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ?, ?, ?, ?, ?, ?, ? - ) - ")?; - for msg in msgs { - stmt.execute(params![ - room, - msg.id, - msg.parent, - msg.previous_edit_id, - msg.time, - msg.content, - msg.encryption_key_id, - msg.edited, - msg.deleted, - msg.truncated, - msg.sender.id.0, - msg.sender.name, - msg.sender.server_id, - msg.sender.server_era, - msg.sender.session_id, - msg.sender.is_staff, - msg.sender.is_manager, - msg.sender.client_address, - msg.sender.real_client_address, - ])?; - } + Self::insert_msgs(&tx, &room, msgs)?; - let last_msg_id = next_msg_id.unwrap_or(last_msg_id); - Self::add_span(&tx, &room, Some(first_msg_id), Some(last_msg_id))?; + let end = next_msg_id.unwrap_or(last_msg_id); + Self::add_span(&tx, &room, Some(first_msg_id), Some(end))?; } tx.commit()?;