diff --git a/src/server/runners.rs b/src/server/runners.rs index 6b1857e..c004d3d 100644 --- a/src/server/runners.rs +++ b/src/server/runners.rs @@ -55,7 +55,7 @@ impl Runners { self.runners .iter() .filter_map(|(name, info)| match &info.status { - RunnerStatus::Working { hash: h, since, .. } if h == hash => Some((name, *since)), + RunnerStatus::Working(run) if run.hash == hash => Some((name, run.start)), _ => None, }) .max_by_key(|(_, since)| *since) @@ -64,8 +64,8 @@ impl Runners { pub fn should_abort_work(&self, name: &str) -> bool { let Some(info) = self.runners.get(name) else { return false; }; - let RunnerStatus::Working { hash, .. } = &info.status else { return false; }; - let Some(oldest) = self.oldest_working_on(hash) else { return false; }; + let RunnerStatus::Working ( run) = &info.status else { return false; }; + let Some(oldest) = self.oldest_working_on(&run.hash) else { return false; }; name != oldest } @@ -74,7 +74,7 @@ impl Runners { .runners .values() .filter_map(|info| match &info.status { - RunnerStatus::Working { hash, .. } => Some(hash), + RunnerStatus::Working(run) => Some(&run.hash), _ => None, }) .collect::>(); diff --git a/src/server/web/api.rs b/src/server/web/api.rs index 918ff82..b23c878 100644 --- a/src/server/web/api.rs +++ b/src/server/web/api.rs @@ -67,7 +67,7 @@ async fn post_status( // Find new work let work = if let Some(hash) = work { let bench = match bench_repo { - Some(bench_repo) => BenchMethod::BenchRepo { + Some(bench_repo) => BenchMethod::Repo { hash: bench_repo.0.to_thread_local().head_id()?.to_string(), }, None => BenchMethod::Internal, diff --git a/src/server/web/queue.rs b/src/server/web/queue.rs index cef6b06..520f789 100644 --- a/src/server/web/queue.rs +++ b/src/server/web/queue.rs @@ -64,12 +64,12 @@ async fn get_runners( let status = match &info.status { RunnerStatus::Idle => Status::Idle, RunnerStatus::Busy => Status::Busy, - RunnerStatus::Working { id, hash, .. } => { + RunnerStatus::Working(run) => { let message = - sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", hash) + sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash) .fetch_one(db) .await?; - Status::Working(RunLink::new(base, id.clone(), hash, &message)) + Status::Working(RunLink::new(base, run.id.clone(), &run.hash, &message)) } }; @@ -89,9 +89,9 @@ async fn get_queue( // Group runners by commit hash let mut runners_by_commit: HashMap> = HashMap::new(); for (name, info) in runners { - if let RunnerStatus::Working { hash, .. } = &info.status { + if let RunnerStatus::Working(run) = &info.status { runners_by_commit - .entry(hash.clone()) + .entry(run.hash.clone()) .or_default() .push(RunnerLink::new(base, name.clone())); } diff --git a/src/shared.rs b/src/shared.rs index 35b4e04..410c092 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -6,6 +6,10 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use time::OffsetDateTime; +fn is_false(b: &bool) -> bool { + !b +} + #[derive(Clone, Serialize_repr, Deserialize_repr)] #[repr(i8)] pub enum Direction { @@ -17,27 +21,42 @@ pub enum Direction { #[derive(Clone, Serialize, Deserialize)] pub struct Measurement { pub value: f64, + #[serde(skip_serializing_if = "Option::is_none")] pub stddev: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub unit: Option, - pub direction: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub direction: Option, +} + +#[derive(Clone, Serialize_repr, Deserialize_repr)] +#[repr(u8)] +pub enum Source { + // Stdin would be fd 0 + Stdout = 1, + Stderr = 2, } #[derive(Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "type")] -pub enum Line { - Stdout(String), - Stderr(String), +pub struct UnfinishedRun { + pub id: String, + pub hash: String, + pub start: OffsetDateTime, + #[serde(default)] + pub last_output: Vec<(Source, String)>, } #[derive(Clone, Serialize, Deserialize)] pub struct FinishedRun { pub id: String, + pub hash: String, pub start: OffsetDateTime, pub end: OffsetDateTime, - pub output: Vec, + #[serde(default)] pub exit_code: i32, pub measurements: HashMap, + #[serde(default)] + pub output: Vec<(Source, String)>, } #[derive(Clone, Serialize, Deserialize)] @@ -49,12 +68,7 @@ pub enum RunnerStatus { /// The runner is performing work for another server. Busy, /// The runner is performing work for the current server. - Working { - id: String, - hash: String, - since: OffsetDateTime, - last_lines: Vec, - }, + Working(UnfinishedRun), } #[derive(Clone, Serialize, Deserialize)] @@ -74,10 +88,11 @@ pub struct RunnerRequest { /// /// If the server has a commit available, it should respond with a non-null /// [`Response::work`]. - #[serde(default)] + #[serde(default, skip_serializing_if = "is_false")] pub request_work: bool, /// The runner has finished a run and wants to submit the results. + #[serde(skip_serializing_if = "Option::is_none")] pub submit_work: Option, } @@ -88,7 +103,7 @@ pub enum BenchMethod { /// Use internal (deterministic) benchmarking code. Internal, /// Use a commit from a bench repo. - BenchRepo { hash: String }, + Repo { hash: String }, } #[derive(Clone, Serialize, Deserialize)] @@ -107,6 +122,7 @@ pub struct ServerResponse { /// the next update request sent by the runner, the server will consider the /// runner as preparing to work on the commit, and will not give out the /// same commit to other runners. + #[serde(skip_serializing_if = "Option::is_none")] pub work: Option, /// The runner should abort the current run. @@ -114,5 +130,6 @@ pub struct ServerResponse { /// The server may send this because it detected the runner is benchmarking /// the same commit as another runner and has broken the tie in favor of the /// other runner. The runner may continue the run despite this flag. + #[serde(default, skip_serializing_if = "is_false")] pub abort_work: bool, }