diff --git a/src/main.rs b/src/main.rs index ead43cd..b2348f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -94,26 +94,25 @@ async fn open_in_browser(config: &Config) { } async fn launch_local_workers(config: &'static Config, amount: u8) { - let server_name = "localhost"; - let server_config = Box::leak(Box::new(WorkerServerConfig { - url: format!("http://{}{}", config.web_address, config.web_base), - token: config.web_worker_token.clone(), - })); - // Wait a bit to ensure the server is ready to serve requests. tokio::time::sleep(Duration::from_millis(100)).await; for i in 0..amount { - let mut worker_config = config.clone(); - worker_config.worker_name = format!("{}-{i}", config.worker_name); - let worker_config = Box::leak(Box::new(worker_config)); - - info!("Launching local worker {}", worker_config.worker_name); - worker::launch_standalone_server_task( - worker_config, - server_name.to_string(), - server_config, + let mut config = config.clone(); + config.worker_name = format!("{}-{i}", config.worker_name); + config.worker_servers.clear(); + config.worker_servers.insert( + "localhost".to_string(), + WorkerServerConfig { + url: format!("http://{}{}", config.web_address, config.web_base), + token: config.web_worker_token.clone(), + }, ); + let config = Box::leak(Box::new(config)); + + info!("Launching local worker {}", config.worker_name); + let worker = Worker::new(config); + tokio::spawn(async move { worker.run().await }); } } diff --git a/src/worker.rs b/src/worker.rs index 8724e89..58ca720 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,8 +1,10 @@ +mod server; mod tree; +use reqwest::Client; use tracing::error; -use crate::config::Config; +use crate::{config::Config, worker::server::Server}; pub struct Worker { config: &'static Config, @@ -14,11 +16,33 @@ impl Worker { } pub async fn run(&self) { - if self.config.worker_servers.is_empty() { - error!("No servers specified in config"); - return; + let client = Client::new(); + + let mut servers = self + .config + .worker_servers + .iter() + .map(|(name, server_config)| { + Server::new(name.clone(), self.config, server_config, client.clone()) + }) + .collect::>(); + + for server in &servers { + tokio::spawn(server.clone().ping_periodically()); } - todo!() + match servers.len() { + 0 => error!("No servers specified in config"), + 1 => self.single_server_mode(servers.pop().unwrap()).await, + _ => self.many_server_mode(servers).await, + } + } + + async fn single_server_mode(&self, server: Server) { + // TODO Implement + } + + async fn many_server_mode(&self, servers: Vec) { + // TODO Implement } } diff --git a/src/worker/server.rs b/src/worker/server.rs index a52b22b..1fcede5 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -1,32 +1,22 @@ -use std::sync::{Arc, Mutex}; - use reqwest::Client; -use tokio::sync::mpsc; -use tracing::{debug, info_span, warn, Instrument}; +use tempfile::TempDir; +use tracing::{debug, warn}; use crate::{ config::{Config, WorkerServerConfig}, id, shared::{FinishedRun, ServerResponse, WorkerRequest, WorkerStatus}, somehow, - worker::run::{self, FullRunStatus}, -}; - -use super::{ - coordinator::{ActiveInfo, Coordinator}, - run::Run, + worker::tree, }; +#[derive(Clone)] pub struct Server { name: String, config: &'static Config, server_config: &'static WorkerServerConfig, - coordinator: Arc>, client: Client, secret: String, - - // TODO Cache bench dir - run: Option<(Arc>, mpsc::UnboundedSender<()>)>, } impl Server { @@ -34,163 +24,19 @@ impl Server { name: String, config: &'static Config, server_config: &'static WorkerServerConfig, - coordinator: Arc>, + client: Client, ) -> Self { Self { name, config, server_config, - coordinator, - client: Client::new(), + client, secret: id::random_worker_secret(), - run: None, } } - pub async fn run(&mut self) { - // Register with coordinator - let (poke_tx, mut poke_rx) = mpsc::unbounded_channel(); - self.coordinator - .lock() - .unwrap() - .register(self.name.clone(), poke_tx.clone()); - - // Main loop - let name = self.name.clone(); - async { - loop { - match self.ping(&poke_tx).await { - Ok(()) => {} - Err(e) => warn!("Error talking to server:\n{e:?}"), - } - - self.wait_until_next_ping(&mut poke_rx).await; - } - } - .instrument(info_span!("worker", name)) - .await; - } - - async fn wait_until_next_ping(&self, poke_rx: &mut mpsc::UnboundedReceiver<()>) { - // Wait for poke or until the ping delay elapses. If we get poked while - // pinging the server, this will not wait and we'll immediately do - // another ping. - let _ = tokio::time::timeout(self.config.worker_ping_delay, poke_rx.recv()).await; - - // Empty queue in case we were poked more than once. This can happen for - // example if we get poked multiple times while pinging the server. - while poke_rx.try_recv().is_ok() {} - } - - async fn ping(&mut self, poke_tx: &mpsc::UnboundedSender<()>) -> somehow::Result<()> { - debug!("Pinging server"); - - let info = self.coordinator.lock().unwrap().active(&self.name); - if info.active { - self.ping_active(info, poke_tx).await?; - } else { - self.ping_inactive(info).await?; - } - - Ok(()) - } - - async fn ping_inactive(&self, info: ActiveInfo) -> somehow::Result<()> { - assert!(self.run.is_none()); - - let status = match info.busy { - true => WorkerStatus::Busy, - false => WorkerStatus::Idle, - }; - self.request(status, false, None).await?; - Ok(()) - } - - async fn ping_active( - &mut self, - info: ActiveInfo, - poke_tx: &mpsc::UnboundedSender<()>, - ) -> somehow::Result<()> { - let run = self - .run - .as_ref() - .map(|(r, _)| r.lock().unwrap().clone().into_full_status()) - .unwrap_or(FullRunStatus::Aborted); - - let unfinished = matches!(run, FullRunStatus::Unfinished(_)); - let aborted = matches!(run, FullRunStatus::Aborted); - let in_batch = info.in_batch(self.config.worker_batch_duration); - - let (status, submit_work) = match run { - FullRunStatus::Unfinished(run) => (WorkerStatus::Working(run), None), - FullRunStatus::Finished(run) => (WorkerStatus::Idle, Some(run)), - FullRunStatus::Aborted => (WorkerStatus::Idle, None), - }; - let request_work = in_batch && !unfinished; - let response = self.request(status, request_work, submit_work).await; - - if response.is_err() && aborted { - // We have nothing important going on, let's defer to the next - // server and hope this one will respond again soon. - self.coordinator - .lock() - .unwrap() - .move_to_next_server(&self.name); - - // Return explicitly to ensure we don't continue to the rest of the - // function in the false belief that we're active. Oh, and don't - // swallow the error. - response?; - return Ok(()); - } - - let response = response?; - - // Clean up self.run if we no longer need it - if !unfinished { - // We can get rid of finished runs since we just successfully sent - // the server the results. - self.run = None; - } - - // Abort run if server says so - if response.abort_work { - if let Some((_, abort_tx)) = &self.run { - let _ = abort_tx.send(()); - } - } - - // Start work (but only if we requested it) - if let Some(work) = response.work.filter(|_| request_work) { - assert!(!unfinished); - assert!(self.run.is_none()); - - let run = Arc::new(Mutex::new(Run::new(work.id, work.hash))); - let (abort_tx, abort_rx) = mpsc::unbounded_channel(); - - self.run = Some((run.clone(), abort_tx)); - self.coordinator.lock().unwrap().look_busy(&self.name); - tokio::spawn(run::run( - self.server_config, - poke_tx.clone(), - run, - work.bench, - abort_rx, - )); - } - - // Finally, advance to the next server if it makes sense to do so - if self.run.is_none() { - self.coordinator - .lock() - .unwrap() - .move_to_next_server(&self.name); - } - - Ok(()) - } - - async fn request( + // TODO Limit status requests to one in flight at a time (per server) + pub async fn post_status( &self, status: WorkerStatus, request_work: bool, @@ -217,4 +63,60 @@ impl Server { Ok(response) } + + pub async fn download_repo(&self, hash: &str) -> somehow::Result { + let url = format!( + "{}api/worker/repo/{hash}/tree.tar.gz", + self.server_config.url + ); + + let response = self + .client + .get(url) + .basic_auth(&self.config.worker_name, Some(&self.server_config.token)) + .send() + .await?; + + tree::download(response).await + } + + pub async fn download_bench_repo(&self, hash: &str) -> somehow::Result { + let url = format!( + "{}api/worker/bench_repo/{hash}/tree.tar.gz", + self.server_config.url + ); + + let response = self + .client + .get(url) + .basic_auth(&self.config.worker_name, Some(&self.server_config.token)) + .send() + .await?; + + tree::download(response).await + } + + async fn ping(&self) -> somehow::Result<()> { + debug!("Pinging server"); + + // TODO Use actual status + let status = WorkerStatus::Idle; + + let response = self.post_status(status, false, None).await?; + + // TODO Signal that run should be aborted + + Ok(()) + } + + pub async fn ping_periodically(self) { + loop { + match self.ping().await { + Ok(()) => {} + Err(e) => warn!("Error talking to server:\n{e:?}"), + } + + tokio::time::sleep(self.config.worker_ping_delay).await; + } + } }