diff --git a/src/config.rs b/src/config.rs index 0f2ef71..c6e9787 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,6 +41,10 @@ mod default { pub fn worker_ping_delay() -> Duration { Duration::from_secs(10) } + + pub fn worker_batch_duration() -> Duration { + Duration::from_secs(60 * 10) + } } #[derive(Debug, Deserialize)] @@ -98,6 +102,9 @@ struct Worker { #[serde(default = "default::worker_ping_delay", with = "humantime_serde")] ping_delay: Duration, + #[serde(default = "default::worker_batch_duration", with = "humantime_serde")] + batch_duration: Duration, + servers: HashMap<String, WorkerServerConfig>, } @@ -106,6 +113,7 @@ impl Default for Worker { Self { name: None, ping_delay: default::worker_ping_delay(), + batch_duration: default::worker_batch_duration(), servers: HashMap::new(), } } @@ -216,6 +224,7 @@ pub struct Config { pub repo_update_delay: Duration, pub worker_name: String, pub worker_ping_delay: Duration, + pub worker_batch_duration: Duration, pub worker_servers: HashMap<String, WorkerServerConfig>, } @@ -252,6 +261,7 @@ impl Config { repo_update_delay: config_file.repo.update_delay, worker_name, worker_ping_delay: config_file.worker.ping_delay, + worker_batch_duration: config_file.worker.batch_duration, worker_servers, }) } diff --git a/src/id.rs b/src/id.rs index 754aaad..bb4ebae 100644 --- a/src/id.rs +++ b/src/id.rs @@ -9,6 +9,10 @@ fn random_id(prefix: &str, length: usize) -> String { .collect() } +pub fn random_run_id() -> String { + random_id("r-", 30) +} + pub fn random_worker_token() -> String { random_id("t-", 30) } diff --git a/src/worker.rs b/src/worker.rs index 5ce531f..2c3afc9 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,4 +1,5 @@ mod coordinator; +mod run; mod server; mod tree; diff --git a/src/worker/coordinator.rs b/src/worker/coordinator.rs index 58f9981..cd0165b 100644 --- a/src/worker/coordinator.rs +++ b/src/worker/coordinator.rs @@ -1,5 +1,8 @@ //! Coordinate performing runs across servers. 
+use std::time::Duration; + +use time::OffsetDateTime; use tokio::sync::mpsc; struct Server { @@ -9,32 +12,48 @@ pub struct Coordinator { servers: Vec<Server>, - current: usize, + active: usize, + active_since: OffsetDateTime, + busy: bool, } impl Coordinator { pub fn new() -> Self { Self { servers: vec![], - current: 0, + active: 0, + active_since: OffsetDateTime::now_utc(), + busy: false, } } pub fn register(&mut self, name: String, poke: mpsc::UnboundedSender<()>) { + // TODO Assert that no duplicate names exist? self.servers.push(Server { name, poke }); } - pub fn active(&self, name: &str) -> bool { - if let Some(current) = self.servers.get(self.current) { - name == current.name - } else { - false + pub fn active(&self, name: &str) -> ActiveInfo { + let active_server = self.servers.get(self.active); + let active = active_server.filter(|s| s.name == name).is_some(); + ActiveInfo { + active, + active_since: self.active_since, + busy: self.busy, } } - pub fn next(&mut self, name: &str) { + pub fn look_busy(&mut self, name: &str) { // Check just to prevent weird shenanigans - if !self.active(name) { + if !self.active(name).active { return; } + + self.busy = true; + } + + pub fn move_to_next_server(&mut self, name: &str) { + // Check just to prevent weird shenanigans + if !self.active(name).active { return; } @@ -42,8 +61,10 @@ impl Coordinator { // the previous check assert!(!self.servers.is_empty()); - self.current += 1; - self.current %= self.servers.len(); + self.active += 1; + self.active %= self.servers.len(); + self.active_since = OffsetDateTime::now_utc(); + self.busy = false; // When the worker seeks work and a queue is idle, the next server // should be queried immediately. Otherwise, we'd introduce lots of @@ -61,8 +82,23 @@ impl Coordinator { // will send two requests back-to-back: The first because their ping // timeout ran out, and the second because they were poked. So far, I // haven't been able to think of an elegant solution for this. 
- if self.current > 0 { - let _ = self.servers[self.current].poke.send(()); + if self.active > 0 { + let _ = self.servers[self.active].poke.send(()); } } } + +#[derive(Clone, Copy)] +pub struct ActiveInfo { + pub active: bool, + pub active_since: OffsetDateTime, + pub busy: bool, +} + +impl ActiveInfo { + pub fn in_batch(&self, batch_duration: Duration) -> bool { + let batch_end = self.active_since + batch_duration; + let now = OffsetDateTime::now_utc(); + now <= batch_end + } +} diff --git a/src/worker/run.rs b/src/worker/run.rs new file mode 100644 index 0000000..d27191b --- /dev/null +++ b/src/worker/run.rs @@ -0,0 +1,93 @@ +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use time::OffsetDateTime; + use tokio::sync::mpsc; + +use crate::{ + id, + shared::{BenchMethod, FinishedRun, Measurement, Source, UnfinishedRun}, +}; + +const LIVE_SCROLLBACK: usize = 50; + +pub enum FullRunStatus { + Unfinished(UnfinishedRun), + Finished(FinishedRun), + Aborted, +} + +#[derive(Clone)] +pub enum RunStatus { + Unfinished, + Finished { + end: OffsetDateTime, + exit_code: i32, + measurements: HashMap<String, Measurement>, + }, + Aborted, +} + +#[derive(Clone)] +pub struct Run { + id: String, + hash: String, + start: OffsetDateTime, + output: Vec<(Source, String)>, + status: RunStatus, +} + +impl Run { + pub fn new(hash: String) -> Self { + Self { + id: id::random_run_id(), + hash, + start: OffsetDateTime::now_utc(), + output: vec![], + status: RunStatus::Unfinished, + } + } + + pub fn into_full_status(self) -> FullRunStatus { + match self.status { + RunStatus::Unfinished => FullRunStatus::Unfinished(UnfinishedRun { + id: self.id, + hash: self.hash, + start: self.start, + last_output: self + .output + .into_iter() + .rev() + .take(LIVE_SCROLLBACK) + .rev() + .collect(), + }), + + RunStatus::Finished { + end, + exit_code, + measurements, + } => FullRunStatus::Finished(FinishedRun { + id: self.id, + hash: self.hash, + start: self.start, + end, + exit_code, + measurements, + output: 
self.output, + }), + + RunStatus::Aborted => FullRunStatus::Aborted, + } + } +} + +pub async fn run( + run: Arc<Mutex<Run>>, + abort_rx: mpsc::UnboundedReceiver<()>, + bench_method: BenchMethod, +) { + // TODO Implement +} diff --git a/src/worker/server.rs b/src/worker/server.rs index a1f78a9..064c3cd 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -7,11 +7,15 @@ use tracing::{debug, info_span, warn, Instrument}; use crate::{ config::{Config, WorkerServerConfig}, id, - shared::{WorkerRequest, WorkerStatus}, + shared::{FinishedRun, ServerResponse, WorkerRequest, WorkerStatus}, somehow, + worker::run::{self, FullRunStatus}, }; -use super::coordinator::Coordinator; +use super::{ + coordinator::{ActiveInfo, Coordinator}, + run::Run, +}; pub struct Server { name: String, @@ -20,6 +24,9 @@ struct Server { coordinator: Arc<Mutex<Coordinator>>, client: Client, secret: String, + + // TODO Cache bench dir + run: Option<(Arc<Mutex<Run>>, mpsc::UnboundedSender<()>)>, } impl Server { @@ -36,16 +43,19 @@ impl Server { coordinator, client: Client::new(), secret: id::random_worker_secret(), + run: None, } } pub async fn run(&mut self) { + // Register with coordinator let (poke_tx, mut poke_rx) = mpsc::unbounded_channel(); self.coordinator .lock() .unwrap() .register(self.name.clone(), poke_tx.clone()); + // Main loop let name = self.name.clone(); async { loop { @@ -54,37 +64,147 @@ impl Server { Err(e) => warn!("Error talking to server:\n{e:?}"), } - // Wait for poke or until the ping delay elapses. If we get - // poked while pinging the server, this will not wait and we'll - // immediately do another ping. - let _ = tokio::time::timeout(self.config.worker_ping_delay, poke_rx.recv()).await; - - // Empty queue in case we were poked more than once. This can - // happen for example if we get poked multiple times while - // pinging the server. 
- while poke_rx.try_recv().is_ok() {} + self.wait_until_next_ping(&mut poke_rx).await; } } .instrument(info_span!("worker", name)) .await; } - async fn ping(&self) -> somehow::Result<()> { - debug!("Pinging"); + async fn wait_until_next_ping(&self, poke_rx: &mut mpsc::UnboundedReceiver<()>) { + // Wait for poke or until the ping delay elapses. If we get poked while + // pinging the server, this will not wait and we'll immediately do + // another ping. + let _ = tokio::time::timeout(self.config.worker_ping_delay, poke_rx.recv()).await; + + // Empty queue in case we were poked more than once. This can happen for + // example if we get poked multiple times while pinging the server. + while poke_rx.try_recv().is_ok() {} + } + + async fn ping(&mut self) -> somehow::Result<()> { + debug!("Pinging server"); + + let info = self.coordinator.lock().unwrap().active(&self.name); + if info.active { + self.ping_active(info).await?; + } else { + self.ping_inactive(info).await?; + } + + Ok(()) + } + + async fn ping_inactive(&self, info: ActiveInfo) -> somehow::Result<()> { + assert!(self.run.is_none()); + + let status = match info.busy { + true => WorkerStatus::Busy, + false => WorkerStatus::Idle, + }; + self.request(status, false, None).await?; + Ok(()) + } + + async fn ping_active(&mut self, info: ActiveInfo) -> somehow::Result<()> { + let run = self + .run + .as_ref() + .map(|(r, _)| r.lock().unwrap().clone().into_full_status()) + .unwrap_or(FullRunStatus::Aborted); + + let unfinished = matches!(run, FullRunStatus::Unfinished(_)); + let aborted = matches!(run, FullRunStatus::Aborted); + let in_batch = info.in_batch(self.config.worker_batch_duration); + + let (status, submit_work) = match run { + FullRunStatus::Unfinished(run) => (WorkerStatus::Working(run), None), + FullRunStatus::Finished(run) => (WorkerStatus::Idle, Some(run)), + FullRunStatus::Aborted => (WorkerStatus::Idle, None), + }; + let request_work = in_batch && !unfinished; + let response = self.request(status, 
 request_work, submit_work).await; + + if response.is_err() && aborted { + // We have nothing important going on, let's defer to the next + // server and hope this one will respond again soon. + self.coordinator + .lock() + .unwrap() + .move_to_next_server(&self.name); + + // Return explicitly to ensure we don't continue to the rest of the + // function in the false belief that we're active. Oh, and don't + // swallow the error. + response?; + return Ok(()); + } + + let response = response?; + + // Clean up self.run if we no longer need it + if !unfinished { + // We can get rid of finished runs since we just successfully sent + // the server the results. + self.run = None; + } + + // Abort run if server says so + if response.abort_work { + if let Some((_, abort_tx)) = &self.run { + let _ = abort_tx.send(()); + } + } + + // Start work (but only if we requested it) + if let Some(work) = response.work.filter(|_| request_work) { + assert!(!unfinished); + assert!(self.run.is_none()); + + let run = Arc::new(Mutex::new(Run::new(work.hash))); + let (abort_tx, abort_rx) = mpsc::unbounded_channel(); + + self.run = Some((run.clone(), abort_tx)); + self.coordinator.lock().unwrap().look_busy(&self.name); + tokio::spawn(run::run(run, abort_rx, work.bench)); + } + + // Finally, advance to the next server if it makes sense to do so + if self.run.is_none() { + self.coordinator + .lock() + .unwrap() + .move_to_next_server(&self.name); + } + + Ok(()) + } + + async fn request( + &self, + status: WorkerStatus, + request_work: bool, + submit_work: Option<FinishedRun>, + ) -> somehow::Result<ServerResponse> { + let url = format!("{}api/worker/status", self.server_config.url); let request = WorkerRequest { info: None, secret: self.secret.clone(), - status: WorkerStatus::Idle, - request_work: false, - submit_work: None, + status, + request_work, + submit_work, }; - let url = format!("{}api/worker/status", self.server_config.url); - self.client + + let response = self + .client .post(url) 
 .basic_auth(&self.config.worker_name, Some(&self.server_config.token)) .json(&request) .send() + .await? + .json::<ServerResponse>() .await?; - Ok(()) + + Ok(response) } }