Insert new commits into the queue

This commit is contained in:
Joscha 2023-08-06 18:08:04 +02:00
parent 553a56bb12
commit 4f11b9c912
10 changed files with 128 additions and 14 deletions

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "INSERT INTO queue (id, hash, date) VALUES (?, ?, ?)",
"describe": {
"columns": [],
"parameters": {
"Right": 3
},
"nullable": []
},
"hash": "09d6281af1344afb2d6265149d4026e72badc3e18bc53a03e6b78011ee270ee1"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "UPDATE commits SET new = false",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "373c31648cfc15a3a42b51dc74025ee5fb1a90da7ecaf4bf4b330076f33e74dd"
}

View file

@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT hash FROM commits WHERE new AND reachable = 2",
"describe": {
"columns": [
{
"name": "hash",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "726102c4f61f95047a64cdc21834d0a5544950ecfe9b1d6716521dcf1d0b8fb2"
}

1
Cargo.lock generated
View file

@ -2769,6 +2769,7 @@ dependencies = [
"humantime",
"humantime-serde",
"mime_guess",
"rand",
"rust-embed",
"serde",
"sqlx",

View file

@ -14,6 +14,7 @@ futures = "0.3.28"
humantime = "2.1.0"
humantime-serde = "1.1.1"
mime_guess = "2.0.4"
rand = "0.8.5"
rust-embed = "6.8.1"
serde = { version = "1.0.181", features = ["derive"] }
sqlx = { version = "0.7.1", features = ["runtime-tokio", "sqlite", "time"] }

View file

@ -0,0 +1,7 @@
CREATE TABLE queue (
id TEXT NOT NULL PRIMARY KEY,
hash TEXT NOT NULL,
date TEXT NOT NULL,
priority INT NOT NULL DEFAULT 0,
FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE
) STRICT;

View file

@ -3,6 +3,7 @@
// TODO `fetch` submodule for fetching new commits
// TODO `queue` submodule for updating the queue
mod queue;
mod repo;
use tracing::{debug_span, error, Instrument};
@ -17,6 +18,14 @@ async fn recurring_task(state: &AppState) {
}
.instrument(debug_span!("update repo"))
.await;
async {
if let Err(e) = queue::update(&state.db).await {
error!("Error updating queue:\n{e:?}");
};
}
.instrument(debug_span!("update queue"))
.await;
}
pub async fn run(state: AppState) {

41
src/recurring/queue.rs Normal file
View file

@ -0,0 +1,41 @@
use sqlx::{Acquire, SqlitePool};
use time::OffsetDateTime;
use tracing::debug;
use crate::{somehow, util};
pub async fn update(db: &SqlitePool) -> somehow::Result<()> {
debug!("Updating queue");
let mut tx = db.begin().await?;
let conn = tx.acquire().await?;
// Get all newly added tracked commits
let new = sqlx::query!("SELECT hash FROM commits WHERE new AND reachable = 2")
.fetch_all(&mut *conn)
.await?;
let new_len = new.len();
// Insert them into the queue
for row in new {
let id = util::new_run_id();
let date = OffsetDateTime::now_utc();
sqlx::query!(
"INSERT INTO queue (id, hash, date) VALUES (?, ?, ?)",
id,
row.hash,
date
)
.execute(&mut *conn)
.await?;
}
debug!("Added {new_len} commits to the queue");
// Mark all commits as old
sqlx::query!("UPDATE commits SET new = false")
.execute(&mut *conn)
.await?;
tx.commit().await?;
debug!("Updated queue");
Ok(())
}

View file

@ -4,13 +4,12 @@ use std::{collections::HashSet, sync::Arc};
use futures::TryStreamExt;
use gix::{
actor::IdentityRef, date::time::format::ISO8601_STRICT, objs::Kind, prelude::ObjectIdExt,
refs::Reference, ObjectId, Repository, ThreadSafeRepository,
objs::Kind, prelude::ObjectIdExt, refs::Reference, ObjectId, Repository, ThreadSafeRepository,
};
use sqlx::{Acquire, SqliteConnection, SqlitePool};
use tracing::{debug, info};
use crate::somehow;
use crate::{somehow, util};
async fn get_all_commit_hashes_from_db(
conn: &mut SqliteConnection,
@ -67,12 +66,6 @@ fn get_all_refs_and_new_commits_from_repo(
Ok((refs, new))
}
pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> {
let mut buffer = vec![];
author.trim().write_to(&mut buffer)?;
Ok(String::from_utf8_lossy(&buffer).to_string())
}
async fn insert_new_commits(
conn: &mut SqliteConnection,
repo: &Repository,
@ -82,11 +75,11 @@ async fn insert_new_commits(
let commit = id.attach(repo).object()?.try_into_commit()?;
let hash = commit.id.to_string();
let author_info = commit.author()?;
let author = format_actor(author_info.actor())?;
let author_date = author_info.time.format(ISO8601_STRICT);
let author = util::format_actor(author_info.actor())?;
let author_date = util::time_to_offset_datetime(author_info.time)?;
let committer_info = commit.committer()?;
let committer = format_actor(committer_info.actor())?;
let committer_date = committer_info.time.format(ISO8601_STRICT);
let committer = util::format_actor(committer_info.actor())?;
let committer_date = util::time_to_offset_datetime(committer_info.time)?;
let message = commit.message_raw()?.to_string();
sqlx::query!(

View file

@ -1,10 +1,22 @@
use std::time::Duration;
use gix::date::Time;
use gix::{actor::IdentityRef, date::Time};
use rand::{rngs::OsRng, seq::IteratorRandom};
use time::{macros::format_description, OffsetDateTime, UtcOffset};
use crate::somehow;
const RUN_ID_PREFIX: &str = "r-";
const RUN_ID_CHARS: &str = "0123456789abcdefghijklmnopqrstuvwxyz";
const RUN_ID_LEN: usize = 30; // log(16^40, base=len(RUN_ID_CHARS)) ~ 31
pub fn new_run_id() -> String {
RUN_ID_PREFIX
.chars()
.chain((0..RUN_ID_LEN).map(|_| RUN_ID_CHARS.chars().choose(&mut OsRng).unwrap()))
.collect()
}
pub fn time_to_offset_datetime(time: Time) -> somehow::Result<OffsetDateTime> {
Ok(OffsetDateTime::from_unix_timestamp(time.seconds)?
.to_offset(UtcOffset::from_whole_seconds(time.offset)?))
@ -26,6 +38,12 @@ pub fn format_time(time: OffsetDateTime) -> somehow::Result<String> {
})
}
pub fn format_actor(author: IdentityRef<'_>) -> somehow::Result<String> {
let mut buffer = vec![];
author.trim().write_to(&mut buffer)?;
Ok(String::from_utf8_lossy(&buffer).to_string())
}
pub fn format_commit_summary(message: &str) -> String {
// Take everything up to the first double newline
let title = message