Make JSON exports faster

This commit is contained in:
Joscha 2023-04-05 01:10:49 +02:00
parent 9f9c3d998e
commit 847af34ceb
3 changed files with 69 additions and 55 deletions

View file

@ -8,14 +8,13 @@ pub async fn export<W: Write>(vault: &EuphRoomVault, file: &mut W) -> anyhow::Re
write!(file, "[")?;
let mut total = 0;
let mut offset = 0;
let mut last_msg_id = None;
loop {
let messages = vault.chunk_at_offset(CHUNK_SIZE, offset).await?;
offset += messages.len();
if messages.is_empty() {
break;
}
let messages = vault.chunk_after(last_msg_id, CHUNK_SIZE).await?;
last_msg_id = Some(match messages.last() {
Some(last_msg) => last_msg.id,
None => break, // No more messages, export finished
});
for message in messages {
if total == 0 {
@ -40,14 +39,13 @@ pub async fn export<W: Write>(vault: &EuphRoomVault, file: &mut W) -> anyhow::Re
pub async fn export_stream<W: Write>(vault: &EuphRoomVault, file: &mut W) -> anyhow::Result<()> {
let mut total = 0;
let mut offset = 0;
let mut last_msg_id = None;
loop {
let messages = vault.chunk_at_offset(CHUNK_SIZE, offset).await?;
offset += messages.len();
if messages.is_empty() {
break;
}
let messages = vault.chunk_after(last_msg_id, CHUNK_SIZE).await?;
last_msg_id = Some(match messages.last() {
Some(last_msg) => last_msg.id,
None => break, // No more messages, export finished
});
for message in messages {
serde_json::to_writer(&mut *file, &message)?; // Fancy reborrow! :D

View file

@ -5,7 +5,7 @@ use async_trait::async_trait;
use cookie::{Cookie, CookieJar};
use euphoxide::api::{Message, MessageId, SessionId, SessionView, Snowflake, Time, UserId};
use rusqlite::types::{FromSql, FromSqlError, ToSqlOutput, Value, ValueRef};
use rusqlite::{named_params, params, Connection, OptionalExtension, ToSql, Transaction};
use rusqlite::{named_params, params, Connection, OptionalExtension, Row, ToSql, Transaction};
use time::OffsetDateTime;
use vault::Action;
@ -240,7 +240,7 @@ euph_room_vault_actions! {
GetUnseenMsgsCount : unseen_msgs_count() -> usize;
SetSeen : set_seen(id: MessageId, seen: bool) -> ();
SetOlderSeen : set_older_seen(id: MessageId, seen: bool) -> ();
GetChunkAtOffset : chunk_at_offset(amount: usize, offset: usize) -> Vec<Message>;
GetChunkAfter : chunk_after(id: Option<MessageId>, amount: usize) -> Vec<Message>;
}
impl Action for Join {
@ -961,49 +961,62 @@ impl Action for SetOlderSeen {
}
}
impl Action for GetChunkAtOffset {
impl Action for GetChunkAfter {
type Result = Vec<Message>;
fn run(self, conn: &mut Connection) -> rusqlite::Result<Self::Result> {
let mut query = conn.prepare(
"
SELECT
id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated,
user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address
FROM euph_msgs
WHERE room = ?
ORDER BY id ASC
LIMIT ?
OFFSET ?
",
)?;
fn row2msg(row: &Row<'_>) -> rusqlite::Result<Message> {
Ok(Message {
id: MessageId(row.get::<_, WSnowflake>(0)?.0),
parent: row.get::<_, Option<WSnowflake>>(1)?.map(|s| MessageId(s.0)),
previous_edit_id: row.get::<_, Option<WSnowflake>>(2)?.map(|s| s.0),
time: row.get::<_, WTime>(3)?.0,
content: row.get(4)?,
encryption_key_id: row.get(5)?,
edited: row.get::<_, Option<WTime>>(6)?.map(|t| t.0),
deleted: row.get::<_, Option<WTime>>(7)?.map(|t| t.0),
truncated: row.get(8)?,
sender: SessionView {
id: UserId(row.get(9)?),
name: row.get(10)?,
server_id: row.get(11)?,
server_era: row.get(12)?,
session_id: SessionId(row.get(13)?),
is_staff: row.get(14)?,
is_manager: row.get(15)?,
client_address: row.get(16)?,
real_client_address: row.get(17)?,
},
})
}
let messages = if let Some(id) = self.id {
conn.prepare("
SELECT
id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated,
user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address
FROM euph_msgs
WHERE room = ?
AND id > ?
ORDER BY id ASC
LIMIT ?
")?
.query_map(params![self.room, WSnowflake(id.0), self.amount], row2msg)?
.collect::<rusqlite::Result<_>>()?
} else {
conn.prepare("
SELECT
id, parent, previous_edit_id, time, content, encryption_key_id, edited, deleted, truncated,
user_id, name, server_id, server_era, session_id, is_staff, is_manager, client_address, real_client_address
FROM euph_msgs
WHERE room = ?
ORDER BY id ASC
LIMIT ?
")?
.query_map(params![self.room, self.amount], row2msg)?
.collect::<rusqlite::Result<_>>()?
};
let messages = query
.query_map(params![self.room, self.amount, self.offset], |row| {
Ok(Message {
id: MessageId(row.get::<_, WSnowflake>(0)?.0),
parent: row.get::<_, Option<WSnowflake>>(1)?.map(|s| MessageId(s.0)),
previous_edit_id: row.get::<_, Option<WSnowflake>>(2)?.map(|s| s.0),
time: row.get::<_, WTime>(3)?.0,
content: row.get(4)?,
encryption_key_id: row.get(5)?,
edited: row.get::<_, Option<WTime>>(6)?.map(|t| t.0),
deleted: row.get::<_, Option<WTime>>(7)?.map(|t| t.0),
truncated: row.get(8)?,
sender: SessionView {
id: UserId(row.get(9)?),
name: row.get(10)?,
server_id: row.get(11)?,
server_era: row.get(12)?,
session_id: SessionId(row.get(13)?),
is_staff: row.get(14)?,
is_manager: row.get(15)?,
client_address: row.get(16)?,
real_client_address: row.get(17)?,
},
})
})?
.collect::<rusqlite::Result<_>>()?;
Ok(messages)
}
}