Implement inserting euph messages into vault

This commit is contained in:
Joscha 2022-06-24 00:20:14 +02:00
parent 15933aac78
commit d6821881f3
3 changed files with 223 additions and 15 deletions

View file

@ -1,6 +1,5 @@
mod api;
pub mod api;
mod conn;
mod room;
pub use api::Snowflake;
pub use room::Room;

View file

@ -35,7 +35,7 @@ impl Vault {
}
}
fn run(conn: Connection, mut rx: mpsc::UnboundedReceiver<Request>) {
fn run(mut conn: Connection, mut rx: mpsc::UnboundedReceiver<Request>) {
while let Some(request) = rx.blocking_recv() {
match request {
Request::Close(tx) => {
@ -47,7 +47,7 @@ fn run(conn: Connection, mut rx: mpsc::UnboundedReceiver<Request>) {
drop(tx);
break;
}
Request::Euph(r) => r.perform(&conn),
Request::Euph(r) => r.perform(&mut conn),
}
}
}

View file

@ -1,13 +1,42 @@
use std::mem;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, OptionalExtension};
use chrono::{DateTime, TimeZone, Utc};
use rusqlite::types::{FromSql, FromSqlError, ToSqlOutput, Value, ValueRef};
use rusqlite::{params, Connection, OptionalExtension, ToSql, Transaction};
use tokio::sync::{mpsc, oneshot};
use crate::euph::Snowflake;
use crate::euph::api::{Message, Snowflake, Time};
use crate::store::{Msg, MsgStore, Path, Tree};
use super::Request;
impl ToSql for Snowflake {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
self.0.to_sql()
}
}
impl FromSql for Snowflake {
fn column_result(value: ValueRef<'_>) -> Result<Self, FromSqlError> {
u64::column_result(value).map(Self)
}
}
impl ToSql for Time {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
let timestamp = self.0.timestamp();
Ok(ToSqlOutput::Owned(Value::Integer(timestamp)))
}
}
impl FromSql for Time {
fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
let timestamp = i64::column_result(value)?;
Ok(Self(Utc.timestamp(timestamp, 0)))
}
}
#[derive(Debug, Clone)]
pub struct EuphMsg {
id: Snowflake,
@ -47,11 +76,32 @@ impl From<EuphRequest> for Request {
}
}
#[derive(Debug, Clone)]
pub struct EuphVault {
pub(super) tx: mpsc::UnboundedSender<Request>,
pub(super) room: String,
}
impl EuphVault {
pub fn add_message(&self, msg: Message, prev_msg: Option<Snowflake>) {
let request = EuphRequest::AddMsg {
room: self.room.clone(),
msg,
prev_msg,
};
let _ = self.tx.send(request.into());
}
pub fn add_messages(&self, msgs: Vec<Message>, next_msg: Option<Snowflake>) {
let request = EuphRequest::AddMsgs {
room: self.room.clone(),
msgs,
next_msg,
};
let _ = self.tx.send(request.into());
}
}
#[async_trait]
impl MsgStore<EuphMsg> for EuphVault {
async fn path(&self, id: &Snowflake) -> Path<Snowflake> {
@ -126,6 +176,16 @@ impl MsgStore<EuphMsg> for EuphVault {
}
pub(super) enum EuphRequest {
AddMsg {
room: String,
msg: Message,
prev_msg: Option<Snowflake>,
},
AddMsgs {
room: String,
msgs: Vec<Message>,
next_msg: Option<Snowflake>,
},
Path {
room: String,
id: Snowflake,
@ -157,8 +217,18 @@ pub(super) enum EuphRequest {
}
impl EuphRequest {
pub(super) fn perform(self, conn: &Connection) {
pub(super) fn perform(self, conn: &mut Connection) {
let result = match self {
EuphRequest::AddMsg {
room,
msg,
prev_msg,
} => Self::add_msg(conn, room, msg, prev_msg),
EuphRequest::AddMsgs {
room,
msgs,
next_msg,
} => Self::add_msgs(conn, room, msgs, next_msg),
EuphRequest::Path { room, id, result } => Self::path(conn, room, id, result),
EuphRequest::Tree { room, root, result } => Self::tree(conn, room, root, result),
EuphRequest::PrevTree { room, root, result } => {
@ -179,6 +249,145 @@ impl EuphRequest {
}
}
fn add_span(
tx: &Transaction,
room: &str,
first_msg_id: Option<Snowflake>,
last_msg_id: Option<Snowflake>,
) -> rusqlite::Result<()> {
// Retrieve all spans for the room
let mut spans = tx
.prepare(
"
SELECT start, end
FROM euph_spans
WHERE room = ?
",
)?
.query_map([room], |row| {
let start = row.get::<_, Option<Snowflake>>(0)?;
let end = row.get::<_, Option<Snowflake>>(1)?;
Ok((start, end))
})?
.collect::<Result<Vec<_>, _>>()?;
// Add new span and sort spans lexicographically
spans.push((first_msg_id, last_msg_id));
spans.sort_unstable();
// Combine overlapping spans (including newly added span)
let mut cur_span: Option<(Option<Snowflake>, Option<Snowflake>)> = None;
let mut result = vec![];
for mut span in spans {
if let Some(cur_span) = &mut cur_span {
if span.0 <= cur_span.1 {
// Since spans are sorted lexicographically, we know that
// cur_span.0 <= span.0, which means that span starts inside
// of cur_span.
cur_span.1 = cur_span.1.max(span.1);
} else {
// Since span doesn't overlap cur_span, we know that no
// later span will overlap cur_span either. The size of
// cur_span is thus final.
mem::swap(cur_span, &mut span);
result.push(span);
}
} else {
cur_span = Some(span);
}
}
if let Some(cur_span) = cur_span {
result.push(cur_span);
}
// Delete all spans for the room
tx.execute(
"
DELETE FROM euph_spans
WHERE room = ?
",
[room],
)?;
// Re-insert combined spans for the room
let mut stmt = tx.prepare(
"
INSERT INTO euph_spans (room, start, end)
VALUES (?, ?, ?)
",
)?;
for (start, end) in result {
stmt.execute(params![room, start, end])?;
}
Ok(())
}
fn add_msg(
conn: &Connection,
room: String,
msg: Message,
prev_msg: Option<Snowflake>,
) -> rusqlite::Result<()> {
todo!()
}
fn add_msgs(
conn: &mut Connection,
room: String,
msgs: Vec<Message>,
next_msg_id: Option<Snowflake>,
) -> rusqlite::Result<()> {
let tx = conn.transaction()?;
if msgs.is_empty() {
Self::add_span(&tx, &room, None, next_msg_id)?;
} else {
let first_msg_id = msgs.first().unwrap().id;
let last_msg_id = msgs.first().unwrap().id;
let mut stmt = tx.prepare("
INSERT INTO euph_msgs (
room, id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated,
user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address
)
VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?
)
")?;
for msg in msgs {
stmt.execute(params![
room,
msg.id,
msg.parent,
msg.previous_edit_id,
msg.time,
msg.content,
msg.encryption_key_id,
msg.edited,
msg.deleted,
msg.truncated,
msg.sender.id.0,
msg.sender.name,
msg.sender.server_id,
msg.sender.server_era,
msg.sender.session_id,
msg.sender.is_staff,
msg.sender.is_manager,
msg.sender.client_address,
msg.sender.real_client_address,
])?;
}
let last_msg_id = next_msg_id.unwrap_or(last_msg_id);
Self::add_span(&tx, &room, Some(first_msg_id), Some(last_msg_id))?;
}
tx.commit()?;
Ok(())
}
fn path(
conn: &Connection,
room: String,
@ -200,7 +409,7 @@ impl EuphRequest {
ORDER BY id ASC
",
)?
.query_map(params![room, id.0], |row| row.get(0).map(Snowflake))?
.query_map(params![room, id], |row| row.get(0))?
.collect::<rusqlite::Result<_>>()?;
let path = Path::new(path);
let _ = result.send(path);
@ -231,10 +440,10 @@ impl EuphRequest {
ORDER BY id ASC
",
)?
.query_map(params![room, root.0], |row| {
.query_map(params![room, root], |row| {
Ok(EuphMsg {
id: Snowflake(row.get(0)?),
parent: row.get::<_, Option<u64>>(1)?.map(Snowflake),
parent: row.get(1)?,
time: row.get(2)?,
nick: row.get(3)?,
content: row.get(4)?,
@ -263,7 +472,7 @@ impl EuphRequest {
LIMIT 1
",
)?
.query_row(params![room, root.0], |row| row.get(0).map(Snowflake))
.query_row(params![room, root], |row| row.get(0))
.optional()?;
let _ = result.send(tree);
Ok(())
@ -286,7 +495,7 @@ impl EuphRequest {
LIMIT 1
",
)?
.query_row(params![room, root.0], |row| row.get(0).map(Snowflake))
.query_row(params![room, root], |row| row.get(0))
.optional()?;
let _ = result.send(tree);
Ok(())
@ -307,7 +516,7 @@ impl EuphRequest {
LIMIT 1
",
)?
.query_row([room], |row| row.get(0).map(Snowflake))
.query_row([room], |row| row.get(0))
.optional()?;
let _ = result.send(tree);
Ok(())
@ -328,7 +537,7 @@ impl EuphRequest {
LIMIT 1
",
)?
.query_row([room], |row| row.get(0).map(Snowflake))
.query_row([room], |row| row.get(0))
.optional()?;
let _ = result.send(tree);
Ok(())