diff --git a/src/update.rs b/src/update.rs index ab11ff9..d01be80 100644 --- a/src/update.rs +++ b/src/update.rs @@ -4,76 +4,41 @@ use std::collections::HashSet; use anyhow::anyhow; use futures::TryStreamExt; -use gix::{ObjectId, Repository}; +use gix::{traverse::commit::Info, ObjectId, Repository}; use sqlx::{prelude::*, SqliteConnection, SqlitePool}; -use tracing::{debug, debug_span, error, Instrument}; +use tracing::{debug, debug_span, error, info, Instrument}; use crate::state::AppState; /// Add new commits from the repo to the database, marked as new. -/// -/// Starts at the known refs and advances depth-first until it hits a commit -/// that is already in the db. -/// -/// Uses a transaction because batch inserts in sqlite are a lot faster in -/// transactions. // TODO Initialize tracked refs? // TODO Update tracked refs? async fn add_new_commits_to_db(db: &SqlitePool, repo: &Repository) -> anyhow::Result<()> { - debug!("Adding new commits to the db"); + debug!("Adding new commits to db"); let mut tx = db.begin().await?; let conn = tx.acquire().await?; + let old = get_all_commits_from_db(&mut *conn).await?; + debug!("Loaded {} commits from the db", old.len()); + + let new = get_new_commits_from_repo(repo, &old)?; + debug!("Found {} new commits in repo", new.len()); + // Defer foreign key checks until the end of the transaction to improve // insert performance. sqlx::query!("PRAGMA defer_foreign_keys=1") .execute(&mut *conn) .await?; - let commits = get_all_commits_from_db(&mut *conn).await?; - debug!("Loaded {} commits from the db", commits.len()); + // Inserts are grouped by table so sqlite can process them *a lot* faster + // than if they were grouped by commit (insert commit and parents, then next + // commit and so on). + insert_new_commits(conn, &new).await?; + insert_new_commit_links(conn, &new).await?; + debug!("Inserted {} new commits into db", new.len()); - let mut references = vec![]; - for reference in repo.references()?.prefixed("refs")? { - let id: ObjectId = reference - .map_err(|e| anyhow!(e))? - .into_fully_peeled_id()? - .into(); - references.push(id); - } - debug!("Found {} refs in repo", references.len()); - - let new_commits = repo - .rev_walk(references) - .selected(|c| !commits.contains(c))? - .collect::, _>>()?; - debug!("Found {} new commits in repo", new_commits.len()); - - for commit in &new_commits { - let hash = commit.id.to_string(); - sqlx::query!("INSERT OR IGNORE INTO commits (hash) VALUES (?)", hash) - .execute(&mut *conn) - .await?; - } - debug!("Inserted commits"); - - for commit in &new_commits { - let hash = commit.id.to_string(); - for parent in commit.parent_ids() { - let parent_hash = parent.to_string(); - sqlx::query!( - "INSERT INTO commit_links (parent, child) VALUES (?, ?)", - parent_hash, - hash - ) - .execute(&mut *conn) - .await?; - } - } - debug!("Inserted commit links"); - - debug!("Finished adding new commits to the db"); tx.commit().await?; + debug!("Finished adding new commits to db"); Ok(()) } @@ -88,7 +53,59 @@ async fn get_all_commits_from_db(conn: &mut SqliteConnection) -> anyhow::Result< Ok(hashes) } +fn get_new_commits_from_repo( + repo: &Repository, + old: &HashSet, +) -> anyhow::Result> { + // Collect all references starting with "refs" + let mut all_references = vec![]; + for reference in repo.references()?.prefixed("refs")? { + let id: ObjectId = reference + .map_err(|e| anyhow!(e))? + .into_fully_peeled_id()? + .into(); + all_references.push(id); + } + + // Walk from those until hitting old references + let new_commits = repo + .rev_walk(all_references) + .selected(|c| !old.contains(c))? + .map(|r| r.map(|i| i.detach())) + .collect::, _>>()?; + + Ok(new_commits) +} + +async fn insert_new_commits(conn: &mut SqliteConnection, new: &[Info]) -> anyhow::Result<()> { + for commit in new { + let hash = commit.id.to_string(); + sqlx::query!("INSERT OR IGNORE INTO commits (hash) VALUES (?)", hash) + .execute(&mut *conn) + .await?; + } + Ok(()) +} + +async fn insert_new_commit_links(conn: &mut SqliteConnection, new: &[Info]) -> anyhow::Result<()> { + for commit in new { + let child = commit.id.to_string(); + for parent in &commit.parent_ids { + let parent = parent.to_string(); + sqlx::query!( + "INSERT INTO commit_links (parent, child) VALUES (?, ?)", + parent, + child + ) + .execute(&mut *conn) + .await?; + } + } + Ok(()) +} + async fn update_repo(state: &AppState) -> anyhow::Result<()> { + info!("Updating repo"); let repo = state.repo.to_thread_local(); add_new_commits_to_db(&state.db, &repo) @@ -102,7 +119,7 @@ pub async fn repeatedly(state: AppState) { loop { async { if let Err(e) = update_repo(&state).await { - error!("{e:?}"); + error!("Error updating repo:\n{e:?}"); } } .instrument(debug_span!("update repo"))