diff --git a/Cargo.lock b/Cargo.lock index 6e99fe8..16d927a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -2377,10 +2386,33 @@ dependencies = [ ] [[package]] -name = "regex-automata" -version = "0.3.4" +name = "regex" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2" [[package]] name = "reqwest" @@ -3109,6 +3141,7 @@ dependencies = [ "mime_guess", "open", "rand", + "regex", "reqwest", "rust-embed", "serde", diff --git a/Cargo.toml b/Cargo.toml index ee67e19..1f309f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ humantime-serde = "1.1.1" mime_guess = "2.0.4" open = "5.0.0" rand = "0.8.5" +regex = "1.9.3" rust-embed = { version = "6.8.1", features = ["interpolate-folder-path"] } serde = { version = "1.0.181", features = ["derive"] } serde_repr = "0.1.16" @@ -29,6 +30,7 @@ tokio-stream = "0.1.14" toml = "0.7.6" tracing = "0.1.37" tracing-subscriber = "0.3.17" +walkdir = "2.3.3" [dependencies.gix] version = "0.51.0" diff --git a/src/worker/run.rs b/src/worker/run.rs index ec4375f..4076b75 100644 --- a/src/worker/run.rs +++ b/src/worker/run.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc; use tracing::{debug_span, error, Instrument}; use crate::{ - id, + config::WorkerServerConfig, shared::{BenchMethod, FinishedRun, Measurement, Source, UnfinishedRun}, }; @@ -34,6 +34,16 @@ pub enum RunStatus { Aborted, } +impl RunStatus { + pub fn finished(exit_code: i32, measurements: HashMap) -> Self { + Self::Finished { + end: OffsetDateTime::now_utc(), + exit_code, + measurements, + } + } +} + #[derive(Clone)] pub struct Run { id: String, @@ -54,6 +64,14 @@ impl Run { } } + pub fn log_stdout(&mut self, line: String) { + self.output.push((Source::Stdout, line)); + } + + pub fn log_stderr(&mut self, line: String) { + self.output.push((Source::Stderr, line)); + } + pub fn into_full_status(self) -> FullRunStatus { match self.status { RunStatus::Unfinished => FullRunStatus::Unfinished(UnfinishedRun { @@ -89,19 +107,31 @@ impl Run { } pub async fn run( + server_config: &'static WorkerServerConfig, + poke_tx: mpsc::UnboundedSender<()>, run: Arc>, + bench: BenchMethod, abort_rx: mpsc::UnboundedReceiver<()>, - bench_method: BenchMethod, ) { async { - let result = match bench_method { - BenchMethod::Internal => internal::run(run, abort_rx).await, - BenchMethod::Repo { hash } => repo::run(run, hash, abort_rx).await, + let result = match bench { + BenchMethod::Internal => internal::run(server_config, run.clone(), abort_rx).await, + BenchMethod::Repo { hash } => repo::run(run.clone(), hash, abort_rx).await, }; match result { - Ok(()) => {} - Err(e) => error!("Error during run:\n{e:?}"), + Ok(status) => { + assert!(!matches!(status, RunStatus::Unfinished)); + run.lock().unwrap().status = status; + } + Err(e) => { + error!("Error during run:\n{e:?}"); + let mut guard = run.lock().unwrap(); + guard.log_stderr("Internal error:".to_string()); + guard.log_stderr(format!("{e:?}")); + guard.status = RunStatus::finished(-1, HashMap::new()); + } } + let _ = poke_tx.send(()); } .instrument(debug_span!("run")) .await; diff --git a/src/worker/run/internal.rs b/src/worker/run/internal.rs index 4a774a5..dd90d51 100644 --- a/src/worker/run/internal.rs +++ b/src/worker/run/internal.rs @@ -1,14 +1,160 @@ -use std::sync::{Arc, Mutex}; +use std::{ + collections::HashMap, + fs::File, + io::{BufRead, BufReader}, + path::Path, + sync::{Arc, Mutex}, +}; -use tokio::sync::mpsc; +use regex::RegexBuilder; +use tokio::{select, sync::mpsc}; +use tracing::debug; +use walkdir::WalkDir; -use crate::somehow; +use crate::{ + config::WorkerServerConfig, + shared::{Direction, Measurement}, + somehow, + worker::{run::RunStatus, tree::UnpackedTree}, +}; use super::Run; -pub async fn run( - run: Arc>, - abort_rx: mpsc::UnboundedReceiver<()>, -) -> somehow::Result<()> { - todo!() +#[derive(Default)] +struct Counts { + files_by_ext: HashMap, + lines_by_ext: HashMap, + todos_by_ext: HashMap, +} + +fn count(path: &Path) -> somehow::Result { + let todo_regex = RegexBuilder::new(r"[^a-z]todo[^a-z]") + .case_insensitive(true) + .build() + .unwrap(); + + let mut counts = Counts::default(); + for entry in WalkDir::new(path) { + let entry = entry?; + if !entry.file_type().is_file() { + continue; + } + + let extension = entry + .path() + .extension() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + + let mut lines = 0; + let mut todos = 0; + for line in BufReader::new(File::open(entry.path())?).lines() { + let line = line?; + lines += 1; + if todo_regex.is_match(&line) { + todos += 1; + } + } + + *counts.files_by_ext.entry(extension.clone()).or_default() += 1; + *counts.lines_by_ext.entry(extension.clone()).or_default() += lines; + *counts.todos_by_ext.entry(extension.clone()).or_default() += todos; + } + + Ok(counts) +} + +fn measurements(counts: Counts) -> HashMap { + let mut measurements = HashMap::new(); + + // Files + measurements.insert( + "files".to_string(), + Measurement { + value: counts.files_by_ext.values().sum::() as f64, + stddev: None, + unit: None, + direction: Some(Direction::Neutral), + }, + ); + for (extension, count) in counts.files_by_ext { + measurements.insert( + format!("files.{extension}"), + Measurement { + value: count as f64, + stddev: None, + unit: None, + direction: Some(Direction::Neutral), + }, + ); + } + + // Lines + measurements.insert( + "lines".to_string(), + Measurement { + value: counts.lines_by_ext.values().sum::() as f64, + stddev: None, + unit: None, + direction: Some(Direction::Neutral), + }, + ); + for (extension, count) in counts.lines_by_ext { + measurements.insert( + format!("lines.{extension}"), + Measurement { + value: count as f64, + stddev: None, + unit: None, + direction: Some(Direction::Neutral), + }, + ); + } + + // Todos + measurements.insert( + "todos".to_string(), + Measurement { + value: counts.todos_by_ext.values().sum::() as f64, + stddev: None, + unit: None, + direction: Some(Direction::LessIsBetter), + }, + ); + for (extension, count) in counts.todos_by_ext { + measurements.insert( + format!("todos.{extension}"), + Measurement { + value: count as f64, + stddev: None, + unit: None, + direction: Some(Direction::LessIsBetter), + }, + ); + } + + measurements +} + +pub async fn run( + server_config: &'static WorkerServerConfig, + run: Arc>, + mut abort_rx: mpsc::UnboundedReceiver<()>, +) -> somehow::Result { + let hash = run.lock().unwrap().hash.clone(); + let url = format!("{}api/worker/repo/{}", server_config.url, hash); + let tree = select! { + r = UnpackedTree::download(&url, hash) => Some(r?), + _ = abort_rx.recv() => None, + }; + let Some(tree) = tree else { + debug!("Run aborted while downloading commit"); + return Ok(RunStatus::Aborted); + }; + + let path = tree.dir.path().to_path_buf(); + let counts = tokio::task::spawn_blocking(move || count(&path)).await??; + + Ok(RunStatus::finished(0, measurements(counts))) } diff --git a/src/worker/run/repo.rs b/src/worker/run/repo.rs index 605b022..da43877 100644 --- a/src/worker/run/repo.rs +++ b/src/worker/run/repo.rs @@ -4,12 +4,12 @@ use tokio::sync::mpsc; use crate::somehow; -use super::Run; +use super::{Run, RunStatus}; pub async fn run( run: Arc>, hash: String, abort_rx: mpsc::UnboundedReceiver<()>, -) -> somehow::Result<()> { +) -> somehow::Result { todo!() } diff --git a/src/worker/server.rs b/src/worker/server.rs index 764f9a1..a52b22b 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -59,7 +59,7 @@ impl Server { let name = self.name.clone(); async { loop { - match self.ping().await { + match self.ping(&poke_tx).await { Ok(()) => {} Err(e) => warn!("Error talking to server:\n{e:?}"), } @@ -82,12 +82,12 @@ impl Server { while poke_rx.try_recv().is_ok() {} } - async fn ping(&mut self) -> somehow::Result<()> { + async fn ping(&mut self, poke_tx: &mpsc::UnboundedSender<()>) -> somehow::Result<()> { debug!("Pinging server"); let info = self.coordinator.lock().unwrap().active(&self.name); if info.active { - self.ping_active(info).await?; + self.ping_active(info, poke_tx).await?; } else { self.ping_inactive(info).await?; } @@ -106,7 +106,11 @@ impl Server { Ok(()) } - async fn ping_active(&mut self, info: ActiveInfo) -> somehow::Result<()> { + async fn ping_active( + &mut self, + info: ActiveInfo, + poke_tx: &mpsc::UnboundedSender<()>, + ) -> somehow::Result<()> { let run = self .run .as_ref() @@ -166,7 +170,13 @@ impl Server { self.run = Some((run.clone(), abort_tx)); self.coordinator.lock().unwrap().look_busy(&self.name); - tokio::spawn(run::run(run, abort_rx, work.bench)); + tokio::spawn(run::run( + self.server_config, + poke_tx.clone(), + run, + work.bench, + abort_rx, + )); } // Finally, advance to the next server if it makes sense to do so diff --git a/src/worker/tree.rs b/src/worker/tree.rs index e8daa9b..c23f706 100644 --- a/src/worker/tree.rs +++ b/src/worker/tree.rs @@ -65,7 +65,7 @@ impl UnpackedTree { Ok(()) } - async fn download(url: &str, hash: String) -> somehow::Result { + pub async fn download(url: &str, hash: String) -> somehow::Result { let dir = TempDir::new()?; debug!( "Downloading and unpacking {url} to {}",