Store all refs in the db

This commit is contained in:
Joscha 2023-08-06 12:47:51 +02:00
parent 7768e4ad4b
commit 21d97a5bf4
15 changed files with 186 additions and 141 deletions

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE refs SET tracked = true WHERE name = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "09e16f417f6b7626d7350f4fdb6b3061f3c63535b20a1108186c0c652297b1bd"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "WITH RECURSIVE tracked (hash) AS ( SELECT hash FROM refs WHERE tracked UNION SELECT parent FROM commit_links JOIN tracked ON hash = child ), reachable (hash) AS ( SELECT hash FROM refs UNION SELECT hash FROM tracked UNION SELECT parent FROM commit_links JOIN reachable ON hash = child ) UPDATE commits SET reachable = CASE WHEN hash IN tracked THEN 2 WHEN hash IN reachable THEN 1 ELSE 0 END ",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "2afda30451ececd424b4af3d8b106bdc99d72c8e3e0579cb6d3df8e66429bad1"
}

View file

@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT name FROM refs",
"describe": {
"columns": [
{
"name": "name",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "2dc2eb48b5b028483500adf2cb17b96cddd8b2201cdfc756e0b5c266d4d34720"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO refs (name, hash) VALUES (?, ?) ON CONFLICT (name) DO UPDATE SET hash = excluded.hash ",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "3204ae1d17471bf90a056119f164dd4f42670801bd647fb962f0ba8de37b050e"
}

View file

@ -1,6 +1,6 @@
{ {
"db_name": "SQLite", "db_name": "SQLite",
"query": "SELECT name, hash FROM tracked_refs", "query": "SELECT name, hash FROM refs WHERE tracked",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -22,5 +22,5 @@
false false
] ]
}, },
"hash": "f39201c5c3a530a7f659a923a230e3e42dbe19c1ea5d71e6dd5910905e4402d4" "hash": "3e31ed6194d487b58b8aa0f10438b731232104af05b8e5bd056b69e69c91b703"
} }

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "INSERT OR IGNORE INTO tracked_refs (name, hash) VALUES (?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "4a8f4b5856c5c4d117f80983e83d700592728bc0d8301b9054082958332d2ebd"
}

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "UPDATE tracked_refs SET hash = ? WHERE name = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
},
"hash": "6e255c9db5fab15a188c0a1ecd333cc918937598a0c88851ac6f94de41796a33"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM refs WHERE name = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "9d5f5cd595152659bb3c6ca79d5400c6efed5c38377d44ffb4d63d37846b59a2"
}

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\nINSERT OR IGNORE INTO commits (hash, author, author_date, committer, committer_date, message)\nVALUES (?, ?, ?, ?, ?, ?)\n",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "a3771c256dde301f1e99aa87da9345a271287beb7e0fea8f90bff9475a8de568"
}

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\nWITH RECURSIVE reachable(hash) AS (\n SELECT hash FROM tracked_refs\n UNION\n SELECT parent FROM commit_links\n JOIN reachable ON hash = child\n)\n\nUPDATE commits\nSET reachable = (hash IN reachable)\n",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "afe943820305632281456f0a629065b3221314c59b1865c50b9bff51529c93d7"
}

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "DELETE FROM tracked_refs WHERE name = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "d6249ada8a6f58fabb3877446e851cfc88c6188163e5726d83941cdfb6e41c9e"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT OR IGNORE INTO commits ( hash, author, author_date, committer, committer_date, message ) VALUES (?, ?, ?, ?, ?, ?) ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "e2cffac014ee500f62b785172b5c115a3c504ac7d1df072dfba25ad0ac33702b"
}

View file

@ -17,9 +17,10 @@ CREATE TABLE commit_links (
FOREIGN KEY (child) REFERENCES commits (hash) ON DELETE CASCADE FOREIGN KEY (child) REFERENCES commits (hash) ON DELETE CASCADE
) STRICT; ) STRICT;
CREATE TABLE tracked_refs ( CREATE TABLE refs (
name TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL PRIMARY KEY,
hash TEXT NOT NULL, hash TEXT NOT NULL,
tracked INT NOT NULL DEFAULT 0,
FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE
) STRICT; ) STRICT;

View file

@ -1,12 +1,13 @@
//! Add new commits to the database and update the tracked refs. //! Add new commits to the database and update the tracked refs.
// TODO Think about whether ref hashes should be tracked in the db
// TODO Prevent some sync stuff from blocking the async stuff // TODO Prevent some sync stuff from blocking the async stuff
use std::collections::HashSet; use std::collections::HashSet;
use futures::TryStreamExt; use futures::TryStreamExt;
use gix::{date::time::format::ISO8601_STRICT, objs::Kind, Commit, ObjectId, Repository}; use gix::{
date::time::format::ISO8601_STRICT, objs::Kind, refs::Reference, Commit, ObjectId, Repository,
};
use sqlx::{Acquire, SqliteConnection, SqlitePool}; use sqlx::{Acquire, SqliteConnection, SqlitePool};
use tracing::{debug, info}; use tracing::{debug, info};
@ -25,31 +26,33 @@ async fn get_all_commit_hashes_from_db(
Ok(hashes) Ok(hashes)
} }
fn get_new_commits_from_repo<'a, 'b: 'a>( fn get_all_refs_from_repo(repo: &Repository) -> somehow::Result<Vec<Reference>> {
repo: &'a Repository, let mut references = vec![];
old: &'b HashSet<ObjectId>, for reference in repo.references()?.all()? {
) -> somehow::Result<Vec<Commit<'a>>> { let mut reference = reference.map_err(somehow::Error::from_box)?;
// Collect all references starting with "refs" reference.peel_to_id_in_place()?;
let mut all_references: Vec<ObjectId> = vec![];
for reference in repo.references()?.prefixed("refs")? {
let reference = reference.map_err(somehow::Error::from_box)?;
let id = reference.into_fully_peeled_id()?;
// Some repos *cough*linuxkernel*cough* have refs that don't point to // Some repos *cough*linuxkernel*cough* have refs that don't point to
// commits. This makes the rev walk choke and die. We don't want that. // commits. This makes the rev walk choke and die. We don't want that.
if id.object()?.kind != Kind::Commit { if reference.id().object()?.kind != Kind::Commit {
continue; continue;
} }
all_references.push(id.into()); references.push(reference.detach());
} }
Ok(references)
}
fn get_new_commits_from_repo<'a, 'b: 'a>(
repo: &'a Repository,
refs: &[Reference],
old: &'b HashSet<ObjectId>,
) -> somehow::Result<Vec<Commit<'a>>> {
let ref_ids = refs.iter().flat_map(|r| r.peeled.into_iter());
// Walk from those until hitting old references // Walk from those until hitting old references
let mut new = vec![]; let mut new = vec![];
for commit in repo for commit in repo.rev_walk(ref_ids).selected(|c| !old.contains(c))? {
.rev_walk(all_references)
.selected(|c| !old.contains(c))?
{
let commit = commit?.id().object()?.try_into_commit()?; let commit = commit?.id().object()?.try_into_commit()?;
new.push(commit); new.push(commit);
} }
@ -72,10 +75,17 @@ async fn insert_new_commits(
let message = commit.message_raw()?.to_string(); let message = commit.message_raw()?.to_string();
sqlx::query!( sqlx::query!(
" "\
INSERT OR IGNORE INTO commits (hash, author, author_date, committer, committer_date, message) INSERT OR IGNORE INTO commits ( \
VALUES (?, ?, ?, ?, ?, ?) hash, \
", author, \
author_date, \
committer, \
committer_date, \
message \
) \
VALUES (?, ?, ?, ?, ?, ?) \
",
hash, hash,
author, author,
author_date, author_date,
@ -118,65 +128,78 @@ async fn mark_all_commits_as_old(conn: &mut SqliteConnection) -> somehow::Result
Ok(()) Ok(())
} }
/// Bring the `refs` table in line with the given repository references.
///
/// First, every row whose name does not occur in `refs` is deleted. Then each
/// entry of `refs` that carries a peeled object id is upserted by name: a new
/// row is inserted with `name` and `hash`, and on a name conflict only the
/// `hash` column of the existing row is overwritten. Entries without a peeled
/// id are skipped during the upsert (their names still count as present for
/// the deletion pass).
///
/// # Errors
///
/// Returns an error as soon as any of the database queries fails.
async fn update_refs(conn: &mut SqliteConnection, refs: Vec<Reference>) -> somehow::Result<()> {
    // Names of all refs that were handed to us, for fast membership checks.
    let repo_names: HashSet<String> = refs.iter().map(|r| r.name.to_string()).collect();

    // Delete rows for refs that are no longer among the given refs.
    let db_rows = sqlx::query!("SELECT name FROM refs")
        .fetch_all(&mut *conn)
        .await?;
    for stale in db_rows
        .into_iter()
        .filter(|row| !repo_names.contains(&row.name))
    {
        sqlx::query!("DELETE FROM refs WHERE name = ?", stale.name)
            .execute(&mut *conn)
            .await?;
    }

    // Insert refs that are new and refresh the hash of those already stored.
    for reference in refs {
        let Some(target) = reference.peeled else { continue };
        let name = reference.name.to_string();
        let hash = target.to_string();
        sqlx::query!(
            "\
            INSERT INTO refs (name, hash) VALUES (?, ?) \
            ON CONFLICT (name) DO UPDATE \
            SET hash = excluded.hash \
            ",
            name,
            hash
        )
        .execute(&mut *conn)
        .await?;
    }

    Ok(())
}
async fn track_main_branch(conn: &mut SqliteConnection, repo: &Repository) -> somehow::Result<()> { async fn track_main_branch(conn: &mut SqliteConnection, repo: &Repository) -> somehow::Result<()> {
let Some(head) = repo.head_ref()? else { return Ok(()); }; let Some(head) = repo.head_ref()? else { return Ok(()); };
let name = head.inner.name.to_string(); let name = head.inner.name.to_string();
let hash = head.into_fully_peeled_id()?.to_string(); sqlx::query!("UPDATE refs SET tracked = true WHERE name = ?", name)
sqlx::query!(
"INSERT OR IGNORE INTO tracked_refs (name, hash) VALUES (?, ?)",
name,
hash,
)
.execute(conn) .execute(conn)
.await?; .await?;
Ok(()) Ok(())
} }
// TODO Write all refs to DB, not just tracked ones
/// Reconcile every row of `tracked_refs` with the repository.
///
/// For each stored ref, the reference of the same name is looked up in
/// `repo`. If it cannot be found, the row is deleted. If it is found and its
/// id differs from the stored hash, the stored hash is replaced. Rows whose
/// hash already matches are left alone.
///
/// # Errors
///
/// Returns an error if the reference lookup or any database query fails.
async fn update_tracked_refs(
    conn: &mut SqliteConnection,
    repo: &Repository,
) -> somehow::Result<()> {
    let rows = sqlx::query!("SELECT name, hash FROM tracked_refs")
        .fetch_all(&mut *conn)
        .await?;

    for row in rows {
        // The ref is gone from the repo: drop it from the table and move on.
        let Some(reference) = repo.try_find_reference(&row.name)? else {
            debug!("Deleted tracked ref {}", row.name);
            sqlx::query!("DELETE FROM tracked_refs WHERE name = ?", row.name)
                .execute(&mut *conn)
                .await?;
            continue;
        };

        // The ref still exists; only touch the db if it points somewhere new.
        let hash = reference.id().to_string();
        if hash == row.hash {
            continue;
        }

        debug!("Updated tracked ref {}", row.name);
        sqlx::query!(
            "UPDATE tracked_refs SET hash = ? WHERE name = ?",
            hash,
            row.name
        )
        .execute(&mut *conn)
        .await?;
    }

    Ok(())
}
// TODO tracked -> reachable, 0 = unreachable, 1 = reachable, 2 = reachable from tracked ref
async fn update_commit_tracked_status(conn: &mut SqliteConnection) -> somehow::Result<()> { async fn update_commit_tracked_status(conn: &mut SqliteConnection) -> somehow::Result<()> {
sqlx::query!( sqlx::query!(
" "\
WITH RECURSIVE reachable(hash) AS ( WITH RECURSIVE \
SELECT hash FROM tracked_refs tracked (hash) AS ( \
UNION SELECT hash FROM refs WHERE tracked \
SELECT parent FROM commit_links UNION \
JOIN reachable ON hash = child SELECT parent FROM commit_links \
) JOIN tracked ON hash = child \
), \
UPDATE commits reachable (hash) AS ( \
SET reachable = (hash IN reachable) SELECT hash FROM refs \
UNION \
SELECT hash FROM tracked \
UNION \
SELECT parent FROM commit_links \
JOIN reachable ON hash = child \
) \
UPDATE commits \
SET reachable = CASE \
WHEN hash IN tracked THEN 2 \
WHEN hash IN reachable THEN 1 \
ELSE 0 \
END \
" "
) )
.execute(conn) .execute(conn)
@ -197,7 +220,8 @@ pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> {
info!("Initializing new repo"); info!("Initializing new repo");
} }
let new = get_new_commits_from_repo(repo, &old)?; let refs = get_all_refs_from_repo(repo)?;
let new = get_new_commits_from_repo(repo, &refs, &old)?;
debug!("Found {} new commits in repo", new.len()); debug!("Found {} new commits in repo", new.len());
// Defer foreign key checks until the end of the transaction to improve // Defer foreign key checks until the end of the transaction to improve
@ -211,15 +235,15 @@ pub async fn update(db: &SqlitePool, repo: &Repository) -> somehow::Result<()> {
// commit and so on). // commit and so on).
insert_new_commits(conn, &new).await?; insert_new_commits(conn, &new).await?;
insert_new_commit_links(conn, &new).await?; insert_new_commit_links(conn, &new).await?;
debug!("Inserted {} new commits into db", new.len());
if repo_is_new { if repo_is_new {
mark_all_commits_as_old(conn).await?; mark_all_commits_as_old(conn).await?;
track_main_branch(conn, repo).await?;
debug!("Prepared new repo");
} }
debug!("Inserted {} new commits into db", new.len());
update_tracked_refs(conn, repo).await?; update_refs(conn, refs).await?;
if repo_is_new {
track_main_branch(conn, repo).await?;
}
update_commit_tracked_status(conn).await?; update_commit_tracked_status(conn).await?;
debug!("Updated tracked refs"); debug!("Updated tracked refs");

View file

@ -29,7 +29,7 @@ pub async fn get(
) -> somehow::Result<impl IntoResponse> { ) -> somehow::Result<impl IntoResponse> {
let repo = repo.to_thread_local(); let repo = repo.to_thread_local();
let rows = sqlx::query!("SELECT name, hash FROM tracked_refs") let rows = sqlx::query!("SELECT name, hash FROM refs WHERE tracked")
.fetch_all(&db) .fetch_all(&db)
.await?; .await?;