Overhaul runner protocol

The JSON should now be nicer to work with.
This commit is contained in:
Joscha 2023-08-11 00:24:00 +02:00
parent b16b3a668e
commit 7911a67906
4 changed files with 42 additions and 25 deletions

View file

@ -55,7 +55,7 @@ impl Runners {
self.runners self.runners
.iter() .iter()
.filter_map(|(name, info)| match &info.status { .filter_map(|(name, info)| match &info.status {
RunnerStatus::Working { hash: h, since, .. } if h == hash => Some((name, *since)), RunnerStatus::Working(run) if run.hash == hash => Some((name, run.start)),
_ => None, _ => None,
}) })
.max_by_key(|(_, since)| *since) .max_by_key(|(_, since)| *since)
@ -64,8 +64,8 @@ impl Runners {
pub fn should_abort_work(&self, name: &str) -> bool { pub fn should_abort_work(&self, name: &str) -> bool {
let Some(info) = self.runners.get(name) else { return false; }; let Some(info) = self.runners.get(name) else { return false; };
let RunnerStatus::Working { hash, .. } = &info.status else { return false; }; let RunnerStatus::Working ( run) = &info.status else { return false; };
let Some(oldest) = self.oldest_working_on(hash) else { return false; }; let Some(oldest) = self.oldest_working_on(&run.hash) else { return false; };
name != oldest name != oldest
} }
@ -74,7 +74,7 @@ impl Runners {
.runners .runners
.values() .values()
.filter_map(|info| match &info.status { .filter_map(|info| match &info.status {
RunnerStatus::Working { hash, .. } => Some(hash), RunnerStatus::Working(run) => Some(&run.hash),
_ => None, _ => None,
}) })
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();

View file

@ -67,7 +67,7 @@ async fn post_status(
// Find new work // Find new work
let work = if let Some(hash) = work { let work = if let Some(hash) = work {
let bench = match bench_repo { let bench = match bench_repo {
Some(bench_repo) => BenchMethod::BenchRepo { Some(bench_repo) => BenchMethod::Repo {
hash: bench_repo.0.to_thread_local().head_id()?.to_string(), hash: bench_repo.0.to_thread_local().head_id()?.to_string(),
}, },
None => BenchMethod::Internal, None => BenchMethod::Internal,

View file

@ -64,12 +64,12 @@ async fn get_runners(
let status = match &info.status { let status = match &info.status {
RunnerStatus::Idle => Status::Idle, RunnerStatus::Idle => Status::Idle,
RunnerStatus::Busy => Status::Busy, RunnerStatus::Busy => Status::Busy,
RunnerStatus::Working { id, hash, .. } => { RunnerStatus::Working(run) => {
let message = let message =
sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", hash) sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash)
.fetch_one(db) .fetch_one(db)
.await?; .await?;
Status::Working(RunLink::new(base, id.clone(), hash, &message)) Status::Working(RunLink::new(base, run.id.clone(), &run.hash, &message))
} }
}; };
@ -89,9 +89,9 @@ async fn get_queue(
// Group runners by commit hash // Group runners by commit hash
let mut runners_by_commit: HashMap<String, Vec<RunnerLink>> = HashMap::new(); let mut runners_by_commit: HashMap<String, Vec<RunnerLink>> = HashMap::new();
for (name, info) in runners { for (name, info) in runners {
if let RunnerStatus::Working { hash, .. } = &info.status { if let RunnerStatus::Working(run) = &info.status {
runners_by_commit runners_by_commit
.entry(hash.clone()) .entry(run.hash.clone())
.or_default() .or_default()
.push(RunnerLink::new(base, name.clone())); .push(RunnerLink::new(base, name.clone()));
} }

View file

@ -6,6 +6,10 @@ use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr}; use serde_repr::{Deserialize_repr, Serialize_repr};
use time::OffsetDateTime; use time::OffsetDateTime;
fn is_false(b: &bool) -> bool {
!b
}
#[derive(Clone, Serialize_repr, Deserialize_repr)] #[derive(Clone, Serialize_repr, Deserialize_repr)]
#[repr(i8)] #[repr(i8)]
pub enum Direction { pub enum Direction {
@ -17,27 +21,42 @@ pub enum Direction {
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct Measurement { pub struct Measurement {
pub value: f64, pub value: f64,
#[serde(skip_serializing_if = "Option::is_none")]
pub stddev: Option<f64>, pub stddev: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub unit: Option<String>, pub unit: Option<String>,
pub direction: Option<i8>, #[serde(skip_serializing_if = "Option::is_none")]
pub direction: Option<Direction>,
}
#[derive(Clone, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
pub enum Source {
// Stdin would be fd 0
Stdout = 1,
Stderr = 2,
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")] pub struct UnfinishedRun {
#[serde(tag = "type")] pub id: String,
pub enum Line { pub hash: String,
Stdout(String), pub start: OffsetDateTime,
Stderr(String), #[serde(default)]
pub last_output: Vec<(Source, String)>,
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct FinishedRun { pub struct FinishedRun {
pub id: String, pub id: String,
pub hash: String,
pub start: OffsetDateTime, pub start: OffsetDateTime,
pub end: OffsetDateTime, pub end: OffsetDateTime,
pub output: Vec<Line>, #[serde(default)]
pub exit_code: i32, pub exit_code: i32,
pub measurements: HashMap<String, Measurement>, pub measurements: HashMap<String, Measurement>,
#[serde(default)]
pub output: Vec<(Source, String)>,
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
@ -49,12 +68,7 @@ pub enum RunnerStatus {
/// The runner is performing work for another server. /// The runner is performing work for another server.
Busy, Busy,
/// The runner is performing work for the current server. /// The runner is performing work for the current server.
Working { Working(UnfinishedRun),
id: String,
hash: String,
since: OffsetDateTime,
last_lines: Vec<Line>,
},
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
@ -74,10 +88,11 @@ pub struct RunnerRequest {
/// ///
/// If the server has a commit available, it should respond with a non-null /// If the server has a commit available, it should respond with a non-null
/// [`Response::work`]. /// [`Response::work`].
#[serde(default)] #[serde(default, skip_serializing_if = "is_false")]
pub request_work: bool, pub request_work: bool,
/// The runner has finished a run and wants to submit the results. /// The runner has finished a run and wants to submit the results.
#[serde(skip_serializing_if = "Option::is_none")]
pub submit_work: Option<FinishedRun>, pub submit_work: Option<FinishedRun>,
} }
@ -88,7 +103,7 @@ pub enum BenchMethod {
/// Use internal (deterministic) benchmarking code. /// Use internal (deterministic) benchmarking code.
Internal, Internal,
/// Use a commit from a bench repo. /// Use a commit from a bench repo.
BenchRepo { hash: String }, Repo { hash: String },
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
@ -107,6 +122,7 @@ pub struct ServerResponse {
/// the next update request sent by the runner, the server will consider the /// the next update request sent by the runner, the server will consider the
/// runner as preparing to work on the commit, and will not give out the /// runner as preparing to work on the commit, and will not give out the
/// same commit to other runners. /// same commit to other runners.
#[serde(skip_serializing_if = "Option::is_none")]
pub work: Option<Work>, pub work: Option<Work>,
/// The runner should abort the current run. /// The runner should abort the current run.
@ -114,5 +130,6 @@ pub struct ServerResponse {
/// The server may send this because it detected the runner is benchmarking /// The server may send this because it detected the runner is benchmarking
/// the same commit as another runner and has broken the tie in favor of the /// the same commit as another runner and has broken the tie in favor of the
/// other runner. The runner may continue the run despite this flag. /// other runner. The runner may continue the run despite this flag.
#[serde(default, skip_serializing_if = "is_false")]
pub abort_work: bool, pub abort_work: bool,
} }