From d6821881f352711a5fa61fe3e3c2ff5cf404aedd Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 24 Jun 2022 00:20:14 +0200 Subject: [PATCH] Implement inserting euph messages into vault --- src/euph.rs | 3 +- src/vault.rs | 4 +- src/vault/euph.rs | 231 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 223 insertions(+), 15 deletions(-) diff --git a/src/euph.rs b/src/euph.rs index 2e424d7..105916d 100644 --- a/src/euph.rs +++ b/src/euph.rs @@ -1,6 +1,5 @@ -mod api; +pub mod api; mod conn; mod room; -pub use api::Snowflake; pub use room::Room; diff --git a/src/vault.rs b/src/vault.rs index 241c12a..c4667fe 100644 --- a/src/vault.rs +++ b/src/vault.rs @@ -35,7 +35,7 @@ impl Vault { } } -fn run(conn: Connection, mut rx: mpsc::UnboundedReceiver) { +fn run(mut conn: Connection, mut rx: mpsc::UnboundedReceiver) { while let Some(request) = rx.blocking_recv() { match request { Request::Close(tx) => { @@ -47,7 +47,7 @@ fn run(conn: Connection, mut rx: mpsc::UnboundedReceiver) { drop(tx); break; } - Request::Euph(r) => r.perform(&conn), + Request::Euph(r) => r.perform(&mut conn), } } } diff --git a/src/vault/euph.rs b/src/vault/euph.rs index a483d15..a2dd7b5 100644 --- a/src/vault/euph.rs +++ b/src/vault/euph.rs @@ -1,13 +1,42 @@ +use std::mem; + use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use rusqlite::{params, Connection, OptionalExtension}; +use chrono::{DateTime, TimeZone, Utc}; +use rusqlite::types::{FromSql, FromSqlError, ToSqlOutput, Value, ValueRef}; +use rusqlite::{params, Connection, OptionalExtension, ToSql, Transaction}; use tokio::sync::{mpsc, oneshot}; -use crate::euph::Snowflake; +use crate::euph::api::{Message, Snowflake, Time}; use crate::store::{Msg, MsgStore, Path, Tree}; use super::Request; +impl ToSql for Snowflake { + fn to_sql(&self) -> rusqlite::Result> { + self.0.to_sql() + } +} + +impl FromSql for Snowflake { + fn column_result(value: ValueRef<'_>) -> Result { + u64::column_result(value).map(Self) + } +} + +impl ToSql for Time { + fn to_sql(&self) -> rusqlite::Result> { + let timestamp = self.0.timestamp(); + Ok(ToSqlOutput::Owned(Value::Integer(timestamp))) + } +} + +impl FromSql for Time { + fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult { + let timestamp = i64::column_result(value)?; + Ok(Self(Utc.timestamp(timestamp, 0))) + } +} + #[derive(Debug, Clone)] pub struct EuphMsg { id: Snowflake, @@ -47,11 +76,32 @@ impl From for Request { } } +#[derive(Debug, Clone)] pub struct EuphVault { pub(super) tx: mpsc::UnboundedSender, pub(super) room: String, } +impl EuphVault { + pub fn add_message(&self, msg: Message, prev_msg: Option) { + let request = EuphRequest::AddMsg { + room: self.room.clone(), + msg, + prev_msg, + }; + let _ = self.tx.send(request.into()); + } + + pub fn add_messages(&self, msgs: Vec, next_msg: Option) { + let request = EuphRequest::AddMsgs { + room: self.room.clone(), + msgs, + next_msg, + }; + let _ = self.tx.send(request.into()); + } +} + #[async_trait] impl MsgStore for EuphVault { async fn path(&self, id: &Snowflake) -> Path { @@ -126,6 +176,16 @@ impl MsgStore for EuphVault { } pub(super) enum EuphRequest { + AddMsg { + room: String, + msg: Message, + prev_msg: Option, + }, + AddMsgs { + room: String, + msgs: Vec, + next_msg: Option, + }, Path { room: String, id: Snowflake, @@ -157,8 +217,18 @@ pub(super) enum EuphRequest { } impl EuphRequest { - pub(super) fn perform(self, conn: &Connection) { + pub(super) fn perform(self, conn: &mut Connection) { let result = match self { + EuphRequest::AddMsg { + room, + msg, + prev_msg, + } => Self::add_msg(conn, room, msg, prev_msg), + EuphRequest::AddMsgs { + room, + msgs, + next_msg, + } => Self::add_msgs(conn, room, msgs, next_msg), EuphRequest::Path { room, id, result } => Self::path(conn, room, id, result), EuphRequest::Tree { room, root, result } => Self::tree(conn, room, root, result), EuphRequest::PrevTree { room, root, result } => { @@ -179,6 +249,145 @@ impl EuphRequest { } } + fn add_span( + tx: &Transaction, + room: &str, + first_msg_id: Option, + last_msg_id: Option, + ) -> rusqlite::Result<()> { + // Retrieve all spans for the room + let mut spans = tx + .prepare( + " + SELECT start, end + FROM euph_spans + WHERE room = ? + ", + )? + .query_map([room], |row| { + let start = row.get::<_, Option>(0)?; + let end = row.get::<_, Option>(1)?; + Ok((start, end)) + })? + .collect::, _>>()?; + + // Add new span and sort spans lexicographically + spans.push((first_msg_id, last_msg_id)); + spans.sort_unstable(); + + // Combine overlapping spans (including newly added span) + let mut cur_span: Option<(Option, Option)> = None; + let mut result = vec![]; + for mut span in spans { + if let Some(cur_span) = &mut cur_span { + if span.0 <= cur_span.1 { + // Since spans are sorted lexicographically, we know that + // cur_span.0 <= span.0, which means that span starts inside + // of cur_span. + cur_span.1 = cur_span.1.max(span.1); + } else { + // Since span doesn't overlap cur_span, we know that no + // later span will overlap cur_span either. The size of + // cur_span is thus final. + mem::swap(cur_span, &mut span); + result.push(span); + } + } else { + cur_span = Some(span); + } + } + if let Some(cur_span) = cur_span { + result.push(cur_span); + } + + // Delete all spans for the room + tx.execute( + " + DELETE FROM euph_spans + WHERE room = ? + ", + [room], + )?; + + // Re-insert combined spans for the room + let mut stmt = tx.prepare( + " + INSERT INTO euph_spans (room, start, end) + VALUES (?, ?, ?) + ", + )?; + for (start, end) in result { + stmt.execute(params![room, start, end])?; + } + + Ok(()) + } + + fn add_msg( + conn: &Connection, + room: String, + msg: Message, + prev_msg: Option, + ) -> rusqlite::Result<()> { + todo!() + } + + fn add_msgs( + conn: &mut Connection, + room: String, + msgs: Vec, + next_msg_id: Option, + ) -> rusqlite::Result<()> { + let tx = conn.transaction()?; + + if msgs.is_empty() { + Self::add_span(&tx, &room, None, next_msg_id)?; + } else { + let first_msg_id = msgs.first().unwrap().id; + let last_msg_id = msgs.first().unwrap().id; + + let mut stmt = tx.prepare(" + INSERT INTO euph_msgs ( + room, id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated, + user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address + ) + VALUES ( + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ")?; + for msg in msgs { + stmt.execute(params![ + room, + msg.id, + msg.parent, + msg.previous_edit_id, + msg.time, + msg.content, + msg.encryption_key_id, + msg.edited, + msg.deleted, + msg.truncated, + msg.sender.id.0, + msg.sender.name, + msg.sender.server_id, + msg.sender.server_era, + msg.sender.session_id, + msg.sender.is_staff, + msg.sender.is_manager, + msg.sender.client_address, + msg.sender.real_client_address, + ])?; + } + + let last_msg_id = next_msg_id.unwrap_or(last_msg_id); + Self::add_span(&tx, &room, Some(first_msg_id), Some(last_msg_id))?; + } + + tx.commit()?; + Ok(()) + } + fn path( conn: &Connection, room: String, @@ -200,7 +409,7 @@ impl EuphRequest { ORDER BY id ASC ", )? - .query_map(params![room, id.0], |row| row.get(0).map(Snowflake))? + .query_map(params![room, id], |row| row.get(0))? .collect::>()?; let path = Path::new(path); let _ = result.send(path); @@ -231,10 +440,10 @@ impl EuphRequest { ORDER BY id ASC ", )? - .query_map(params![room, root.0], |row| { + .query_map(params![room, root], |row| { Ok(EuphMsg { id: Snowflake(row.get(0)?), - parent: row.get::<_, Option>(1)?.map(Snowflake), + parent: row.get(1)?, time: row.get(2)?, nick: row.get(3)?, content: row.get(4)?, @@ -263,7 +472,7 @@ impl EuphRequest { LIMIT 1 ", )? - .query_row(params![room, root.0], |row| row.get(0).map(Snowflake)) + .query_row(params![room, root], |row| row.get(0)) .optional()?; let _ = result.send(tree); Ok(()) @@ -286,7 +495,7 @@ impl EuphRequest { LIMIT 1 ", )? - .query_row(params![room, root.0], |row| row.get(0).map(Snowflake)) + .query_row(params![room, root], |row| row.get(0)) .optional()?; let _ = result.send(tree); Ok(()) @@ -307,7 +516,7 @@ impl EuphRequest { LIMIT 1 ", )? - .query_row([room], |row| row.get(0).map(Snowflake)) + .query_row([room], |row| row.get(0)) .optional()?; let _ = result.send(tree); Ok(()) @@ -328,7 +537,7 @@ impl EuphRequest { LIMIT 1 ", )? - .query_row([room], |row| row.get(0).map(Snowflake)) + .query_row([room], |row| row.get(0)) .optional()?; let _ = result.send(tree); Ok(())