Implement internal runner

This commit is contained in:
Joscha 2023-08-12 14:16:22 +02:00
parent d15d6588f7
commit 416e3e6aa1
7 changed files with 247 additions and 26 deletions

39
Cargo.lock generated
View file

@ -29,6 +29,15 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "aho-corasick"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "allocator-api2" name = "allocator-api2"
version = "0.2.16" version = "0.2.16"
@ -2377,10 +2386,33 @@ dependencies = [
] ]
[[package]] [[package]]
name = "regex-automata" name = "regex"
version = "0.3.4" version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
[[package]] [[package]]
name = "reqwest" name = "reqwest"
@ -3109,6 +3141,7 @@ dependencies = [
"mime_guess", "mime_guess",
"open", "open",
"rand", "rand",
"regex",
"reqwest", "reqwest",
"rust-embed", "rust-embed",
"serde", "serde",

View file

@ -18,6 +18,7 @@ humantime-serde = "1.1.1"
mime_guess = "2.0.4" mime_guess = "2.0.4"
open = "5.0.0" open = "5.0.0"
rand = "0.8.5" rand = "0.8.5"
regex = "1.9.3"
rust-embed = { version = "6.8.1", features = ["interpolate-folder-path"] } rust-embed = { version = "6.8.1", features = ["interpolate-folder-path"] }
serde = { version = "1.0.181", features = ["derive"] } serde = { version = "1.0.181", features = ["derive"] }
serde_repr = "0.1.16" serde_repr = "0.1.16"
@ -29,6 +30,7 @@ tokio-stream = "0.1.14"
toml = "0.7.6" toml = "0.7.6"
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"
walkdir = "2.3.3"
[dependencies.gix] [dependencies.gix]
version = "0.51.0" version = "0.51.0"

View file

@ -11,7 +11,7 @@ use tokio::sync::mpsc;
use tracing::{debug_span, error, Instrument}; use tracing::{debug_span, error, Instrument};
use crate::{ use crate::{
id, config::WorkerServerConfig,
shared::{BenchMethod, FinishedRun, Measurement, Source, UnfinishedRun}, shared::{BenchMethod, FinishedRun, Measurement, Source, UnfinishedRun},
}; };
@ -34,6 +34,16 @@ pub enum RunStatus {
Aborted, Aborted,
} }
impl RunStatus {
pub fn finished(exit_code: i32, measurements: HashMap<String, Measurement>) -> Self {
Self::Finished {
end: OffsetDateTime::now_utc(),
exit_code,
measurements,
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Run { pub struct Run {
id: String, id: String,
@ -54,6 +64,14 @@ impl Run {
} }
} }
pub fn log_stdout(&mut self, line: String) {
self.output.push((Source::Stdout, line));
}
pub fn log_stderr(&mut self, line: String) {
self.output.push((Source::Stderr, line));
}
pub fn into_full_status(self) -> FullRunStatus { pub fn into_full_status(self) -> FullRunStatus {
match self.status { match self.status {
RunStatus::Unfinished => FullRunStatus::Unfinished(UnfinishedRun { RunStatus::Unfinished => FullRunStatus::Unfinished(UnfinishedRun {
@ -89,19 +107,31 @@ impl Run {
} }
pub async fn run( pub async fn run(
server_config: &'static WorkerServerConfig,
poke_tx: mpsc::UnboundedSender<()>,
run: Arc<Mutex<Run>>, run: Arc<Mutex<Run>>,
bench: BenchMethod,
abort_rx: mpsc::UnboundedReceiver<()>, abort_rx: mpsc::UnboundedReceiver<()>,
bench_method: BenchMethod,
) { ) {
async { async {
let result = match bench_method { let result = match bench {
BenchMethod::Internal => internal::run(run, abort_rx).await, BenchMethod::Internal => internal::run(server_config, run.clone(), abort_rx).await,
BenchMethod::Repo { hash } => repo::run(run, hash, abort_rx).await, BenchMethod::Repo { hash } => repo::run(run.clone(), hash, abort_rx).await,
}; };
match result { match result {
Ok(()) => {} Ok(status) => {
Err(e) => error!("Error during run:\n{e:?}"), assert!(!matches!(status, RunStatus::Unfinished));
run.lock().unwrap().status = status;
} }
Err(e) => {
error!("Error during run:\n{e:?}");
let mut guard = run.lock().unwrap();
guard.log_stderr("Internal error:".to_string());
guard.log_stderr(format!("{e:?}"));
guard.status = RunStatus::finished(-1, HashMap::new());
}
}
let _ = poke_tx.send(());
} }
.instrument(debug_span!("run")) .instrument(debug_span!("run"))
.await; .await;

View file

@ -1,14 +1,160 @@
use std::sync::{Arc, Mutex}; use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
path::Path,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc; use regex::RegexBuilder;
use tokio::{select, sync::mpsc};
use tracing::debug;
use walkdir::WalkDir;
use crate::somehow; use crate::{
config::WorkerServerConfig,
shared::{Direction, Measurement},
somehow,
worker::{run::RunStatus, tree::UnpackedTree},
};
use super::Run; use super::Run;
pub async fn run( #[derive(Default)]
run: Arc<Mutex<Run>>, struct Counts {
abort_rx: mpsc::UnboundedReceiver<()>, files_by_ext: HashMap<String, usize>,
) -> somehow::Result<()> { lines_by_ext: HashMap<String, usize>,
todo!() todos_by_ext: HashMap<String, usize>,
}
fn count(path: &Path) -> somehow::Result<Counts> {
let todo_regex = RegexBuilder::new(r"[^a-z]todo[^a-z]")
.case_insensitive(true)
.build()
.unwrap();
let mut counts = Counts::default();
for entry in WalkDir::new(path) {
let entry = entry?;
if !entry.file_type().is_file() {
continue;
}
let extension = entry
.path()
.extension()
.unwrap_or_default()
.to_string_lossy()
.to_string();
let mut lines = 0;
let mut todos = 0;
for line in BufReader::new(File::open(entry.path())?).lines() {
let line = line?;
lines += 1;
if todo_regex.is_match(&line) {
todos += 1;
}
}
*counts.files_by_ext.entry(extension.clone()).or_default() += 1;
*counts.lines_by_ext.entry(extension.clone()).or_default() += lines;
*counts.todos_by_ext.entry(extension.clone()).or_default() += todos;
}
Ok(counts)
}
fn measurements(counts: Counts) -> HashMap<String, Measurement> {
let mut measurements = HashMap::new();
// Files
measurements.insert(
"files".to_string(),
Measurement {
value: counts.files_by_ext.values().sum::<usize>() as f64,
stddev: None,
unit: None,
direction: Some(Direction::Neutral),
},
);
for (extension, count) in counts.files_by_ext {
measurements.insert(
format!("files.{extension}"),
Measurement {
value: count as f64,
stddev: None,
unit: None,
direction: Some(Direction::Neutral),
},
);
}
// Lines
measurements.insert(
"lines".to_string(),
Measurement {
value: counts.lines_by_ext.values().sum::<usize>() as f64,
stddev: None,
unit: None,
direction: Some(Direction::Neutral),
},
);
for (extension, count) in counts.lines_by_ext {
measurements.insert(
format!("lines.{extension}"),
Measurement {
value: count as f64,
stddev: None,
unit: None,
direction: Some(Direction::Neutral),
},
);
}
// Todos
measurements.insert(
"todos".to_string(),
Measurement {
value: counts.todos_by_ext.values().sum::<usize>() as f64,
stddev: None,
unit: None,
direction: Some(Direction::LessIsBetter),
},
);
for (extension, count) in counts.todos_by_ext {
measurements.insert(
format!("todos.{extension}"),
Measurement {
value: count as f64,
stddev: None,
unit: None,
direction: Some(Direction::LessIsBetter),
},
);
}
measurements
}
pub async fn run(
server_config: &'static WorkerServerConfig,
run: Arc<Mutex<Run>>,
mut abort_rx: mpsc::UnboundedReceiver<()>,
) -> somehow::Result<RunStatus> {
let hash = run.lock().unwrap().hash.clone();
let url = format!("{}api/worker/repo/{}", server_config.url, hash);
let tree = select! {
r = UnpackedTree::download(&url, hash) => Some(r?),
_ = abort_rx.recv() => None,
};
let Some(tree) = tree else {
debug!("Run aborted while downloading commit");
return Ok(RunStatus::Aborted);
};
let path = tree.dir.path().to_path_buf();
let counts = tokio::task::spawn_blocking(move || count(&path)).await??;
Ok(RunStatus::finished(0, measurements(counts)))
} }

View file

@ -4,12 +4,12 @@ use tokio::sync::mpsc;
use crate::somehow; use crate::somehow;
use super::Run; use super::{Run, RunStatus};
pub async fn run( pub async fn run(
run: Arc<Mutex<Run>>, run: Arc<Mutex<Run>>,
hash: String, hash: String,
abort_rx: mpsc::UnboundedReceiver<()>, abort_rx: mpsc::UnboundedReceiver<()>,
) -> somehow::Result<()> { ) -> somehow::Result<RunStatus> {
todo!() todo!()
} }

View file

@ -59,7 +59,7 @@ impl Server {
let name = self.name.clone(); let name = self.name.clone();
async { async {
loop { loop {
match self.ping().await { match self.ping(&poke_tx).await {
Ok(()) => {} Ok(()) => {}
Err(e) => warn!("Error talking to server:\n{e:?}"), Err(e) => warn!("Error talking to server:\n{e:?}"),
} }
@ -82,12 +82,12 @@ impl Server {
while poke_rx.try_recv().is_ok() {} while poke_rx.try_recv().is_ok() {}
} }
async fn ping(&mut self) -> somehow::Result<()> { async fn ping(&mut self, poke_tx: &mpsc::UnboundedSender<()>) -> somehow::Result<()> {
debug!("Pinging server"); debug!("Pinging server");
let info = self.coordinator.lock().unwrap().active(&self.name); let info = self.coordinator.lock().unwrap().active(&self.name);
if info.active { if info.active {
self.ping_active(info).await?; self.ping_active(info, poke_tx).await?;
} else { } else {
self.ping_inactive(info).await?; self.ping_inactive(info).await?;
} }
@ -106,7 +106,11 @@ impl Server {
Ok(()) Ok(())
} }
async fn ping_active(&mut self, info: ActiveInfo) -> somehow::Result<()> { async fn ping_active(
&mut self,
info: ActiveInfo,
poke_tx: &mpsc::UnboundedSender<()>,
) -> somehow::Result<()> {
let run = self let run = self
.run .run
.as_ref() .as_ref()
@ -166,7 +170,13 @@ impl Server {
self.run = Some((run.clone(), abort_tx)); self.run = Some((run.clone(), abort_tx));
self.coordinator.lock().unwrap().look_busy(&self.name); self.coordinator.lock().unwrap().look_busy(&self.name);
tokio::spawn(run::run(run, abort_rx, work.bench)); tokio::spawn(run::run(
self.server_config,
poke_tx.clone(),
run,
work.bench,
abort_rx,
));
} }
// Finally, advance to the next server if it makes sense to do so // Finally, advance to the next server if it makes sense to do so

View file

@ -65,7 +65,7 @@ impl UnpackedTree {
Ok(()) Ok(())
} }
async fn download(url: &str, hash: String) -> somehow::Result<Self> { pub async fn download(url: &str, hash: String) -> somehow::Result<Self> {
let dir = TempDir::new()?; let dir = TempDir::new()?;
debug!( debug!(
"Downloading and unpacking {url} to {}", "Downloading and unpacking {url} to {}",