From 5e0b8e3c8cf3ef96f69f1e4aa10db431600da593 Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 11 Aug 2023 15:16:20 +0200 Subject: [PATCH] Store finished work in db --- ...48dc89a3b39e6f86629df9fdcdc5b50b782ef.json | 12 ++ ...80139615b69896a84dd1340759db6adbebf1f.json | 12 ++ ...2710279ba817bc5f50661f592371297efd651.json | 12 ++ ...d64a24e0f4a33d5b792394d39c82aca2487dd.json | 12 ++ migrations/20230805101911_commits.sql | 2 + migrations/20230806143356_queue.sql | 1 + migrations/20230809141314_runs.sql | 19 ++- src/server/web/api.rs | 154 ++++++++++++++---- src/shared.rs | 4 +- 9 files changed, 190 insertions(+), 38 deletions(-) create mode 100644 .sqlx/query-1df4bf7bd7040b8aef0865f2c4a48dc89a3b39e6f86629df9fdcdc5b50b782ef.json create mode 100644 .sqlx/query-33cbc60a8d06adf3c3836d4193a80139615b69896a84dd1340759db6adbebf1f.json create mode 100644 .sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json create mode 100644 .sqlx/query-6b74f746c36091274ad5477aad2d64a24e0f4a33d5b792394d39c82aca2487dd.json diff --git a/.sqlx/query-1df4bf7bd7040b8aef0865f2c4a48dc89a3b39e6f86629df9fdcdc5b50b782ef.json b/.sqlx/query-1df4bf7bd7040b8aef0865f2c4a48dc89a3b39e6f86629df9fdcdc5b50b782ef.json new file mode 100644 index 0000000..cd47963 --- /dev/null +++ b/.sqlx/query-1df4bf7bd7040b8aef0865f2c4a48dc89a3b39e6f86629df9fdcdc5b50b782ef.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM queue WHERE hash = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "1df4bf7bd7040b8aef0865f2c4a48dc89a3b39e6f86629df9fdcdc5b50b782ef" +} diff --git a/.sqlx/query-33cbc60a8d06adf3c3836d4193a80139615b69896a84dd1340759db6adbebf1f.json b/.sqlx/query-33cbc60a8d06adf3c3836d4193a80139615b69896a84dd1340759db6adbebf1f.json new file mode 100644 index 0000000..6b8df3f --- /dev/null +++ b/.sqlx/query-33cbc60a8d06adf3c3836d4193a80139615b69896a84dd1340759db6adbebf1f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + 
"query": "INSERT INTO run_measurements ( id, name, value, stddev, unit, direction ) VALUES (?, ?, ?, ?, ?, ?) ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "33cbc60a8d06adf3c3836d4193a80139615b69896a84dd1340759db6adbebf1f" +} diff --git a/.sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json b/.sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json new file mode 100644 index 0000000..53c6f0c --- /dev/null +++ b/.sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO runs ( id, hash, start, end, exit_code ) VALUES (?, ?, ?, ?, ?) ", + "describe": { + "columns": [], + "parameters": { + "Right": 5 + }, + "nullable": [] + }, + "hash": "40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651" +} diff --git a/.sqlx/query-6b74f746c36091274ad5477aad2d64a24e0f4a33d5b792394d39c82aca2487dd.json b/.sqlx/query-6b74f746c36091274ad5477aad2d64a24e0f4a33d5b792394d39c82aca2487dd.json new file mode 100644 index 0000000..45b2ce9 --- /dev/null +++ b/.sqlx/query-6b74f746c36091274ad5477aad2d64a24e0f4a33d5b792394d39c82aca2487dd.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO run_output ( id, idx, source, text ) VALUES (?, ?, ?, ?) 
", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "6b74f746c36091274ad5477aad2d64a24e0f4a33d5b792394d39c82aca2487dd" +} diff --git a/migrations/20230805101911_commits.sql b/migrations/20230805101911_commits.sql index 7988199..208e488 100644 --- a/migrations/20230805101911_commits.sql +++ b/migrations/20230805101911_commits.sql @@ -12,6 +12,7 @@ CREATE TABLE commits ( CREATE TABLE commit_links ( child TEXT NOT NULL, parent TEXT NOT NULL, + PRIMARY KEY (parent, child), FOREIGN KEY (parent) REFERENCES commits (hash) ON DELETE CASCADE, FOREIGN KEY (child) REFERENCES commits (hash) ON DELETE CASCADE @@ -21,6 +22,7 @@ CREATE TABLE refs ( name TEXT NOT NULL PRIMARY KEY, hash TEXT NOT NULL, tracked INT NOT NULL DEFAULT 0, + FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE ) STRICT; diff --git a/migrations/20230806143356_queue.sql b/migrations/20230806143356_queue.sql index 8b7e816..5649450 100644 --- a/migrations/20230806143356_queue.sql +++ b/migrations/20230806143356_queue.sql @@ -2,5 +2,6 @@ CREATE TABLE queue ( hash TEXT NOT NULL PRIMARY KEY, date TEXT NOT NULL, priority INT NOT NULL DEFAULT 0, + FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE ) STRICT; diff --git a/migrations/20230809141314_runs.sql b/migrations/20230809141314_runs.sql index 5431c1c..f58ec65 100644 --- a/migrations/20230809141314_runs.sql +++ b/migrations/20230809141314_runs.sql @@ -1,11 +1,14 @@ CREATE TABLE runs ( - id TEXT NOT NULL PRIMARY KEY, - hash TEXT NOT NULL, + id TEXT NOT NULL PRIMARY KEY, + hash TEXT NOT NULL, + start TEXT NOT NULL, + end TEXT NOT NULL, + exit_code INT NOT NULL, FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE ) STRICT; -CREATE TABLE measurements ( +CREATE TABLE run_measurements ( id TEXT NOT NULL, name TEXT NOT NULL, value REAL NOT NULL, @@ -16,3 +19,13 @@ CREATE TABLE measurements ( PRIMARY KEY (id, name), FOREIGN KEY (id) REFERENCES runs (id) ON DELETE CASCADE ) STRICT; + 
+CREATE TABLE run_output ( + id TEXT NOT NULL, + idx INT NOT NULL, + source INT NOT NULL, + text TEXT NOT NULL, + + PRIMARY KEY (id, idx), + FOREIGN KEY (id) REFERENCES runs (id) ON DELETE CASCADE +) STRICT; diff --git a/src/server/web/api.rs b/src/server/web/api.rs index a79cdd9..05a92d3 100644 --- a/src/server/web/api.rs +++ b/src/server/web/api.rs @@ -14,7 +14,7 @@ use axum::{ Json, Router, TypedHeader, }; use gix::{ObjectId, ThreadSafeRepository}; -use sqlx::SqlitePool; +use sqlx::{Acquire, SqlitePool}; use time::OffsetDateTime; use tracing::debug; @@ -24,10 +24,109 @@ use crate::{ workers::{WorkerInfo, Workers}, BenchRepo, Repo, Server, }, - shared::{BenchMethod, ServerResponse, Work, WorkerRequest}, + shared::{BenchMethod, FinishedRun, ServerResponse, Work, WorkerRequest}, somehow, }; +async fn save_work(run: FinishedRun, db: SqlitePool) -> somehow::Result<()> { + let mut tx = db.begin().await?; + let conn = tx.acquire().await?; + + sqlx::query!( + "\ + INSERT INTO runs ( \ + id, \ + hash, \ + start, \ + end, \ + exit_code \ + ) \ + VALUES (?, ?, ?, ?, ?) \ + ", + run.id, + run.hash, + run.start, + run.end, + run.exit_code, + ) + .execute(&mut *conn) + .await?; + + for (name, measurement) in run.measurements { + sqlx::query!( + "\ + INSERT INTO run_measurements ( \ + id, \ + name, \ + value, \ + stddev, \ + unit, \ + direction \ + ) \ + VALUES (?, ?, ?, ?, ?, ?) \ + ", + run.id, + name, + measurement.value, + measurement.stddev, + measurement.unit, + measurement.direction, + ) + .execute(&mut *conn) + .await?; + } + + for (idx, (source, text)) in run.output.into_iter().enumerate() { + // Hopefully we won't need more than 4294967296 output chunks per run :P + let idx = idx as u32; + sqlx::query!( + "\ + INSERT INTO run_output ( \ + id, \ + idx, \ + source, \ + text \ + ) \ + VALUES (?, ?, ?, ?) 
\ + ", + run.id, + idx, + source, + text, + ) + .execute(&mut *conn) + .await?; + } + + // The thing has been done :D + sqlx::query!("DELETE FROM queue WHERE hash = ?", run.hash) + .execute(&mut *conn) + .await?; + + tx.commit().await?; + Ok(()) +} + +fn prepare_work( + work: Option<&str>, + bench_repo: Option, +) -> somehow::Result> { + Ok(if let Some(hash) = work { + let bench = match bench_repo { + Some(bench_repo) => BenchMethod::Repo { + hash: bench_repo.0.to_thread_local().head_id()?.to_string(), + }, + None => BenchMethod::Internal, + }; + Some(Work { + hash: hash.to_string(), + bench, + }) + } else { + None + }) +} + async fn post_status( State(config): State<&'static Config>, State(db): State, @@ -50,40 +149,29 @@ async fn post_status( .fetch_all(&db) .await?; - let mut guard = workers.lock().unwrap(); - guard.clean(); - if !guard.verify(&name, &request.secret) { - return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response()); - } - guard.update( - name.clone(), - WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status), - ); - let work = match request.request_work { - true => guard.find_free_work(&queue), - false => None, - }; - let abort_work = guard.should_abort_work(&name); - drop(guard); - - // TODO Insert finished work into DB - - // Find new work - let work = if let Some(hash) = work { - let bench = match bench_repo { - Some(bench_repo) => BenchMethod::Repo { - hash: bench_repo.0.to_thread_local().head_id()?.to_string(), - }, - None => BenchMethod::Internal, + let (work, abort_work) = { + let mut guard = workers.lock().unwrap(); + guard.clean(); + if !guard.verify(&name, &request.secret) { + return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response()); + } + guard.update( + name.clone(), + WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status), + ); + let work = match request.request_work { + true => guard.find_free_work(&queue), + false => None, }; - Some(Work { - hash: hash.to_string(), - 
bench, - }) - } else { - None + let abort_work = guard.should_abort_work(&name); + (work, abort_work) }; + if let Some(run) = request.submit_work { + save_work(run, db).await?; + } + + let work = prepare_work(work, bench_repo)?; debug!("Received status update from {name}"); Ok(Json(ServerResponse { work, abort_work }).into_response()) } diff --git a/src/shared.rs b/src/shared.rs index 3d58e7e..52535a1 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -10,7 +10,7 @@ fn is_false(b: &bool) -> bool { !b } -#[derive(Clone, Serialize_repr, Deserialize_repr)] +#[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)] #[repr(i8)] pub enum Direction { LessIsBetter = -1, @@ -29,7 +29,7 @@ pub struct Measurement { pub direction: Option<Direction>, } -#[derive(Clone, Serialize_repr, Deserialize_repr)] +#[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)] #[repr(u8)] pub enum Source { // Stdin would be fd 0