From b56d0df1429b575d27f0d6b2fe6ce2a3da469691 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 5 Aug 2023 11:37:43 +0200 Subject: [PATCH] Refactor repo update code again --- src/main.rs | 4 +- src/recurring.rs | 29 +++++++ src/{update.rs => recurring/repo.rs} | 118 +++++++++++---------------- 3 files changed, 77 insertions(+), 74 deletions(-) create mode 100644 src/recurring.rs rename src/{update.rs => recurring/repo.rs} (76%) diff --git a/src/main.rs b/src/main.rs index e5fab2d..80a31a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ mod config; +mod recurring; mod state; mod r#static; -mod update; use std::{io, path::PathBuf}; @@ -132,7 +132,7 @@ async fn run() -> anyhow::Result<()> { select! { _ = wait_for_signal() => {}, _ = server.serve(app) => {}, - _ = update::repeatedly(state.clone()) => {}, + _ = recurring::run(state.clone()) => {}, } state.shut_down().await; diff --git a/src/recurring.rs b/src/recurring.rs new file mode 100644 index 0000000..78109a0 --- /dev/null +++ b/src/recurring.rs @@ -0,0 +1,29 @@ +//! Recurring actions and updates. + +// TODO `fetch` submodule for fetching new commits +// TODO `queue` submodule for updating the queue + +mod repo; + +use tracing::{debug_span, error, Instrument}; + +use crate::state::AppState; + +async fn recurring_task(state: &AppState) { + let repo = state.repo.to_thread_local(); + + async { + if let Err(e) = repo::update(&state.db, &repo).await { + error!("Error updating repo:\n{e:?}"); + }; + } + .instrument(debug_span!("update repo")) + .await; +} + +pub async fn run(state: AppState) { + loop { + recurring_task(&state).await; + tokio::time::sleep(state.config.repo.update_delay).await; + } +} diff --git a/src/update.rs b/src/recurring/repo.rs similarity index 76% rename from src/update.rs rename to src/recurring/repo.rs index ebcd98c..5e29d3d 100644 --- a/src/update.rs +++ b/src/recurring/repo.rs @@ -1,60 +1,13 @@ -//! Repeatedly update the db from the repo. +//! Add new commits to the database and update the tracked refs. + +// TODO Think about whether ref hashes should be tracked in the db use std::collections::HashSet; -use anyhow::anyhow; use futures::TryStreamExt; use gix::{objs::Kind, traverse::commit::Info, ObjectId, Repository}; -use sqlx::{prelude::*, SqliteConnection, SqlitePool}; -use tracing::{debug, debug_span, error, info, Instrument}; - -use crate::state::AppState; - -/// Add new commits from the repo to the database, marked as new. -// TODO Rename so the name fits better (maybe rename update module to recurring as well?) -// upadte module -> recurring -// Fetching new commits -> fetch submodule -// Updating repo (current file) -> repo submodule -// Updating queue -> queue submodule -async fn add_new_commits_to_db(db: &SqlitePool, repo: &Repository) -> anyhow::Result<()> { - debug!("Adding new commits to db"); - let mut tx = db.begin().await?; - let conn = tx.acquire().await?; - - let old = get_all_commits_from_db(&mut *conn).await?; - debug!("Loaded {} commits from the db", old.len()); - - let new = get_new_commits_from_repo(repo, &old)?; - debug!("Found {} new commits in repo", new.len()); - - // Defer foreign key checks until the end of the transaction to improve - // insert performance. - sqlx::query!("PRAGMA defer_foreign_keys=1") - .execute(&mut *conn) - .await?; - - // Inserts are grouped by table so sqlite can process them *a lot* faster - // than if they were grouped by commit (insert commit and parents, then next - // commit and so on). - insert_new_commits(conn, &new).await?; - insert_new_commit_links(conn, &new).await?; - debug!("Inserted {} new commits into db", new.len()); - - if old.is_empty() { - // We've never seen any repo before, so we need to do additional - // initialization. - info!("Detected new repo, initializing"); - mark_all_commits_as_old(conn).await?; - track_main_branch(conn, repo).await?; - info!("Initialized new repo"); - } - - update_tracked_refs(conn, repo).await?; - - tx.commit().await?; - debug!("Finished adding new commits to db"); - Ok(()) -} +use sqlx::{Acquire, SqliteConnection, SqlitePool}; +use tracing::{debug, info}; async fn get_all_commits_from_db(conn: &mut SqliteConnection) -> anyhow::Result> { let hashes = sqlx::query!("SELECT hash FROM commits") @@ -74,7 +27,7 @@ fn get_new_commits_from_repo( // Collect all references starting with "refs" let mut all_references: Vec = vec![]; for reference in repo.references()?.prefixed("refs")? { - let reference = reference.map_err(|e| anyhow!(e))?; + let reference = reference.map_err(|e| anyhow::anyhow!(e))?; let id = reference.into_fully_peeled_id()?; // Some repos *cough*linuxkernel*cough* have refs that don't point to @@ -174,27 +127,48 @@ async fn update_tracked_refs(conn: &mut SqliteConnection, repo: &Repository) -> Ok(()) } -async fn update_repo(state: &AppState) -> anyhow::Result<()> { - info!("Updating repo"); - let repo = state.repo.to_thread_local(); +pub async fn update(db: &SqlitePool, repo: &Repository) -> anyhow::Result<()> { + debug!("Updating repo"); + let mut tx = db.begin().await?; + let conn = tx.acquire().await?; - add_new_commits_to_db(&state.db, &repo) - .instrument(debug_span!("add new commits")) + let old = get_all_commits_from_db(&mut *conn).await?; + debug!("Loaded {} commits from the db", old.len()); + + let repo_is_new = old.is_empty(); + + if repo_is_new { + info!("Initializing new repo"); + } + + let new = get_new_commits_from_repo(repo, &old)?; + debug!("Found {} new commits in repo", new.len()); + + // Defer foreign key checks until the end of the transaction to improve + // insert performance. + sqlx::query!("PRAGMA defer_foreign_keys=1") + .execute(&mut *conn) .await?; + // Inserts are grouped by table so sqlite can process them *a lot* faster + // than if they were grouped by commit (insert commit and parents, then next + // commit and so on). + insert_new_commits(conn, &new).await?; + insert_new_commit_links(conn, &new).await?; + debug!("Inserted {} new commits into db", new.len()); + + if repo_is_new { + mark_all_commits_as_old(conn).await?; + track_main_branch(conn, repo).await?; + debug!("Prepared new repo"); + } + + update_tracked_refs(conn, repo).await?; + + tx.commit().await?; + if repo_is_new { + info!("Initialized new repo"); + } + debug!("Updated repo"); Ok(()) } - -pub async fn repeatedly(state: AppState) { - loop { - async { - if let Err(e) = update_repo(&state).await { - error!("Error updating repo:\n{e:?}"); - } - } - .instrument(debug_span!("update repo")) - .await; - - tokio::time::sleep(state.config.repo.update_delay).await; - } -}