Improve and fix runner API

- The server now also signals abort_work if the commit is no longer in
  the queue.
- The server now immediately reserves the work it gives to the worker,
  so other idle workers won't get the same work.
- The server now chooses a run id, not the worker. The worker can still
  submit work under arbitrary run ids when no id is known, for example
  when importing runs from another database.
This commit is contained in:
Joscha 2023-08-12 13:19:16 +02:00
parent dd45be98a5
commit d15d6588f7
5 changed files with 68 additions and 53 deletions

View file

@ -24,7 +24,7 @@ use crate::{
workers::{WorkerInfo, Workers},
BenchRepo, Repo, Server,
},
shared::{BenchMethod, FinishedRun, ServerResponse, Work, WorkerRequest},
shared::{BenchMethod, FinishedRun, ServerResponse, WorkerRequest},
somehow,
};
@ -107,26 +107,6 @@ async fn save_work(run: FinishedRun, db: SqlitePool) -> somehow::Result<()> {
Ok(())
}
/// Build a [`Work`] item for the given commit hash, if any.
///
/// When a bench repo is configured, the work is benchmarked against that
/// repo's current HEAD; otherwise the internal bench method is used.
fn prepare_work(
    work: Option<&str>,
    bench_repo: Option<BenchRepo>,
) -> somehow::Result<Option<Work>> {
    // Guard clause: no hash means no work to hand out.
    let Some(hash) = work else {
        return Ok(None);
    };
    let bench = if let Some(bench_repo) = bench_repo {
        BenchMethod::Repo {
            hash: bench_repo.0.to_thread_local().head_id()?.to_string(),
        }
    } else {
        BenchMethod::Internal
    };
    Ok(Some(Work {
        hash: hash.to_string(),
        bench,
    }))
}
async fn post_status(
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
@ -140,6 +120,7 @@ async fn post_status(
Err(response) => return Ok(response),
};
// Fetch queue
let queue = sqlx::query_scalar!(
"\
SELECT hash FROM queue \
@ -149,6 +130,15 @@ async fn post_status(
.fetch_all(&db)
.await?;
// Fetch bench method
let bench_method = match bench_repo {
Some(bench_repo) => BenchMethod::Repo {
hash: bench_repo.0.to_thread_local().head_id()?.to_string(),
},
None => BenchMethod::Internal,
};
// Update internal state
let (work, abort_work) = {
let mut guard = workers.lock().unwrap();
guard.clean();
@ -160,10 +150,10 @@ async fn post_status(
WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status),
);
let work = match request.request_work {
true => guard.find_free_work(&queue),
true => guard.find_work(&name, &queue, bench_method),
false => None,
};
let abort_work = guard.should_abort_work(&name);
let abort_work = guard.should_abort_work(&name, &queue);
(work, abort_work)
};
@ -171,8 +161,6 @@ async fn post_status(
save_work(run, db).await?;
}
let work = prepare_work(work, bench_repo)?;
// TODO Reserve this work
debug!("Received status update from {name}");
Ok(Json(ServerResponse { work, abort_work }).into_response())
}

View file

@ -2,7 +2,11 @@ use std::collections::{HashMap, HashSet};
use time::OffsetDateTime;
use crate::{config::Config, shared::WorkerStatus};
use crate::{
config::Config,
id,
shared::{BenchMethod, UnfinishedRun, Work, WorkerStatus},
};
#[derive(Clone)]
pub struct WorkerInfo {
@ -50,39 +54,60 @@ impl Workers {
self.workers.insert(name, info);
}
/// Name of the worker that has been working on the given commit for the
/// longest time, i.e. whose run on `hash` has the earliest start time.
///
/// Returns `None` if no worker is currently working on `hash`.
fn oldest_working_on(&self, hash: &str) -> Option<&str> {
    self.workers
        .iter()
        .filter_map(|(name, info)| match &info.status {
            WorkerStatus::Working(run) if run.hash == hash => Some((name, run.start)),
            _ => None,
        })
        // "Oldest" = earliest start. The previous `max_by_key` picked the
        // worker that started most recently, which inverted the abort
        // decision in `should_abort_work`.
        .min_by_key(|(_, since)| *since)
        .map(|(name, _)| name as &str)
}
/// Whether the named worker should stop its current run.
///
/// A worker aborts when a different worker holds the oldest claim on the
/// same commit (see [`Self::oldest_working_on`]).
pub fn should_abort_work(&self, name: &str) -> bool {
    // TODO Abort if not in queue
    let run = match self.workers.get(name) {
        Some(info) => match &info.status {
            WorkerStatus::Working(run) => run,
            _ => return false,
        },
        None => return false,
    };
    self.oldest_working_on(&run.hash)
        .map_or(false, |oldest| oldest != name)
}
pub fn find_free_work<'a>(&self, hashes: &'a [String]) -> Option<&'a str> {
/// Find and reserve work for a worker.
pub fn find_work(&mut self, name: &str, queue: &[String], bench: BenchMethod) -> Option<Work> {
let covered = self
.workers
.values()
.filter_map(|info| match &info.status {
WorkerStatus::Idle | WorkerStatus::Busy => None,
WorkerStatus::Working(run) => Some(&run.hash),
_ => None,
})
.collect::<HashSet<_>>();
hashes
// Find work not already covered by another worker
let hash = queue.iter().find(|hash| !covered.contains(hash))?.clone();
let id = id::random_run_id();
let work = Work { id, hash, bench };
// Reserve work so other workers don't choose it
if let Some(info) = self.workers.get_mut(name) {
info.status = WorkerStatus::Working(UnfinishedRun {
id: work.id.clone(),
hash: work.hash.clone(),
start: OffsetDateTime::now_utc(),
last_output: vec![],
});
}
Some(work)
}
pub fn should_abort_work(&self, name: &str, queue: &[String]) -> bool {
// A runner should abort work if...
let Some(info) = self.workers.get(name) else { return false; };
let WorkerStatus::Working (run) = &info.status else { return false; };
// The commit isn't in the queue
if !queue.contains(&run.hash) {
return true;
}
// Another runner has been working on the same commit for longer
let oldest_working_on_commit = self
.workers
.iter()
.find(|hash| !covered.contains(hash))
.map(|hash| hash as &str)
.filter_map(|(name, info)| match &info.status {
WorkerStatus::Working(r) if r.hash == run.hash => Some((name, r.start)),
_ => None,
})
.max_by_key(|(_, start)| *start)
.map(|(name, _)| name as &str);
if oldest_working_on_commit != Some(name) {
return true;
}
false
}
pub fn get(&self, name: &str) -> Option<WorkerInfo> {

View file

@ -108,6 +108,8 @@ pub enum BenchMethod {
#[derive(Clone, Serialize, Deserialize)]
pub struct Work {
/// Id of the run.
pub id: String,
/// Hash of commit to benchmark.
pub hash: String,
/// How to benchmark the commit.

View file

@ -44,9 +44,9 @@ pub struct Run {
}
impl Run {
pub fn new(hash: String) -> Self {
pub fn new(id: String, hash: String) -> Self {
Self {
id: id::random_run_id(),
id,
hash,
start: OffsetDateTime::now_utc(),
output: vec![],

View file

@ -161,7 +161,7 @@ impl Server {
assert!(!unfinished);
assert!(self.run.is_none());
let run = Arc::new(Mutex::new(Run::new(work.hash)));
let run = Arc::new(Mutex::new(Run::new(work.id, work.hash)));
let (abort_tx, abort_rx) = mpsc::unbounded_channel();
self.run = Some((run.clone(), abort_tx));