From 1ec72c92d54a9b1aa9099528c9aeee6b22400e6c Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 14 Aug 2023 16:47:00 +0200 Subject: [PATCH] Allow worker to specify bench method as string --- src/server/web/api/worker.rs | 32 ++++++++++++-------------------- src/server/web/pages/queue.rs | 23 ++++++++--------------- src/server/web/pages/worker.rs | 21 +++++++-------------- src/server/workers.rs | 15 ++++++++------- src/shared.rs | 23 ++++++++++++++++++----- src/worker/run.rs | 20 ++++++++++++++------ 6 files changed, 67 insertions(+), 67 deletions(-) diff --git a/src/server/web/api/worker.rs b/src/server/web/api/worker.rs index 0b48f62..b85a155 100644 --- a/src/server/web/api/worker.rs +++ b/src/server/web/api/worker.rs @@ -32,7 +32,7 @@ use crate::{ }; async fn save_work( - finished: FinishedRun, + run: FinishedRun, worker_name: &str, worker_info: &Option, db: &SqlitePool, @@ -40,15 +40,7 @@ async fn save_work( let mut tx = db.begin().await?; let conn = tx.acquire().await?; - let end = finished - .end - .map(|t| t.0) - .unwrap_or_else(OffsetDateTime::now_utc); - - let bench_method = match finished.run.bench_method { - BenchMethod::Internal => "internal".to_string(), - BenchMethod::Repo { hash } => format!("bench repo, hash {hash}"), - }; + let end = run.end.map(|t| t.0).unwrap_or_else(OffsetDateTime::now_utc); sqlx::query!( "\ @@ -64,19 +56,19 @@ async fn save_work( ) \ VALUES (?, ?, ?, ?, ?, ?, ?, ?) \ ", - finished.run.id, - finished.run.hash, - bench_method, + run.id, + run.hash, + run.bench_method, worker_name, worker_info, - finished.run.start.0, + run.start.0, end, - finished.exit_code, + run.exit_code, ) .execute(&mut *conn) .await?; - for (name, measurement) in finished.measurements { + for (name, measurement) in run.measurements { sqlx::query!( "\ INSERT INTO run_measurements ( \ @@ -89,7 +81,7 @@ async fn save_work( ) \ VALUES (?, ?, ?, ?, ?, ?) \ ", - finished.run.id, + run.id, name, measurement.value, measurement.stddev, @@ -100,7 +92,7 @@ async fn save_work( .await?; } - for (idx, (source, text)) in finished.output.into_iter().enumerate() { + for (idx, (source, text)) in run.output.into_iter().enumerate() { // Hopefully we won't need more than 4294967296 output chunks per run :P let idx = idx as u32; sqlx::query!( @@ -113,7 +105,7 @@ async fn save_work( ) \ VALUES (?, ?, ?, ?) \ ", - finished.run.id, + run.id, idx, source, text, @@ -123,7 +115,7 @@ async fn save_work( } // The thing has been done :D - sqlx::query!("DELETE FROM queue WHERE hash = ?", finished.run.hash) + sqlx::query!("DELETE FROM queue WHERE hash = ?", run.hash) .execute(&mut *conn) .await?; diff --git a/src/server/web/pages/queue.rs b/src/server/web/pages/queue.rs index d42ba3d..5fb3701 100644 --- a/src/server/web/pages/queue.rs +++ b/src/server/web/pages/queue.rs @@ -76,19 +76,12 @@ async fn get_workers( let status = match &info.status { WorkerStatus::Idle => Status::Idle, WorkerStatus::Busy => Status::Busy, - WorkerStatus::Working(unfinished) => { - let message = sqlx::query_scalar!( - "SELECT message FROM commits WHERE hash = ?", - unfinished.run.hash - ) - .fetch_one(db) - .await?; - Status::Working(LinkRunShort::new( - base, - unfinished.run.id.clone(), - &unfinished.run.hash, - &message, - )) + WorkerStatus::Working(run) => { + let message = + sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash) + .fetch_one(db) + .await?; + Status::Working(LinkRunShort::new(base, run.id.clone(), &run.hash, &message)) } }; @@ -108,9 +101,9 @@ async fn get_queue_data( // Group workers by commit hash let mut workers_by_commit: HashMap> = HashMap::new(); for (name, info) in workers { - if let WorkerStatus::Working(unfinished) = &info.status { + if let WorkerStatus::Working(run) = &info.status { workers_by_commit - .entry(unfinished.run.hash.clone()) + .entry(run.hash.clone()) .or_default() .push(LinkWorker::new(base, name.clone())); } diff --git a/src/server/web/pages/worker.rs b/src/server/web/pages/worker.rs index 04f6751..a2cd835 100644 --- a/src/server/web/pages/worker.rs +++ b/src/server/web/pages/worker.rs @@ -43,21 +43,14 @@ async fn status(status: &WorkerStatus, db: &SqlitePool, base: &Base) -> somehow: Ok(match status { WorkerStatus::Idle => Status::Idle, WorkerStatus::Busy => Status::Busy, - WorkerStatus::Working(unfinished) => { - let message = sqlx::query_scalar!( - "SELECT message FROM commits WHERE hash = ?", - unfinished.run.hash - ) - .fetch_one(db) - .await?; + WorkerStatus::Working(run) => { + let message = + sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash) + .fetch_one(db) + .await?; Status::Working { - link: LinkRunShort::new( - base, - unfinished.run.id.clone(), - &unfinished.run.hash, - &message, - ), - since: util::format_time(unfinished.run.start.0), + link: LinkRunShort::new(base, run.id.clone(), &run.hash, &message), + since: util::format_time(run.start.0), } } }) diff --git a/src/server/workers.rs b/src/server/workers.rs index 3ee7094..a350348 100644 --- a/src/server/workers.rs +++ b/src/server/workers.rs @@ -67,7 +67,7 @@ impl Workers { .values() .filter_map(|info| match &info.status { WorkerStatus::Idle | WorkerStatus::Busy => None, - WorkerStatus::Working(unfinished) => Some(&unfinished.run.hash), + WorkerStatus::Working(run) => Some(&run.hash), }) .collect::>(); @@ -84,7 +84,10 @@ impl Workers { // Reserve work so other workers don't choose it if let Some(info) = self.workers.get_mut(name) { info.status = WorkerStatus::Working(UnfinishedRun { - run: run.clone(), + id: run.id.clone(), + hash: run.hash.clone(), + bench_method: run.bench_method.to_string(), + start: run.start, last_output: vec![], }); } @@ -95,10 +98,10 @@ impl Workers { pub fn should_abort_work(&self, name: &str, queue: &[String]) -> bool { // A worker should abort work if... let Some(info) = self.workers.get(name) else { return false; }; - let WorkerStatus::Working (unfinished) = &info.status else { return false; }; + let WorkerStatus::Working (run) = &info.status else { return false; }; // The commit isn't in the queue - if !queue.contains(&unfinished.run.hash) { + if !queue.contains(&run.hash) { return true; } @@ -107,9 +110,7 @@ impl Workers { .workers .iter() .filter_map(|(name, info)| match &info.status { - WorkerStatus::Working(u) if u.run.hash == unfinished.run.hash => { - Some((name, u.run.start)) - } + WorkerStatus::Working(u) if u.hash == run.hash => Some((name, u.start)), _ => None, }) .max_by_key(|(_, start)| start.0) diff --git a/src/shared.rs b/src/shared.rs index ed4086f..8ad0d9d 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -1,6 +1,6 @@ //! Data structures modelling the communication between server and worker. -use std::collections::HashMap; +use std::{collections::HashMap, fmt}; use serde::{de, Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; @@ -72,6 +72,15 @@ pub enum BenchMethod { Repo { hash: String }, } +impl fmt::Display for BenchMethod { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BenchMethod::Internal => write!(f, "internal"), + BenchMethod::Repo { hash } => write!(f, "bench repo, hash {hash}"), + } + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct Run { pub id: String, @@ -82,8 +91,10 @@ pub struct Run { #[derive(Clone, Serialize, Deserialize)] pub struct UnfinishedRun { - #[serde(flatten)] - pub run: Run, + pub id: String, + pub hash: String, + pub bench_method: String, + pub start: Rfc3339Time, #[serde(default)] pub last_output: Vec<(Source, String)>, @@ -91,8 +102,10 @@ pub struct UnfinishedRun { #[derive(Clone, Serialize, Deserialize)] pub struct FinishedRun { - #[serde(flatten)] - pub run: Run, + pub id: String, + pub hash: String, + pub bench_method: String, + pub start: Rfc3339Time, /// Override the server's end time. /// diff --git a/src/worker/run.rs b/src/worker/run.rs index 2792d88..b334719 100644 --- a/src/worker/run.rs +++ b/src/worker/run.rs @@ -48,7 +48,6 @@ impl RunInProgress { } pub fn as_unfinished_run(&self) -> UnfinishedRun { - let run = self.run.clone(); let last_output = self .output .lock() @@ -59,7 +58,13 @@ impl RunInProgress { .rev() .cloned() .collect(); - UnfinishedRun { run, last_output } + UnfinishedRun { + id: self.run.id.clone(), + hash: self.run.hash.clone(), + bench_method: self.run.bench_method.to_string(), + start: self.run.start, + last_output, + } } pub fn log_stdout(&self, line: String) { @@ -79,7 +84,7 @@ impl RunInProgress { } .await; - let finished = match result { + let run = match result { Ok(outcome) => outcome, Err(e) => { error!("Error during run:\n{e:?}"); @@ -96,11 +101,14 @@ impl RunInProgress { std::mem::swap(&mut output, &mut *self.output.lock().unwrap()); Some(FinishedRun { - run: self.run.clone(), + id: self.run.id.clone(), + hash: self.run.hash.clone(), + bench_method: self.run.bench_method.to_string(), + start: self.run.start, end: None, - exit_code: finished.exit_code, + exit_code: run.exit_code, output, - measurements: finished.measurements, + measurements: run.measurements, }) } }