Repeatedly request logs while in a room
This commit is contained in:
parent
76412e8287
commit
ce58e5b4be
3 changed files with 120 additions and 12 deletions
|
|
@ -1,8 +1,10 @@
|
||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
|
use parking_lot::Mutex;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tokio::{select, task, time};
|
use tokio::{select, task, time};
|
||||||
use tokio_tungstenite::tungstenite;
|
use tokio_tungstenite::tungstenite;
|
||||||
|
|
@ -10,7 +12,7 @@ use tokio_tungstenite::tungstenite;
|
||||||
use crate::ui::UiEvent;
|
use crate::ui::UiEvent;
|
||||||
use crate::vault::EuphVault;
|
use crate::vault::EuphVault;
|
||||||
|
|
||||||
use super::api::{Data, Snowflake};
|
use super::api::{Data, Log, Snowflake};
|
||||||
use super::conn::{self, ConnRx, ConnTx, Status};
|
use super::conn::{self, ConnRx, ConnTx, Status};
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
|
@ -25,6 +27,7 @@ enum Event {
|
||||||
Disconnected,
|
Disconnected,
|
||||||
Data(Data),
|
Data(Data),
|
||||||
Status(oneshot::Sender<Option<Status>>),
|
Status(oneshot::Sender<Option<Status>>),
|
||||||
|
RequestLogs,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -32,8 +35,12 @@ struct State {
|
||||||
name: String,
|
name: String,
|
||||||
vault: EuphVault,
|
vault: EuphVault,
|
||||||
ui_event_tx: mpsc::UnboundedSender<UiEvent>,
|
ui_event_tx: mpsc::UnboundedSender<UiEvent>,
|
||||||
|
|
||||||
conn_tx: Option<ConnTx>,
|
conn_tx: Option<ConnTx>,
|
||||||
last_msg_id: Option<Snowflake>,
|
/// `None` before any `snapshot-event`, then either `Some(None)` or
|
||||||
|
/// `Some(Some(id))`.
|
||||||
|
last_msg_id: Option<Option<Snowflake>>,
|
||||||
|
requesting_logs: Arc<Mutex<bool>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
|
|
@ -47,6 +54,7 @@ impl State {
|
||||||
let result = select! {
|
let result = select! {
|
||||||
_ = canary => Ok(()),
|
_ = canary => Ok(()),
|
||||||
_ = Self::reconnect(&name, &event_tx) => Ok(()),
|
_ = Self::reconnect(&name, &event_tx) => Ok(()),
|
||||||
|
_ = Self::regularly_request_logs(&event_tx) => Ok(()),
|
||||||
e = self.handle_events(&mut event_rx) => e,
|
e = self.handle_events(&mut event_rx) => e,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -87,6 +95,13 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn regularly_request_logs(event_tx: &mpsc::UnboundedSender<Event>) {
|
||||||
|
loop {
|
||||||
|
time::sleep(Duration::from_secs(10)).await; // TODO Make configurable
|
||||||
|
let _ = event_tx.send(Event::RequestLogs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_events(
|
async fn handle_events(
|
||||||
&mut self,
|
&mut self,
|
||||||
event_rx: &mut mpsc::UnboundedReceiver<Event>,
|
event_rx: &mut mpsc::UnboundedReceiver<Event>,
|
||||||
|
|
@ -100,6 +115,7 @@ impl State {
|
||||||
}
|
}
|
||||||
Event::Data(data) => self.on_data(data).await?,
|
Event::Data(data) => self.on_data(data).await?,
|
||||||
Event::Status(reply_tx) => self.on_status(reply_tx).await,
|
Event::Status(reply_tx) => self.on_status(reply_tx).await,
|
||||||
|
Event::RequestLogs => self.on_request_logs(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -140,14 +156,18 @@ impl State {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Data::SendEvent(d) => {
|
Data::SendEvent(d) => {
|
||||||
|
if let Some(last_msg_id) = &mut self.last_msg_id {
|
||||||
let id = d.0.id;
|
let id = d.0.id;
|
||||||
self.vault.add_message(d.0, self.last_msg_id);
|
self.vault.add_message(d.0, *last_msg_id);
|
||||||
self.last_msg_id = Some(id);
|
*last_msg_id = Some(id);
|
||||||
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
||||||
|
} else {
|
||||||
|
bail!("send event before snapshot event");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Data::SnapshotEvent(d) => {
|
Data::SnapshotEvent(d) => {
|
||||||
info!("e&{}: successfully joined", self.name);
|
info!("e&{}: successfully joined", self.name);
|
||||||
self.last_msg_id = d.log.last().map(|m| m.id);
|
self.last_msg_id = Some(d.log.last().map(|m| m.id));
|
||||||
self.vault.add_messages(d.log, None);
|
self.vault.add_messages(d.log, None);
|
||||||
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
||||||
}
|
}
|
||||||
|
|
@ -156,10 +176,14 @@ impl State {
|
||||||
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
||||||
}
|
}
|
||||||
Data::SendReply(d) => {
|
Data::SendReply(d) => {
|
||||||
|
if let Some(last_msg_id) = &mut self.last_msg_id {
|
||||||
let id = d.0.id;
|
let id = d.0.id;
|
||||||
self.vault.add_message(d.0, self.last_msg_id);
|
self.vault.add_message(d.0, *last_msg_id);
|
||||||
self.last_msg_id = Some(id);
|
*last_msg_id = Some(id);
|
||||||
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
let _ = self.ui_event_tx.send(UiEvent::Redraw);
|
||||||
|
} else {
|
||||||
|
bail!("send reply before snapshot event");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
@ -175,6 +199,44 @@ impl State {
|
||||||
|
|
||||||
let _ = reply_tx.send(status);
|
let _ = reply_tx.send(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn on_request_logs(&self) {
|
||||||
|
if let Some(conn_tx) = &self.conn_tx {
|
||||||
|
// Check whether logs are already being requested
|
||||||
|
let mut guard = self.requesting_logs.lock();
|
||||||
|
if *guard {
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
*guard = true;
|
||||||
|
}
|
||||||
|
drop(guard);
|
||||||
|
|
||||||
|
// No logs are being requested and we've reserved our spot, so let's
|
||||||
|
// request some logs!
|
||||||
|
let vault = self.vault.clone();
|
||||||
|
let conn_tx = conn_tx.clone();
|
||||||
|
let requesting_logs = self.requesting_logs.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
let result = Self::request_logs(vault, conn_tx).await;
|
||||||
|
*requesting_logs.lock() = false;
|
||||||
|
result
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn request_logs(vault: EuphVault, conn_tx: ConnTx) -> anyhow::Result<()> {
|
||||||
|
let before = match vault.last_span().await {
|
||||||
|
Some((None, _)) => return Ok(()), // Already at top of room history
|
||||||
|
Some((Some(before), _)) => Some(before),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = conn_tx.send(Log { n: 1000, before }).await?;
|
||||||
|
// The code handling incoming events and replies also handles
|
||||||
|
// `LogReply`s, so we don't need to do anything special here.
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -199,6 +261,7 @@ impl Room {
|
||||||
ui_event_tx,
|
ui_event_tx,
|
||||||
conn_tx: None,
|
conn_tx: None,
|
||||||
last_msg_id: None,
|
last_msg_id: None,
|
||||||
|
requesting_logs: Arc::new(Mutex::new(false)),
|
||||||
};
|
};
|
||||||
|
|
||||||
task::spawn(state.run(canary_rx, event_tx.clone(), event_rx));
|
task::spawn(state.run(canary_rx, event_tx.clone(), event_rx));
|
||||||
|
|
@ -216,4 +279,10 @@ impl Room {
|
||||||
.map_err(|_| Error::Stopped)?;
|
.map_err(|_| Error::Stopped)?;
|
||||||
rx.await.map_err(|_| Error::Stopped)
|
rx.await.map_err(|_| Error::Stopped)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn request_logs(&self) -> Result<(), Error> {
|
||||||
|
self.event_tx
|
||||||
|
.send(Event::RequestLogs)
|
||||||
|
.map_err(|_| Error::Stopped)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -100,6 +100,17 @@ impl EuphVault {
|
||||||
};
|
};
|
||||||
let _ = self.tx.send(request.into());
|
let _ = self.tx.send(request.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn last_span(&self) -> Option<(Option<Snowflake>, Option<Snowflake>)> {
|
||||||
|
// TODO vault::Error
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let request = EuphRequest::LastSpan {
|
||||||
|
room: self.room.clone(),
|
||||||
|
result: tx,
|
||||||
|
};
|
||||||
|
let _ = self.tx.send(request.into());
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|
@ -186,6 +197,10 @@ pub(super) enum EuphRequest {
|
||||||
msgs: Vec<Message>,
|
msgs: Vec<Message>,
|
||||||
next_msg: Option<Snowflake>,
|
next_msg: Option<Snowflake>,
|
||||||
},
|
},
|
||||||
|
LastSpan {
|
||||||
|
room: String,
|
||||||
|
result: oneshot::Sender<Option<(Option<Snowflake>, Option<Snowflake>)>>,
|
||||||
|
},
|
||||||
Path {
|
Path {
|
||||||
room: String,
|
room: String,
|
||||||
id: Snowflake,
|
id: Snowflake,
|
||||||
|
|
@ -229,6 +244,7 @@ impl EuphRequest {
|
||||||
msgs,
|
msgs,
|
||||||
next_msg,
|
next_msg,
|
||||||
} => Self::add_msgs(conn, room, msgs, next_msg),
|
} => Self::add_msgs(conn, room, msgs, next_msg),
|
||||||
|
EuphRequest::LastSpan { room, result } => Self::last_span(conn, room, result),
|
||||||
EuphRequest::Path { room, id, result } => Self::path(conn, room, id, result),
|
EuphRequest::Path { room, id, result } => Self::path(conn, room, id, result),
|
||||||
EuphRequest::Tree { room, root, result } => Self::tree(conn, room, root, result),
|
EuphRequest::Tree { room, root, result } => Self::tree(conn, room, root, result),
|
||||||
EuphRequest::PrevTree { room, root, result } => {
|
EuphRequest::PrevTree { room, root, result } => {
|
||||||
|
|
@ -402,6 +418,27 @@ impl EuphRequest {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn last_span(
|
||||||
|
conn: &Connection,
|
||||||
|
room: String,
|
||||||
|
result: oneshot::Sender<Option<(Option<Snowflake>, Option<Snowflake>)>>,
|
||||||
|
) -> rusqlite::Result<()> {
|
||||||
|
let span = conn
|
||||||
|
.prepare(
|
||||||
|
"
|
||||||
|
SELECT start, end
|
||||||
|
FROM euph_spans
|
||||||
|
WHERE room = ?
|
||||||
|
ORDER BY start, end DESC
|
||||||
|
LIMIT 1
|
||||||
|
",
|
||||||
|
)?
|
||||||
|
.query_row([room], |row| Ok((row.get(0)?, row.get(1)?)))
|
||||||
|
.optional()?;
|
||||||
|
let _ = result.send(span);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn path(
|
fn path(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
room: String,
|
room: String,
|
||||||
|
|
|
||||||
|
|
@ -55,7 +55,9 @@ fn m1(tx: &mut Transaction) -> rusqlite::Result<()> {
|
||||||
|
|
||||||
PRIMARY KEY (room, start, end),
|
PRIMARY KEY (room, start, end),
|
||||||
FOREIGN KEY (room, start) REFERENCES euph_msgs (room, id),
|
FOREIGN KEY (room, start) REFERENCES euph_msgs (room, id),
|
||||||
FOREIGN KEY (room, end) REFERENCES euph_msgs (room, id)
|
FOREIGN KEY (room, end) REFERENCES euph_msgs (room, id),
|
||||||
|
|
||||||
|
CHECK (start IS NULL OR end IS NOT NULL)
|
||||||
) STRICT;
|
) STRICT;
|
||||||
|
|
||||||
CREATE VIEW euph_trees (room, id) AS
|
CREATE VIEW euph_trees (room, id) AS
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue