Allow worker to specify bench method as string

This commit is contained in:
Joscha 2023-08-14 16:47:00 +02:00
parent 3de35e3ac8
commit 1ec72c92d5
6 changed files with 67 additions and 67 deletions

View file

@ -32,7 +32,7 @@ use crate::{
}; };
async fn save_work( async fn save_work(
finished: FinishedRun, run: FinishedRun,
worker_name: &str, worker_name: &str,
worker_info: &Option<String>, worker_info: &Option<String>,
db: &SqlitePool, db: &SqlitePool,
@ -40,15 +40,7 @@ async fn save_work(
let mut tx = db.begin().await?; let mut tx = db.begin().await?;
let conn = tx.acquire().await?; let conn = tx.acquire().await?;
let end = finished let end = run.end.map(|t| t.0).unwrap_or_else(OffsetDateTime::now_utc);
.end
.map(|t| t.0)
.unwrap_or_else(OffsetDateTime::now_utc);
let bench_method = match finished.run.bench_method {
BenchMethod::Internal => "internal".to_string(),
BenchMethod::Repo { hash } => format!("bench repo, hash {hash}"),
};
sqlx::query!( sqlx::query!(
"\ "\
@ -64,19 +56,19 @@ async fn save_work(
) \ ) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?) \ VALUES (?, ?, ?, ?, ?, ?, ?, ?) \
", ",
finished.run.id, run.id,
finished.run.hash, run.hash,
bench_method, run.bench_method,
worker_name, worker_name,
worker_info, worker_info,
finished.run.start.0, run.start.0,
end, end,
finished.exit_code, run.exit_code,
) )
.execute(&mut *conn) .execute(&mut *conn)
.await?; .await?;
for (name, measurement) in finished.measurements { for (name, measurement) in run.measurements {
sqlx::query!( sqlx::query!(
"\ "\
INSERT INTO run_measurements ( \ INSERT INTO run_measurements ( \
@ -89,7 +81,7 @@ async fn save_work(
) \ ) \
VALUES (?, ?, ?, ?, ?, ?) \ VALUES (?, ?, ?, ?, ?, ?) \
", ",
finished.run.id, run.id,
name, name,
measurement.value, measurement.value,
measurement.stddev, measurement.stddev,
@ -100,7 +92,7 @@ async fn save_work(
.await?; .await?;
} }
for (idx, (source, text)) in finished.output.into_iter().enumerate() { for (idx, (source, text)) in run.output.into_iter().enumerate() {
// Hopefully we won't need more than 4294967296 output chunks per run :P // Hopefully we won't need more than 4294967296 output chunks per run :P
let idx = idx as u32; let idx = idx as u32;
sqlx::query!( sqlx::query!(
@ -113,7 +105,7 @@ async fn save_work(
) \ ) \
VALUES (?, ?, ?, ?) \ VALUES (?, ?, ?, ?) \
", ",
finished.run.id, run.id,
idx, idx,
source, source,
text, text,
@ -123,7 +115,7 @@ async fn save_work(
} }
// The thing has been done :D // The thing has been done :D
sqlx::query!("DELETE FROM queue WHERE hash = ?", finished.run.hash) sqlx::query!("DELETE FROM queue WHERE hash = ?", run.hash)
.execute(&mut *conn) .execute(&mut *conn)
.await?; .await?;

View file

@ -76,19 +76,12 @@ async fn get_workers(
let status = match &info.status { let status = match &info.status {
WorkerStatus::Idle => Status::Idle, WorkerStatus::Idle => Status::Idle,
WorkerStatus::Busy => Status::Busy, WorkerStatus::Busy => Status::Busy,
WorkerStatus::Working(unfinished) => { WorkerStatus::Working(run) => {
let message = sqlx::query_scalar!( let message =
"SELECT message FROM commits WHERE hash = ?", sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash)
unfinished.run.hash
)
.fetch_one(db) .fetch_one(db)
.await?; .await?;
Status::Working(LinkRunShort::new( Status::Working(LinkRunShort::new(base, run.id.clone(), &run.hash, &message))
base,
unfinished.run.id.clone(),
&unfinished.run.hash,
&message,
))
} }
}; };
@ -108,9 +101,9 @@ async fn get_queue_data(
// Group workers by commit hash // Group workers by commit hash
let mut workers_by_commit: HashMap<String, Vec<LinkWorker>> = HashMap::new(); let mut workers_by_commit: HashMap<String, Vec<LinkWorker>> = HashMap::new();
for (name, info) in workers { for (name, info) in workers {
if let WorkerStatus::Working(unfinished) = &info.status { if let WorkerStatus::Working(run) = &info.status {
workers_by_commit workers_by_commit
.entry(unfinished.run.hash.clone()) .entry(run.hash.clone())
.or_default() .or_default()
.push(LinkWorker::new(base, name.clone())); .push(LinkWorker::new(base, name.clone()));
} }

View file

@ -43,21 +43,14 @@ async fn status(status: &WorkerStatus, db: &SqlitePool, base: &Base) -> somehow:
Ok(match status { Ok(match status {
WorkerStatus::Idle => Status::Idle, WorkerStatus::Idle => Status::Idle,
WorkerStatus::Busy => Status::Busy, WorkerStatus::Busy => Status::Busy,
WorkerStatus::Working(unfinished) => { WorkerStatus::Working(run) => {
let message = sqlx::query_scalar!( let message =
"SELECT message FROM commits WHERE hash = ?", sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash)
unfinished.run.hash
)
.fetch_one(db) .fetch_one(db)
.await?; .await?;
Status::Working { Status::Working {
link: LinkRunShort::new( link: LinkRunShort::new(base, run.id.clone(), &run.hash, &message),
base, since: util::format_time(run.start.0),
unfinished.run.id.clone(),
&unfinished.run.hash,
&message,
),
since: util::format_time(unfinished.run.start.0),
} }
} }
}) })

View file

@ -67,7 +67,7 @@ impl Workers {
.values() .values()
.filter_map(|info| match &info.status { .filter_map(|info| match &info.status {
WorkerStatus::Idle | WorkerStatus::Busy => None, WorkerStatus::Idle | WorkerStatus::Busy => None,
WorkerStatus::Working(unfinished) => Some(&unfinished.run.hash), WorkerStatus::Working(run) => Some(&run.hash),
}) })
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
@ -84,7 +84,10 @@ impl Workers {
// Reserve work so other workers don't choose it // Reserve work so other workers don't choose it
if let Some(info) = self.workers.get_mut(name) { if let Some(info) = self.workers.get_mut(name) {
info.status = WorkerStatus::Working(UnfinishedRun { info.status = WorkerStatus::Working(UnfinishedRun {
run: run.clone(), id: run.id.clone(),
hash: run.hash.clone(),
bench_method: run.bench_method.to_string(),
start: run.start,
last_output: vec![], last_output: vec![],
}); });
} }
@ -95,10 +98,10 @@ impl Workers {
pub fn should_abort_work(&self, name: &str, queue: &[String]) -> bool { pub fn should_abort_work(&self, name: &str, queue: &[String]) -> bool {
// A worker should abort work if... // A worker should abort work if...
let Some(info) = self.workers.get(name) else { return false; }; let Some(info) = self.workers.get(name) else { return false; };
let WorkerStatus::Working (unfinished) = &info.status else { return false; }; let WorkerStatus::Working (run) = &info.status else { return false; };
// The commit isn't in the queue // The commit isn't in the queue
if !queue.contains(&unfinished.run.hash) { if !queue.contains(&run.hash) {
return true; return true;
} }
@ -107,9 +110,7 @@ impl Workers {
.workers .workers
.iter() .iter()
.filter_map(|(name, info)| match &info.status { .filter_map(|(name, info)| match &info.status {
WorkerStatus::Working(u) if u.run.hash == unfinished.run.hash => { WorkerStatus::Working(u) if u.hash == run.hash => Some((name, u.start)),
Some((name, u.run.start))
}
_ => None, _ => None,
}) })
.max_by_key(|(_, start)| start.0) .max_by_key(|(_, start)| start.0)

View file

@ -1,6 +1,6 @@
//! Data structures modelling the communication between server and worker. //! Data structures modelling the communication between server and worker.
use std::collections::HashMap; use std::{collections::HashMap, fmt};
use serde::{de, Deserialize, Serialize}; use serde::{de, Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr}; use serde_repr::{Deserialize_repr, Serialize_repr};
@ -72,6 +72,15 @@ pub enum BenchMethod {
Repo { hash: String }, Repo { hash: String },
} }
impl fmt::Display for BenchMethod {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BenchMethod::Internal => write!(f, "internal"),
BenchMethod::Repo { hash } => write!(f, "bench repo, hash {hash}"),
}
}
}
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct Run { pub struct Run {
pub id: String, pub id: String,
@ -82,8 +91,10 @@ pub struct Run {
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct UnfinishedRun { pub struct UnfinishedRun {
#[serde(flatten)] pub id: String,
pub run: Run, pub hash: String,
pub bench_method: String,
pub start: Rfc3339Time,
#[serde(default)] #[serde(default)]
pub last_output: Vec<(Source, String)>, pub last_output: Vec<(Source, String)>,
@ -91,8 +102,10 @@ pub struct UnfinishedRun {
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct FinishedRun { pub struct FinishedRun {
#[serde(flatten)] pub id: String,
pub run: Run, pub hash: String,
pub bench_method: String,
pub start: Rfc3339Time,
/// Override the server's end time. /// Override the server's end time.
/// ///

View file

@ -48,7 +48,6 @@ impl RunInProgress {
} }
pub fn as_unfinished_run(&self) -> UnfinishedRun { pub fn as_unfinished_run(&self) -> UnfinishedRun {
let run = self.run.clone();
let last_output = self let last_output = self
.output .output
.lock() .lock()
@ -59,7 +58,13 @@ impl RunInProgress {
.rev() .rev()
.cloned() .cloned()
.collect(); .collect();
UnfinishedRun { run, last_output } UnfinishedRun {
id: self.run.id.clone(),
hash: self.run.hash.clone(),
bench_method: self.run.bench_method.to_string(),
start: self.run.start,
last_output,
}
} }
pub fn log_stdout(&self, line: String) { pub fn log_stdout(&self, line: String) {
@ -79,7 +84,7 @@ impl RunInProgress {
} }
.await; .await;
let finished = match result { let run = match result {
Ok(outcome) => outcome, Ok(outcome) => outcome,
Err(e) => { Err(e) => {
error!("Error during run:\n{e:?}"); error!("Error during run:\n{e:?}");
@ -96,11 +101,14 @@ impl RunInProgress {
std::mem::swap(&mut output, &mut *self.output.lock().unwrap()); std::mem::swap(&mut output, &mut *self.output.lock().unwrap());
Some(FinishedRun { Some(FinishedRun {
run: self.run.clone(), id: self.run.id.clone(),
hash: self.run.hash.clone(),
bench_method: self.run.bench_method.to_string(),
start: self.run.start,
end: None, end: None,
exit_code: finished.exit_code, exit_code: run.exit_code,
output, output,
measurements: finished.measurements, measurements: run.measurements,
}) })
} }
} }