Perform runs with internal bench logic

This commit is contained in:
Joscha 2023-08-12 21:21:32 +02:00
parent 0196709a64
commit 0bad08eca9
4 changed files with 49 additions and 34 deletions

View file

@ -206,6 +206,7 @@ impl ConfigFile {
}
}
// TODO Url functions
#[derive(Clone)]
pub struct WorkerServerConfig {
/// Always ends with a `/`.

View file

@ -87,12 +87,12 @@ impl Worker {
// Request run
let guard = server.status_lock.lock().await;
let Some(run) = self.request_run(server).await else { return false; };
let run = RunInProgress::new(server.name.clone(), run);
let run = RunInProgress::new(server.name.clone(), server.server_config, run);
*server.current_run.lock().unwrap() = Some(run.clone());
drop(guard);
// Perform run
let Some(run) = run.perform().await else { return false; };
let Some(run) = run.perform(server).await else { return false; };
// Submit run
let guard = server.status_lock.lock().await;

View file

@ -1,3 +1,5 @@
mod internal;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
@ -7,10 +9,13 @@ use tokio::sync::Notify;
use tracing::error;
use crate::{
config::WorkerServerConfig,
shared::{BenchMethod, FinishedRun, Measurement, Run, Source, UnfinishedRun},
somehow,
};
use super::server::Server;
struct Finished {
exit_code: i32,
measurements: HashMap<String, Measurement>,
@ -21,15 +26,17 @@ const SCROLLBACK: usize = 50;
#[derive(Clone)]
pub struct RunInProgress {
server_name: String,
server_config: &'static WorkerServerConfig,
run: Run,
output: Arc<Mutex<Vec<(Source, String)>>>,
abort: Arc<Notify>,
}
impl RunInProgress {
pub fn new(server_name: String, run: Run) -> Self {
pub fn new(server_name: String, server_config: &'static WorkerServerConfig, run: Run) -> Self {
Self {
server_name,
server_config,
run,
output: Arc::new(Mutex::new(vec![])),
abort: Arc::new(Notify::new()),
@ -63,13 +70,14 @@ impl RunInProgress {
self.output.lock().unwrap().push((Source::Stderr, line));
}
pub async fn perform(&self) -> Option<FinishedRun> {
pub async fn perform(&self, server: &Server) -> Option<FinishedRun> {
// TODO Remove type annotations
// TODO Handle aborts
let result: somehow::Result<_> = match &self.run.bench_method {
BenchMethod::Internal => todo!(),
BenchMethod::Internal => self.perform_internal(server),
BenchMethod::Repo { hash } => todo!(),
};
}
.await;
let finished = match result {
Ok(outcome) => outcome,

View file

@ -2,23 +2,19 @@ use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
path::Path,
sync::{Arc, Mutex},
path::{Path, PathBuf},
};
use regex::RegexBuilder;
use tokio::{select, sync::mpsc};
use tracing::debug;
use walkdir::WalkDir;
use crate::{
config::WorkerServerConfig,
shared::{Direction, Measurement},
somehow,
worker::{run::RunStatus, tree::UnpackedTree},
worker::server::Server,
};
use super::Run;
use super::{Finished, RunInProgress};
#[derive(Default)]
struct Counts {
@ -27,7 +23,7 @@ struct Counts {
todos_by_ext: HashMap<String, usize>,
}
fn count(path: &Path) -> somehow::Result<Counts> {
fn count(run: &RunInProgress, path: &Path) -> somehow::Result<Counts> {
let todo_regex = RegexBuilder::new(r"[^a-z]todo[^a-z]")
.case_insensitive(true)
.build()
@ -60,6 +56,22 @@ fn count(path: &Path) -> somehow::Result<Counts> {
*counts.files_by_ext.entry(extension.clone()).or_default() += 1;
*counts.lines_by_ext.entry(extension.clone()).or_default() += lines;
*counts.todos_by_ext.entry(extension.clone()).or_default() += todos;
let relative_path = entry
.path()
.components()
.collect::<Vec<_>>()
.into_iter()
.rev()
.take(entry.depth())
.rev()
.collect::<PathBuf>();
run.log_stdout(format!(
"{} has {lines} line{}, {todos} todo{}",
relative_path.display(),
if lines == 1 { "" } else { "s" },
if todos == 1 { "" } else { "s" },
));
}
Ok(counts)
@ -137,24 +149,18 @@ fn measurements(counts: Counts) -> HashMap<String, Measurement> {
measurements
}
pub async fn run(
server_config: &'static WorkerServerConfig,
run: Arc<Mutex<Run>>,
mut abort_rx: mpsc::UnboundedReceiver<()>,
) -> somehow::Result<RunStatus> {
let hash = run.lock().unwrap().hash.clone();
let url = format!("{}api/worker/repo/{}", server_config.url, hash);
let tree = select! {
r = UnpackedTree::download(&url, hash) => Some(r?),
_ = abort_rx.recv() => None,
};
let Some(tree) = tree else {
debug!("Run aborted while downloading commit");
return Ok(RunStatus::Aborted);
};
let path = tree.dir.path().to_path_buf();
let counts = tokio::task::spawn_blocking(move || count(&path)).await??;
Ok(RunStatus::finished(0, measurements(counts)))
impl RunInProgress {
pub(super) async fn perform_internal(
&self,
server: &Server,
) -> somehow::Result<Option<Finished>> {
let run = self.clone();
let dir = server.download_repo(&self.run.hash).await?;
let path = dir.path().to_path_buf();
let counts = tokio::task::spawn_blocking(move || count(&run, &path)).await??;
Ok(Some(Finished {
exit_code: 0,
measurements: measurements(counts),
}))
}
}