From b7c04430054dec608c571bfa37eb963454f30d54 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 12 Aug 2023 21:05:02 +0200 Subject: [PATCH] Ask servers for runs and perform them --- src/worker.rs | 75 +++++++++++++++++++++++++++++++++++++++++--- src/worker/run.rs | 73 +++++++++++++++++++++++++++++++++++++++--- src/worker/server.rs | 48 ++++++++++++++-------------- 3 files changed, 164 insertions(+), 32 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index cad0f1d..259c7eb 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -5,10 +5,18 @@ mod tree; use std::sync::{Arc, Mutex}; use reqwest::Client; +use time::OffsetDateTime; use tokio::sync::Mutex as AsyncMutex; -use tracing::{error, info}; +use tracing::{error, info, warn}; -use crate::{config::Config, id, worker::server::Server}; +use crate::{ + config::Config, + id, + shared::{FinishedRun, Run}, + worker::server::Server, +}; + +use self::run::RunInProgress; pub struct Worker { config: &'static Config, @@ -51,10 +59,69 @@ impl Worker { } async fn single_server_mode(&self, server: Server) { - // TODO Implement + loop { + while self.perform_run(&server).await {} + tokio::time::sleep(self.config.worker_ping_delay).await; + } } async fn many_server_mode(&self, servers: Vec) { - // TODO Implement + loop { + for server in &servers { + let batch_start = OffsetDateTime::now_utc(); + let batch_end = batch_start + self.config.worker_batch_duration; + while OffsetDateTime::now_utc() <= batch_end { + if !self.perform_run(server).await { + break; + } + } + } + tokio::time::sleep(self.config.worker_ping_delay).await; + } + } + + /// Ask a server for a run, do the run, send results to the server. + /// + /// Returns whether a run was performed. + async fn perform_run(&self, server: &Server) -> bool { + // Request run + let guard = server.status_lock.lock().await; + let Some(run) = self.request_run(server).await else { return false; }; + let run = RunInProgress::new(server.name.clone(), run); + *server.current_run.lock().unwrap() = Some(run.clone()); + drop(guard); + + // Perform run + let Some(run) = run.perform().await else { return false; }; + + // Submit run + let guard = server.status_lock.lock().await; + *server.current_run.lock().unwrap() = None; + while !self.submit_run(server, run.clone()).await { + tokio::time::sleep(self.config.worker_ping_delay).await; + } + drop(guard); + + true + } + + async fn request_run(&self, server: &Server) -> Option { + match server.post_status(true, None).await { + Ok(response) => response.run, + Err(e) => { + warn!("Error requesting run:\n{e:?}"); + None + } + } + } + + async fn submit_run(&self, server: &Server, run: FinishedRun) -> bool { + match server.post_status(false, Some(run)).await { + Ok(_) => true, + Err(e) => { + warn!("Error submitting run:\n{e:?}"); + false + } + } } } diff --git a/src/worker/run.rs b/src/worker/run.rs index 85d8aaf..7bdd66d 100644 --- a/src/worker/run.rs +++ b/src/worker/run.rs @@ -1,12 +1,77 @@ -use std::sync::{Arc, Mutex}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; -use tokio::sync::mpsc; +use tokio::sync::Notify; +use tracing::error; -use crate::shared::{Run, Source}; +use crate::{ + shared::{BenchMethod, FinishedRun, Measurement, Run, Source}, + somehow, +}; +struct Finished { + exit_code: i32, + measurements: HashMap, +} + +// TODO Make fields private +#[derive(Clone)] pub struct RunInProgress { pub server_name: String, pub run: Run, pub output: Arc>>, - pub abort: mpsc::UnboundedSender<()>, + pub abort: Arc, +} + +impl RunInProgress { + pub fn new(server_name: String, run: Run) -> Self { + Self { + server_name, + run, + output: Arc::new(Mutex::new(vec![])), + abort: Arc::new(Notify::new()), + } + } + + pub fn log_stdout(&self, line: String) { + self.output.lock().unwrap().push((Source::Stdout, line)); + } + + pub fn log_stderr(&self, line: String) { + self.output.lock().unwrap().push((Source::Stderr, line)); + } + + pub async fn perform(&self) -> Option { + // TODO Remove type annotations + // TODO Handle aborts + let result: somehow::Result<_> = match &self.run.bench_method { + BenchMethod::Internal => todo!(), + BenchMethod::Repo { hash } => todo!(), + }; + + let finished = match result { + Ok(outcome) => outcome, + Err(e) => { + error!("Error during run:\n{e:?}"); + self.log_stderr("Internal error:".to_string()); + self.log_stderr(format!("{e:?}")); + Some(Finished { + exit_code: -1, + measurements: HashMap::new(), + }) + } + }?; + + let mut output = vec![]; + std::mem::swap(&mut output, &mut *self.output.lock().unwrap()); + + Some(FinishedRun { + run: self.run.clone(), + exit_code: finished.exit_code, + output, + measurements: finished.measurements, + }) + } } diff --git a/src/worker/server.rs b/src/worker/server.rs index dfc0169..4295f6a 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -45,17 +45,35 @@ impl Server { // TODO Limit status requests to one in flight at a time (per server) pub async fn post_status( &self, - status: WorkerStatus, - request_work: bool, - submit_work: Option, + request_run: bool, + submit_run: Option, ) -> somehow::Result { let url = format!("{}api/worker/status", self.server_config.url); + + let status = match &*self.current_run.lock().unwrap() { + Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun { + run: run.run.clone(), + last_output: run + .output + .lock() + .unwrap() + .iter() + .rev() + .take(SCROLLBACK) + .rev() + .cloned() + .collect(), + }), + Some(_) => WorkerStatus::Busy, + None => WorkerStatus::Idle, + }; + let request = WorkerRequest { info: None, secret: self.secret.clone(), status, - request_run: request_work, - submit_run: submit_work, + request_run, + submit_run, }; let response = self @@ -107,25 +125,7 @@ impl Server { debug!("Pinging server"); let guard = self.status_lock.lock().await; - let status = match &*self.current_run.lock().unwrap() { - Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun { - run: run.run.clone(), - last_output: run - .output - .lock() - .unwrap() - .iter() - .rev() - .take(SCROLLBACK) - .rev() - .cloned() - .collect(), - }), - Some(_) => WorkerStatus::Busy, - None => WorkerStatus::Idle, - }; - - let response = self.post_status(status, false, None).await?; + let response = self.post_status(false, None).await?; // TODO Signal that run should be aborted