diff --git a/src/main.rs b/src/main.rs index 1e5beb7..726a0ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use clap::Parser; use directories::ProjectDirs; use state::AppState; use tokio::{select, signal::unix::SignalKind}; -use tracing::{error, info, Level}; +use tracing::{debug, error, info, Level}; use tracing_subscriber::{ filter::Targets, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, }; @@ -78,6 +78,8 @@ fn load_config(path: Option) -> somehow::Result<&'static Config> { } async fn wait_for_signal() -> io::Result<()> { + debug!("Listening to signals"); + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?; @@ -91,6 +93,8 @@ async fn wait_for_signal() -> io::Result<()> { } async fn die_on_signal() -> io::Result<()> { + debug!("Listening to signals again"); + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?; @@ -121,6 +125,15 @@ async fn run() -> somehow::Result<()> { select! { _ = die_on_signal() => {} + // For some reason, the thread pool shutting down seems to block + // receiving further signals if a heavy sql operation is currently + // running. Maybe this is due to the thread pool not deferring blocking + // work to a separate thread? In any case, replacing it with a sleep + // doesn't block the signals. + // + // In order to fix this, I could maybe register a bare signal handler + // (instead of using tokio streams) that just calls process::exit(1) and + // nothing else? 
_ = state.shut_down() => {} } diff --git a/src/recurring.rs b/src/recurring.rs index 78109a0..c94246d 100644 --- a/src/recurring.rs +++ b/src/recurring.rs @@ -10,10 +10,8 @@ use tracing::{debug_span, error, Instrument}; use crate::state::AppState; async fn recurring_task(state: &AppState) { - let repo = state.repo.to_thread_local(); - async { - if let Err(e) = repo::update(&state.db, &repo).await { + if let Err(e) = repo::update(&state.db, state.repo.clone()).await { error!("Error updating repo:\n{e:?}"); }; } diff --git a/src/recurring/repo.rs b/src/recurring/repo.rs index ae2a7a3..5d75ac7 100644 --- a/src/recurring/repo.rs +++ b/src/recurring/repo.rs @@ -1,13 +1,11 @@ //! Add new commits to the database and update the tracked refs. -// TODO Prevent some sync stuff from blocking the async stuff - -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; use futures::TryStreamExt; use gix::{ - actor::IdentityRef, date::time::format::ISO8601_STRICT, objs::Kind, refs::Reference, Commit, - ObjectId, Repository, + actor::IdentityRef, date::time::format::ISO8601_STRICT, objs::Kind, prelude::ObjectIdExt, + refs::Reference, ObjectId, Repository, ThreadSafeRepository, }; use sqlx::{Acquire, SqliteConnection, SqlitePool}; use tracing::{debug, info}; @@ -44,23 +42,31 @@ fn get_all_refs_from_repo(repo: &Repository) -> somehow::Result<Vec<Reference>> Ok(references) } -fn get_new_commits_from_repo<'a, 'b: 'a>( - repo: &'a Repository, +fn get_new_commits_from_repo( + repo: &Repository, refs: &[Reference], - old: &'b HashSet<ObjectId>, -) -> somehow::Result<Vec<Commit<'a>>> { + old: &HashSet<ObjectId>, +) -> somehow::Result<Vec<ObjectId>> { let ref_ids = refs.iter().flat_map(|r| r.peeled.into_iter()); // Walk from those until hitting old references let mut new = vec![]; for commit in repo.rev_walk(ref_ids).selected(|c| !old.contains(c))?
{ - let commit = commit?.id().object()?.try_into_commit()?; - new.push(commit); + new.push(commit?.id); } Ok(new) } +fn get_all_refs_and_new_commits_from_repo( + repo: &Repository, + old: &HashSet<ObjectId>, +) -> somehow::Result<(Vec<Reference>, Vec<ObjectId>)> { + let refs = get_all_refs_from_repo(repo)?; + let new = get_new_commits_from_repo(repo, &refs, old)?; + Ok((refs, new)) +} + pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> { let mut buffer = vec![]; author.trim().write_to(&mut buffer)?; @@ -69,9 +75,11 @@ pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> { async fn insert_new_commits( conn: &mut SqliteConnection, - new: &[Commit<'_>], + repo: &Repository, + new: &[ObjectId], ) -> somehow::Result<()> { - for (i, commit) in new.iter().enumerate() { + for (i, id) in new.iter().enumerate() { + let commit = id.attach(repo).object()?.try_into_commit()?; let hash = commit.id.to_string(); let author_info = commit.author()?; let author = format_actor(author_info.actor())?; @@ -113,9 +121,11 @@ async fn insert_new_commits( async fn insert_new_commit_links( conn: &mut SqliteConnection, - new: &[Commit<'_>], + repo: &Repository, + new: &[ObjectId], ) -> somehow::Result<()> { - for (i, commit) in new.iter().enumerate() { + for (i, hash) in new.iter().enumerate() { + let commit = hash.attach(repo).object()?.try_into_commit()?; let child = commit.id.to_string(); for parent in commit.parent_ids() { let parent = parent.to_string(); @@ -224,8 +234,9 @@ async fn update_commit_tracked_status(conn: &mut SqliteConnection) -> somehow::R Ok(()) } -pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> { +pub async fn update(db: &SqlitePool, repo: Arc<ThreadSafeRepository>) -> somehow::Result<()> { debug!("Updating repo"); + let thread_local_repo = repo.to_thread_local(); let mut tx = db.begin().await?; let conn = tx.acquire().await?; @@ -237,8 +248,12 @@ pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> { info!("Initializing new repo"); } - let
refs = get_all_refs_from_repo(repo)?; - let new = get_new_commits_from_repo(repo, &refs, &old)?; + // This can take a while for larger repos. Running it via spawn_blocking + // keeps it from blocking the entire tokio worker. + let (refs, new) = tokio::task::spawn_blocking(move || { + get_all_refs_and_new_commits_from_repo(&repo.to_thread_local(), &old) + }) + .await??; debug!("Found {} new commits in repo", new.len()); // Defer foreign key checks until the end of the transaction to improve @@ -250,15 +265,15 @@ pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> { // Inserts are grouped by table so sqlite can process them *a lot* faster // than if they were grouped by commit (insert commit and parents, then next // commit and so on). - insert_new_commits(conn, &new).await?; - insert_new_commit_links(conn, &new).await?; + insert_new_commits(conn, &thread_local_repo, &new).await?; + insert_new_commit_links(conn, &thread_local_repo, &new).await?; if repo_is_new { mark_all_commits_as_old(conn).await?; } update_refs(conn, refs).await?; if repo_is_new { - track_main_branch(conn, repo).await?; + track_main_branch(conn, &thread_local_repo).await?; } update_commit_tracked_status(conn).await?; debug!("Updated tracked refs");