Don't block tokio when collecting new commits
This commit is contained in:
parent
5709a69694
commit
6fcd073738
3 changed files with 51 additions and 25 deletions
15
src/main.rs
15
src/main.rs
|
|
@ -11,7 +11,7 @@ use clap::Parser;
|
||||||
use directories::ProjectDirs;
|
use directories::ProjectDirs;
|
||||||
use state::AppState;
|
use state::AppState;
|
||||||
use tokio::{select, signal::unix::SignalKind};
|
use tokio::{select, signal::unix::SignalKind};
|
||||||
use tracing::{error, info, Level};
|
use tracing::{debug, error, info, Level};
|
||||||
use tracing_subscriber::{
|
use tracing_subscriber::{
|
||||||
filter::Targets, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
|
filter::Targets, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt,
|
||||||
};
|
};
|
||||||
|
|
@ -78,6 +78,8 @@ fn load_config(path: Option<PathBuf>) -> somehow::Result<&'static Config> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_signal() -> io::Result<()> {
|
async fn wait_for_signal() -> io::Result<()> {
|
||||||
|
debug!("Listening to signals");
|
||||||
|
|
||||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||||
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
|
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
|
||||||
|
|
||||||
|
|
@ -91,6 +93,8 @@ async fn wait_for_signal() -> io::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn die_on_signal() -> io::Result<()> {
|
async fn die_on_signal() -> io::Result<()> {
|
||||||
|
debug!("Listening to signals again");
|
||||||
|
|
||||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||||
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
|
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
|
||||||
|
|
||||||
|
|
@ -121,6 +125,15 @@ async fn run() -> somehow::Result<()> {
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
_ = die_on_signal() => {}
|
_ = die_on_signal() => {}
|
||||||
|
// For some reason, the thread pool shutting down seems to block
|
||||||
|
// receiving further signals if a heavy sql operation is currently
|
||||||
|
// running. Maybe this is due to the thread pool not deferring blocking
|
||||||
|
// work to a separate thread? In any case, replacing it with a sleep
|
||||||
|
// doesn't block the signals.
|
||||||
|
//
|
||||||
|
// In order to fix this, I could maybe register a bare signal handler
|
||||||
|
// (instead of using tokio streams) that just calls process::exit(1) and
|
||||||
|
// nothing else?
|
||||||
_ = state.shut_down() => {}
|
_ = state.shut_down() => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,8 @@ use tracing::{debug_span, error, Instrument};
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
|
|
||||||
async fn recurring_task(state: &AppState) {
|
async fn recurring_task(state: &AppState) {
|
||||||
let repo = state.repo.to_thread_local();
|
|
||||||
|
|
||||||
async {
|
async {
|
||||||
if let Err(e) = repo::update(&state.db, &repo).await {
|
if let Err(e) = repo::update(&state.db, state.repo.clone()).await {
|
||||||
error!("Error updating repo:\n{e:?}");
|
error!("Error updating repo:\n{e:?}");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,11 @@
|
||||||
//! Add new commits to the database and update the tracked refs.
|
//! Add new commits to the database and update the tracked refs.
|
||||||
|
|
||||||
// TODO Prevent some sync stuff from blocking the async stuff
|
use std::{collections::HashSet, sync::Arc};
|
||||||
|
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use gix::{
|
use gix::{
|
||||||
actor::IdentityRef, date::time::format::ISO8601_STRICT, objs::Kind, refs::Reference, Commit,
|
actor::IdentityRef, date::time::format::ISO8601_STRICT, objs::Kind, prelude::ObjectIdExt,
|
||||||
ObjectId, Repository,
|
refs::Reference, ObjectId, Repository, ThreadSafeRepository,
|
||||||
};
|
};
|
||||||
use sqlx::{Acquire, SqliteConnection, SqlitePool};
|
use sqlx::{Acquire, SqliteConnection, SqlitePool};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
@ -44,23 +42,31 @@ fn get_all_refs_from_repo(repo: &Repository) -> somehow::Result<Vec<Reference>>
|
||||||
Ok(references)
|
Ok(references)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_new_commits_from_repo<'a, 'b: 'a>(
|
fn get_new_commits_from_repo(
|
||||||
repo: &'a Repository,
|
repo: &Repository,
|
||||||
refs: &[Reference],
|
refs: &[Reference],
|
||||||
old: &'b HashSet<ObjectId>,
|
old: &HashSet<ObjectId>,
|
||||||
) -> somehow::Result<Vec<Commit<'a>>> {
|
) -> somehow::Result<Vec<ObjectId>> {
|
||||||
let ref_ids = refs.iter().flat_map(|r| r.peeled.into_iter());
|
let ref_ids = refs.iter().flat_map(|r| r.peeled.into_iter());
|
||||||
|
|
||||||
// Walk from those until hitting old references
|
// Walk from those until hitting old references
|
||||||
let mut new = vec![];
|
let mut new = vec![];
|
||||||
for commit in repo.rev_walk(ref_ids).selected(|c| !old.contains(c))? {
|
for commit in repo.rev_walk(ref_ids).selected(|c| !old.contains(c))? {
|
||||||
let commit = commit?.id().object()?.try_into_commit()?;
|
new.push(commit?.id);
|
||||||
new.push(commit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(new)
|
Ok(new)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_all_refs_and_new_commits_from_repo(
|
||||||
|
repo: &Repository,
|
||||||
|
old: &HashSet<ObjectId>,
|
||||||
|
) -> somehow::Result<(Vec<Reference>, Vec<ObjectId>)> {
|
||||||
|
let refs = get_all_refs_from_repo(repo)?;
|
||||||
|
let new = get_new_commits_from_repo(repo, &refs, old)?;
|
||||||
|
Ok((refs, new))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> {
|
pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> {
|
||||||
let mut buffer = vec![];
|
let mut buffer = vec![];
|
||||||
author.trim().write_to(&mut buffer)?;
|
author.trim().write_to(&mut buffer)?;
|
||||||
|
|
@ -69,9 +75,11 @@ pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> {
|
||||||
|
|
||||||
async fn insert_new_commits(
|
async fn insert_new_commits(
|
||||||
conn: &mut SqliteConnection,
|
conn: &mut SqliteConnection,
|
||||||
new: &[Commit<'_>],
|
repo: &Repository,
|
||||||
|
new: &[ObjectId],
|
||||||
) -> somehow::Result<()> {
|
) -> somehow::Result<()> {
|
||||||
for (i, commit) in new.iter().enumerate() {
|
for (i, id) in new.iter().enumerate() {
|
||||||
|
let commit = id.attach(repo).object()?.try_into_commit()?;
|
||||||
let hash = commit.id.to_string();
|
let hash = commit.id.to_string();
|
||||||
let author_info = commit.author()?;
|
let author_info = commit.author()?;
|
||||||
let author = format_actor(author_info.actor())?;
|
let author = format_actor(author_info.actor())?;
|
||||||
|
|
@ -113,9 +121,11 @@ async fn insert_new_commits(
|
||||||
|
|
||||||
async fn insert_new_commit_links(
|
async fn insert_new_commit_links(
|
||||||
conn: &mut SqliteConnection,
|
conn: &mut SqliteConnection,
|
||||||
new: &[Commit<'_>],
|
repo: &Repository,
|
||||||
|
new: &[ObjectId],
|
||||||
) -> somehow::Result<()> {
|
) -> somehow::Result<()> {
|
||||||
for (i, commit) in new.iter().enumerate() {
|
for (i, hash) in new.iter().enumerate() {
|
||||||
|
let commit = hash.attach(repo).object()?.try_into_commit()?;
|
||||||
let child = commit.id.to_string();
|
let child = commit.id.to_string();
|
||||||
for parent in commit.parent_ids() {
|
for parent in commit.parent_ids() {
|
||||||
let parent = parent.to_string();
|
let parent = parent.to_string();
|
||||||
|
|
@ -224,8 +234,9 @@ async fn update_commit_tracked_status(conn: &mut SqliteConnection) -> somehow::R
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> {
|
pub async fn update(db: &SqlitePool, repo: Arc<ThreadSafeRepository>) -> somehow::Result<()> {
|
||||||
debug!("Updating repo");
|
debug!("Updating repo");
|
||||||
|
let thread_local_repo = repo.to_thread_local();
|
||||||
let mut tx = db.begin().await?;
|
let mut tx = db.begin().await?;
|
||||||
let conn = tx.acquire().await?;
|
let conn = tx.acquire().await?;
|
||||||
|
|
||||||
|
|
@ -237,8 +248,12 @@ pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> {
|
||||||
info!("Initializing new repo");
|
info!("Initializing new repo");
|
||||||
}
|
}
|
||||||
|
|
||||||
let refs = get_all_refs_from_repo(repo)?;
|
// This can take a while for larger repos. Running it via spawn_blocking
|
||||||
let new = get_new_commits_from_repo(repo, &refs, &old)?;
|
// keeps it from blocking the entire tokio worker.
|
||||||
|
let (refs, new) = tokio::task::spawn_blocking(move || {
|
||||||
|
get_all_refs_and_new_commits_from_repo(&repo.to_thread_local(), &old)
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
debug!("Found {} new commits in repo", new.len());
|
debug!("Found {} new commits in repo", new.len());
|
||||||
|
|
||||||
// Defer foreign key checks until the end of the transaction to improve
|
// Defer foreign key checks until the end of the transaction to improve
|
||||||
|
|
@ -250,15 +265,15 @@ pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> {
|
||||||
// Inserts are grouped by table so sqlite can process them *a lot* faster
|
// Inserts are grouped by table so sqlite can process them *a lot* faster
|
||||||
// than if they were grouped by commit (insert commit and parents, then next
|
// than if they were grouped by commit (insert commit and parents, then next
|
||||||
// commit and so on).
|
// commit and so on).
|
||||||
insert_new_commits(conn, &new).await?;
|
insert_new_commits(conn, &thread_local_repo, &new).await?;
|
||||||
insert_new_commit_links(conn, &new).await?;
|
insert_new_commit_links(conn, &thread_local_repo, &new).await?;
|
||||||
if repo_is_new {
|
if repo_is_new {
|
||||||
mark_all_commits_as_old(conn).await?;
|
mark_all_commits_as_old(conn).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
update_refs(conn, refs).await?;
|
update_refs(conn, refs).await?;
|
||||||
if repo_is_new {
|
if repo_is_new {
|
||||||
track_main_branch(conn, repo).await?;
|
track_main_branch(conn, &thread_local_repo).await?;
|
||||||
}
|
}
|
||||||
update_commit_tracked_status(conn).await?;
|
update_commit_tracked_status(conn).await?;
|
||||||
debug!("Updated tracked refs");
|
debug!("Updated tracked refs");
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue