From ce58e5b4be937503d50b363b3319097f70b92ab3 Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 24 Jun 2022 17:25:35 +0200 Subject: [PATCH] Repeatedly request logs while in a room --- src/euph/room.rs | 91 ++++++++++++++++++++++++++++++++++++++------ src/vault/euph.rs | 37 ++++++++++++++++++ src/vault/migrate.rs | 4 +- 3 files changed, 120 insertions(+), 12 deletions(-) diff --git a/src/euph/room.rs b/src/euph/room.rs index 9ea9843..50a0a9f 100644 --- a/src/euph/room.rs +++ b/src/euph/room.rs @@ -1,8 +1,10 @@ use std::convert::Infallible; +use std::sync::Arc; use std::time::Duration; use anyhow::bail; use log::{error, info, warn}; +use parking_lot::Mutex; use tokio::sync::{mpsc, oneshot}; use tokio::{select, task, time}; use tokio_tungstenite::tungstenite; @@ -10,7 +12,7 @@ use tokio_tungstenite::tungstenite; use crate::ui::UiEvent; use crate::vault::EuphVault; -use super::api::{Data, Snowflake}; +use super::api::{Data, Log, Snowflake}; use super::conn::{self, ConnRx, ConnTx, Status}; #[derive(Debug, thiserror::Error)] @@ -25,6 +27,7 @@ enum Event { Disconnected, Data(Data), Status(oneshot::Sender>), + RequestLogs, } #[derive(Debug)] @@ -32,8 +35,12 @@ struct State { name: String, vault: EuphVault, ui_event_tx: mpsc::UnboundedSender, + conn_tx: Option, - last_msg_id: Option, + /// `None` before any `snapshot-event`, then either `Some(None)` or + /// `Some(Some(id))`. + last_msg_id: Option>, + requesting_logs: Arc>, } impl State { @@ -47,6 +54,7 @@ impl State { let result = select! { _ = canary => Ok(()), _ = Self::reconnect(&name, &event_tx) => Ok(()), + _ = Self::regularly_request_logs(&event_tx) => Ok(()), e = self.handle_events(&mut event_rx) => e, }; @@ -87,6 +95,13 @@ impl State { } } + async fn regularly_request_logs(event_tx: &mpsc::UnboundedSender) { + loop { + time::sleep(Duration::from_secs(10)).await; // TODO Make configurable + let _ = event_tx.send(Event::RequestLogs); + } + } + async fn handle_events( &mut self, event_rx: &mut mpsc::UnboundedReceiver, @@ -100,6 +115,7 @@ impl State { } Event::Data(data) => self.on_data(data).await?, Event::Status(reply_tx) => self.on_status(reply_tx).await, + Event::RequestLogs => self.on_request_logs(), } } Ok(()) @@ -140,14 +156,18 @@ impl State { ); } Data::SendEvent(d) => { - let id = d.0.id; - self.vault.add_message(d.0, self.last_msg_id); - self.last_msg_id = Some(id); - let _ = self.ui_event_tx.send(UiEvent::Redraw); + if let Some(last_msg_id) = &mut self.last_msg_id { + let id = d.0.id; + self.vault.add_message(d.0, *last_msg_id); + *last_msg_id = Some(id); + let _ = self.ui_event_tx.send(UiEvent::Redraw); + } else { + bail!("send event before snapshot event"); + } } Data::SnapshotEvent(d) => { info!("e&{}: successfully joined", self.name); - self.last_msg_id = d.log.last().map(|m| m.id); + self.last_msg_id = Some(d.log.last().map(|m| m.id)); self.vault.add_messages(d.log, None); let _ = self.ui_event_tx.send(UiEvent::Redraw); } @@ -156,10 +176,14 @@ impl State { let _ = self.ui_event_tx.send(UiEvent::Redraw); } Data::SendReply(d) => { - let id = d.0.id; - self.vault.add_message(d.0, self.last_msg_id); - self.last_msg_id = Some(id); - let _ = self.ui_event_tx.send(UiEvent::Redraw); + if let Some(last_msg_id) = &mut self.last_msg_id { + let id = d.0.id; + self.vault.add_message(d.0, *last_msg_id); + *last_msg_id = Some(id); + let _ = self.ui_event_tx.send(UiEvent::Redraw); + } else { + bail!("send reply before snapshot event"); + } } _ => {} } @@ -175,6 +199,44 @@ impl State { let _ = reply_tx.send(status); } + + fn on_request_logs(&self) { + if let Some(conn_tx) = &self.conn_tx { + // Check whether logs are already being requested + let mut guard = self.requesting_logs.lock(); + if *guard { + return; + } else { + *guard = true; + } + drop(guard); + + // No logs are being requested and we've reserved our spot, so let's + // request some logs! + let vault = self.vault.clone(); + let conn_tx = conn_tx.clone(); + let requesting_logs = self.requesting_logs.clone(); + task::spawn(async move { + let result = Self::request_logs(vault, conn_tx).await; + *requesting_logs.lock() = false; + result + }); + } + } + + async fn request_logs(vault: EuphVault, conn_tx: ConnTx) -> anyhow::Result<()> { + let before = match vault.last_span().await { + Some((None, _)) => return Ok(()), // Already at top of room history + Some((Some(before), _)) => Some(before), + None => None, + }; + + let _ = conn_tx.send(Log { n: 1000, before }).await?; + // The code handling incoming events and replies also handles + // `LogReply`s, so we don't need to do anything special here. + + Ok(()) + } } #[derive(Debug)] @@ -199,6 +261,7 @@ impl Room { ui_event_tx, conn_tx: None, last_msg_id: None, + requesting_logs: Arc::new(Mutex::new(false)), }; task::spawn(state.run(canary_rx, event_tx.clone(), event_rx)); @@ -216,4 +279,10 @@ impl Room { .map_err(|_| Error::Stopped)?; rx.await.map_err(|_| Error::Stopped) } + + pub fn request_logs(&self) -> Result<(), Error> { + self.event_tx + .send(Event::RequestLogs) + .map_err(|_| Error::Stopped) + } } diff --git a/src/vault/euph.rs b/src/vault/euph.rs index 80a78a7..e8b2977 100644 --- a/src/vault/euph.rs +++ b/src/vault/euph.rs @@ -100,6 +100,17 @@ impl EuphVault { }; let _ = self.tx.send(request.into()); } + + pub async fn last_span(&self) -> Option<(Option, Option)> { + // TODO vault::Error + let (tx, rx) = oneshot::channel(); + let request = EuphRequest::LastSpan { + room: self.room.clone(), + result: tx, + }; + let _ = self.tx.send(request.into()); + rx.await.unwrap() + } } #[async_trait] @@ -186,6 +197,10 @@ pub(super) enum EuphRequest { msgs: Vec, next_msg: Option, }, + LastSpan { + room: String, + result: oneshot::Sender, Option)>>, + }, Path { room: String, id: Snowflake, @@ -229,6 +244,7 @@ impl EuphRequest { msgs, next_msg, } => Self::add_msgs(conn, room, msgs, next_msg), + EuphRequest::LastSpan { room, result } => Self::last_span(conn, room, result), EuphRequest::Path { room, id, result } => Self::path(conn, room, id, result), EuphRequest::Tree { room, root, result } => Self::tree(conn, room, root, result), EuphRequest::PrevTree { room, root, result } => { @@ -402,6 +418,27 @@ impl EuphRequest { Ok(()) } + fn last_span( + conn: &Connection, + room: String, + result: oneshot::Sender, Option)>>, + ) -> rusqlite::Result<()> { + let span = conn + .prepare( + " + SELECT start, end + FROM euph_spans + WHERE room = ? + ORDER BY start, end DESC + LIMIT 1 + ", + )? + .query_row([room], |row| Ok((row.get(0)?, row.get(1)?))) + .optional()?; + let _ = result.send(span); + Ok(()) + } + fn path( conn: &Connection, room: String, diff --git a/src/vault/migrate.rs b/src/vault/migrate.rs index 9313fa5..1968364 100644 --- a/src/vault/migrate.rs +++ b/src/vault/migrate.rs @@ -55,7 +55,9 @@ fn m1(tx: &mut Transaction) -> rusqlite::Result<()> { PRIMARY KEY (room, start, end), FOREIGN KEY (room, start) REFERENCES euph_msgs (room, id), - FOREIGN KEY (room, end) REFERENCES euph_msgs (room, id) + FOREIGN KEY (room, end) REFERENCES euph_msgs (room, id), + + CHECK (start IS NULL OR end IS NOT NULL) ) STRICT; CREATE VIEW euph_trees (room, id) AS