diff --git a/src/worker.rs b/src/worker.rs index 58ca720..c957f95 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,10 +1,13 @@ +mod run; mod server; mod tree; -use reqwest::Client; -use tracing::error; +use std::sync::{Arc, Mutex}; -use crate::{config::Config, worker::server::Server}; +use reqwest::Client; +use tracing::{error, info}; + +use crate::{config::Config, id, worker::server::Server}; pub struct Worker { config: &'static Config, @@ -17,17 +20,24 @@ impl Worker { pub async fn run(&self) { let client = Client::new(); + let current_run = Arc::new(Mutex::new(None)); let mut servers = self .config .worker_servers .iter() - .map(|(name, server_config)| { - Server::new(name.clone(), self.config, server_config, client.clone()) + .map(|(name, server_config)| Server { + name: name.clone(), + config: self.config, + server_config, + secret: id::random_worker_secret(), + client: client.clone(), + current_run: current_run.clone(), }) .collect::>(); for server in &servers { + info!("Connecting to server {}", server.name); tokio::spawn(server.clone().ping_periodically()); } diff --git a/src/worker/run.rs b/src/worker/run.rs index 4076b75..85d8aaf 100644 --- a/src/worker/run.rs +++ b/src/worker/run.rs @@ -1,138 +1,12 @@ -mod internal; -mod repo; +use std::sync::{Arc, Mutex}; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -use time::OffsetDateTime; use tokio::sync::mpsc; -use tracing::{debug_span, error, Instrument}; -use crate::{ - config::WorkerServerConfig, - shared::{BenchMethod, FinishedRun, Measurement, Source, UnfinishedRun}, -}; +use crate::shared::{Run, Source}; -const LIVE_SCROLLBACK: usize = 50; - -pub enum FullRunStatus { - Unfinished(UnfinishedRun), - Finished(FinishedRun), - Aborted, -} - -#[derive(Clone)] -pub enum RunStatus { - Unfinished, - Finished { - end: OffsetDateTime, - exit_code: i32, - measurements: HashMap, - }, - Aborted, -} - -impl RunStatus { - pub fn finished(exit_code: i32, measurements: HashMap) -> Self { - Self::Finished { - end: OffsetDateTime::now_utc(), - exit_code, - measurements, - } - } -} - -#[derive(Clone)] -pub struct Run { - id: String, - hash: String, - start: OffsetDateTime, - output: Vec<(Source, String)>, - status: RunStatus, -} - -impl Run { - pub fn new(id: String, hash: String) -> Self { - Self { - id, - hash, - start: OffsetDateTime::now_utc(), - output: vec![], - status: RunStatus::Unfinished, - } - } - - pub fn log_stdout(&mut self, line: String) { - self.output.push((Source::Stdout, line)); - } - - pub fn log_stderr(&mut self, line: String) { - self.output.push((Source::Stderr, line)); - } - - pub fn into_full_status(self) -> FullRunStatus { - match self.status { - RunStatus::Unfinished => FullRunStatus::Unfinished(UnfinishedRun { - id: self.id, - hash: self.hash, - start: self.start, - last_output: self - .output - .into_iter() - .rev() - .take(LIVE_SCROLLBACK) - .rev() - .collect(), - }), - - RunStatus::Finished { - end, - exit_code, - measurements, - } => FullRunStatus::Finished(FinishedRun { - id: self.id, - hash: self.hash, - start: self.start, - end, - exit_code, - measurements, - output: self.output, - }), - - RunStatus::Aborted => FullRunStatus::Aborted, - } - } -} - -pub async fn run( - server_config: &'static WorkerServerConfig, - poke_tx: mpsc::UnboundedSender<()>, - run: Arc>, - bench: BenchMethod, - abort_rx: mpsc::UnboundedReceiver<()>, -) { - async { - let result = match bench { - BenchMethod::Internal => internal::run(server_config, run.clone(), abort_rx).await, - BenchMethod::Repo { hash } => repo::run(run.clone(), hash, abort_rx).await, - }; - match result { - Ok(status) => { - assert!(!matches!(status, RunStatus::Unfinished)); - run.lock().unwrap().status = status; - } - Err(e) => { - error!("Error during run:\n{e:?}"); - let mut guard = run.lock().unwrap(); - guard.log_stderr("Internal error:".to_string()); - guard.log_stderr(format!("{e:?}")); - guard.status = RunStatus::finished(-1, HashMap::new()); - } - } - let _ = poke_tx.send(()); - } - .instrument(debug_span!("run")) - .await; +pub struct RunInProgress { + pub server_name: String, + pub run: Run, + pub output: Arc>>, + pub abort: mpsc::UnboundedSender<()>, } diff --git a/src/worker/server.rs b/src/worker/server.rs index 0c36b13..4208dc9 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -1,40 +1,31 @@ +use std::sync::{Arc, Mutex}; + use reqwest::Client; use tempfile::TempDir; use tracing::{debug, warn}; use crate::{ config::{Config, WorkerServerConfig}, - id, - shared::{FinishedRun, ServerResponse, WorkerRequest, WorkerStatus}, + shared::{FinishedRun, ServerResponse, UnfinishedRun, WorkerRequest, WorkerStatus}, somehow, worker::tree, }; +use super::run::RunInProgress; + +const SCROLLBACK: usize = 50; + #[derive(Clone)] pub struct Server { - name: String, - config: &'static Config, - server_config: &'static WorkerServerConfig, - client: Client, - secret: String, + pub name: String, + pub config: &'static Config, + pub server_config: &'static WorkerServerConfig, + pub secret: String, + pub client: Client, + pub current_run: Arc>>, } impl Server { - pub fn new( - name: String, - config: &'static Config, - server_config: &'static WorkerServerConfig, - client: Client, - ) -> Self { - Self { - name, - config, - server_config, - client, - secret: id::random_worker_secret(), - } - } - // TODO Limit status requests to one in flight at a time (per server) pub async fn post_status( &self, @@ -99,8 +90,23 @@ impl Server { async fn ping(&self) -> somehow::Result<()> { debug!("Pinging server"); - // TODO Use actual status - let status = WorkerStatus::Idle; + let status = match &*self.current_run.lock().unwrap() { + Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun { + run: run.run.clone(), + last_output: run + .output + .lock() + .unwrap() + .iter() + .rev() + .take(SCROLLBACK) + .rev() + .cloned() + .collect(), + }), + Some(_) => WorkerStatus::Busy, + None => WorkerStatus::Idle, + }; let response = self.post_status(status, false, None).await?;