From d15d6588f7c496ef528543d4b925037cb5b903d6 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 12 Aug 2023 13:19:16 +0200 Subject: [PATCH] Improve and fix runner API - The server now also signals abort_work if the commit is no longer in the queue. - The server now immediately reserves the work it gives to the worker, so other idle workers won't get the same work. - The server now chooses a run id, not the worker. The worker can still submit work under arbitrary run ids when no id is known, for example when importing runs from another database. --- src/server/web/api.rs | 38 ++++++++-------------- src/server/workers.rs | 75 ++++++++++++++++++++++++++++--------------- src/shared.rs | 2 ++ src/worker/run.rs | 4 +-- src/worker/server.rs | 2 +- 5 files changed, 68 insertions(+), 53 deletions(-) diff --git a/src/server/web/api.rs b/src/server/web/api.rs index e8eec6b..7d285b1 100644 --- a/src/server/web/api.rs +++ b/src/server/web/api.rs @@ -24,7 +24,7 @@ use crate::{ workers::{WorkerInfo, Workers}, BenchRepo, Repo, Server, }, - shared::{BenchMethod, FinishedRun, ServerResponse, Work, WorkerRequest}, + shared::{BenchMethod, FinishedRun, ServerResponse, WorkerRequest}, somehow, }; @@ -107,26 +107,6 @@ async fn save_work(run: FinishedRun, db: SqlitePool) -> somehow::Result<()> { Ok(()) } -fn prepare_work( - work: Option<&str>, - bench_repo: Option, -) -> somehow::Result> { - Ok(if let Some(hash) = work { - let bench = match bench_repo { - Some(bench_repo) => BenchMethod::Repo { - hash: bench_repo.0.to_thread_local().head_id()?.to_string(), - }, - None => BenchMethod::Internal, - }; - Some(Work { - hash: hash.to_string(), - bench, - }) - } else { - None - }) -} - async fn post_status( State(config): State<&'static Config>, State(db): State, @@ -140,6 +120,7 @@ async fn post_status( Err(response) => return Ok(response), }; + // Fetch queue let queue = sqlx::query_scalar!( "\ SELECT hash FROM queue \ @@ -149,6 +130,15 @@ async fn post_status( .fetch_all(&db) .await?; + // Fetch bench method + let bench_method = match bench_repo { + Some(bench_repo) => BenchMethod::Repo { + hash: bench_repo.0.to_thread_local().head_id()?.to_string(), + }, + None => BenchMethod::Internal, + }; + + // Update internal state let (work, abort_work) = { let mut guard = workers.lock().unwrap(); guard.clean(); @@ -160,10 +150,10 @@ async fn post_status( WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status), ); let work = match request.request_work { - true => guard.find_free_work(&queue), + true => guard.find_work(&name, &queue, bench_method), false => None, }; - let abort_work = guard.should_abort_work(&name); + let abort_work = guard.should_abort_work(&name, &queue); (work, abort_work) }; @@ -171,8 +161,6 @@ async fn post_status( save_work(run, db).await?; } - let work = prepare_work(work, bench_repo)?; - // TODO Reserve this work debug!("Received status update from {name}"); Ok(Json(ServerResponse { work, abort_work }).into_response()) } diff --git a/src/server/workers.rs b/src/server/workers.rs index e92debd..b3410e0 100644 --- a/src/server/workers.rs +++ b/src/server/workers.rs @@ -2,7 +2,11 @@ use std::collections::{HashMap, HashSet}; use time::OffsetDateTime; -use crate::{config::Config, shared::WorkerStatus}; +use crate::{ + config::Config, + id, + shared::{BenchMethod, UnfinishedRun, Work, WorkerStatus}, +}; #[derive(Clone)] pub struct WorkerInfo { @@ -50,39 +54,60 @@ impl Workers { self.workers.insert(name, info); } - fn oldest_working_on(&self, hash: &str) -> Option<&str> { - self.workers - .iter() - .filter_map(|(name, info)| match &info.status { - WorkerStatus::Working(run) if run.hash == hash => Some((name, run.start)), - _ => None, - }) - .max_by_key(|(_, since)| *since) - .map(|(name, _)| name as &str) - } - - pub fn should_abort_work(&self, name: &str) -> bool { - // TODO Abort if not in queue - let Some(info) = self.workers.get(name) else { return false; }; - let WorkerStatus::Working ( run) = &info.status else { return false; }; - let Some(oldest) = self.oldest_working_on(&run.hash) else { return false; }; - name != oldest - } - - pub fn find_free_work<'a>(&self, hashes: &'a [String]) -> Option<&'a str> { + /// Find and reserve work for a worker. + pub fn find_work(&mut self, name: &str, queue: &[String], bench: BenchMethod) -> Option { let covered = self .workers .values() .filter_map(|info| match &info.status { + WorkerStatus::Idle | WorkerStatus::Busy => None, WorkerStatus::Working(run) => Some(&run.hash), - _ => None, }) .collect::>(); - hashes + // Find work not already covered by another worker + let hash = queue.iter().find(|hash| !covered.contains(hash))?.clone(); + let id = id::random_run_id(); + let work = Work { id, hash, bench }; + + // Reserve work so other workers don't choose it + if let Some(info) = self.workers.get_mut(name) { + info.status = WorkerStatus::Working(UnfinishedRun { + id: work.id.clone(), + hash: work.hash.clone(), + start: OffsetDateTime::now_utc(), + last_output: vec![], + }); + } + + Some(work) + } + + pub fn should_abort_work(&self, name: &str, queue: &[String]) -> bool { + // A runner should abort work if... + let Some(info) = self.workers.get(name) else { return false; }; + let WorkerStatus::Working (run) = &info.status else { return false; }; + + // The commit isn't in the queue + if !queue.contains(&run.hash) { + return true; + } + + // Another runner has been working on the same commit for longer + let oldest_working_on_commit = self + .workers .iter() - .find(|hash| !covered.contains(hash)) - .map(|hash| hash as &str) + .filter_map(|(name, info)| match &info.status { + WorkerStatus::Working(r) if r.hash == run.hash => Some((name, r.start)), + _ => None, + }) + .max_by_key(|(_, start)| *start) + .map(|(name, _)| name as &str); + if oldest_working_on_commit != Some(name) { + return true; + } + + false } pub fn get(&self, name: &str) -> Option { diff --git a/src/shared.rs b/src/shared.rs index 52535a1..1d4d857 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -108,6 +108,8 @@ pub enum BenchMethod { #[derive(Clone, Serialize, Deserialize)] pub struct Work { + /// Id of the run. + pub id: String, /// Hash of commit to benchmark. pub hash: String, /// How to benchmark the commit. diff --git a/src/worker/run.rs b/src/worker/run.rs index a2f71ca..ec4375f 100644 --- a/src/worker/run.rs +++ b/src/worker/run.rs @@ -44,9 +44,9 @@ pub struct Run { } impl Run { - pub fn new(hash: String) -> Self { + pub fn new(id: String, hash: String) -> Self { Self { - id: id::random_run_id(), + id, hash, start: OffsetDateTime::now_utc(), output: vec![], diff --git a/src/worker/server.rs b/src/worker/server.rs index 064c3cd..764f9a1 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -161,7 +161,7 @@ impl Server { assert!(!unfinished); assert!(self.run.is_none()); - let run = Arc::new(Mutex::new(Run::new(work.hash))); + let run = Arc::new(Mutex::new(Run::new(work.id, work.hash))); let (abort_tx, abort_rx) = mpsc::unbounded_channel(); self.run = Some((run.clone(), abort_tx));