Send actual runner status to server

This commit is contained in:
Joscha 2023-08-12 19:35:12 +02:00
parent c7a89867a7
commit e644f2be65
3 changed files with 52 additions and 162 deletions

View file

@ -1,10 +1,13 @@
mod run;
mod server; mod server;
mod tree; mod tree;
use reqwest::Client; use std::sync::{Arc, Mutex};
use tracing::error;
use crate::{config::Config, worker::server::Server}; use reqwest::Client;
use tracing::{error, info};
use crate::{config::Config, id, worker::server::Server};
pub struct Worker { pub struct Worker {
config: &'static Config, config: &'static Config,
@ -17,17 +20,24 @@ impl Worker {
pub async fn run(&self) { pub async fn run(&self) {
let client = Client::new(); let client = Client::new();
let current_run = Arc::new(Mutex::new(None));
let mut servers = self let mut servers = self
.config .config
.worker_servers .worker_servers
.iter() .iter()
.map(|(name, server_config)| { .map(|(name, server_config)| Server {
Server::new(name.clone(), self.config, server_config, client.clone()) name: name.clone(),
config: self.config,
server_config,
secret: id::random_worker_secret(),
client: client.clone(),
current_run: current_run.clone(),
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for server in &servers { for server in &servers {
info!("Connecting to server {}", server.name);
tokio::spawn(server.clone().ping_periodically()); tokio::spawn(server.clone().ping_periodically());
} }

View file

@ -1,138 +1,12 @@
mod internal; use std::sync::{Arc, Mutex};
mod repo;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use time::OffsetDateTime;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{debug_span, error, Instrument};
use crate::{ use crate::shared::{Run, Source};
config::WorkerServerConfig,
shared::{BenchMethod, FinishedRun, Measurement, Source, UnfinishedRun},
};
const LIVE_SCROLLBACK: usize = 50; pub struct RunInProgress {
pub server_name: String,
pub enum FullRunStatus { pub run: Run,
Unfinished(UnfinishedRun), pub output: Arc<Mutex<Vec<(Source, String)>>>,
Finished(FinishedRun), pub abort: mpsc::UnboundedSender<()>,
Aborted,
}
#[derive(Clone)]
pub enum RunStatus {
Unfinished,
Finished {
end: OffsetDateTime,
exit_code: i32,
measurements: HashMap<String, Measurement>,
},
Aborted,
}
impl RunStatus {
pub fn finished(exit_code: i32, measurements: HashMap<String, Measurement>) -> Self {
Self::Finished {
end: OffsetDateTime::now_utc(),
exit_code,
measurements,
}
}
}
#[derive(Clone)]
pub struct Run {
id: String,
hash: String,
start: OffsetDateTime,
output: Vec<(Source, String)>,
status: RunStatus,
}
impl Run {
pub fn new(id: String, hash: String) -> Self {
Self {
id,
hash,
start: OffsetDateTime::now_utc(),
output: vec![],
status: RunStatus::Unfinished,
}
}
pub fn log_stdout(&mut self, line: String) {
self.output.push((Source::Stdout, line));
}
pub fn log_stderr(&mut self, line: String) {
self.output.push((Source::Stderr, line));
}
pub fn into_full_status(self) -> FullRunStatus {
match self.status {
RunStatus::Unfinished => FullRunStatus::Unfinished(UnfinishedRun {
id: self.id,
hash: self.hash,
start: self.start,
last_output: self
.output
.into_iter()
.rev()
.take(LIVE_SCROLLBACK)
.rev()
.collect(),
}),
RunStatus::Finished {
end,
exit_code,
measurements,
} => FullRunStatus::Finished(FinishedRun {
id: self.id,
hash: self.hash,
start: self.start,
end,
exit_code,
measurements,
output: self.output,
}),
RunStatus::Aborted => FullRunStatus::Aborted,
}
}
}
pub async fn run(
server_config: &'static WorkerServerConfig,
poke_tx: mpsc::UnboundedSender<()>,
run: Arc<Mutex<Run>>,
bench: BenchMethod,
abort_rx: mpsc::UnboundedReceiver<()>,
) {
async {
let result = match bench {
BenchMethod::Internal => internal::run(server_config, run.clone(), abort_rx).await,
BenchMethod::Repo { hash } => repo::run(run.clone(), hash, abort_rx).await,
};
match result {
Ok(status) => {
assert!(!matches!(status, RunStatus::Unfinished));
run.lock().unwrap().status = status;
}
Err(e) => {
error!("Error during run:\n{e:?}");
let mut guard = run.lock().unwrap();
guard.log_stderr("Internal error:".to_string());
guard.log_stderr(format!("{e:?}"));
guard.status = RunStatus::finished(-1, HashMap::new());
}
}
let _ = poke_tx.send(());
}
.instrument(debug_span!("run"))
.await;
} }

View file

@ -1,40 +1,31 @@
use std::sync::{Arc, Mutex};
use reqwest::Client; use reqwest::Client;
use tempfile::TempDir; use tempfile::TempDir;
use tracing::{debug, warn}; use tracing::{debug, warn};
use crate::{ use crate::{
config::{Config, WorkerServerConfig}, config::{Config, WorkerServerConfig},
id, shared::{FinishedRun, ServerResponse, UnfinishedRun, WorkerRequest, WorkerStatus},
shared::{FinishedRun, ServerResponse, WorkerRequest, WorkerStatus},
somehow, somehow,
worker::tree, worker::tree,
}; };
use super::run::RunInProgress;
const SCROLLBACK: usize = 50;
#[derive(Clone)] #[derive(Clone)]
pub struct Server { pub struct Server {
name: String, pub name: String,
config: &'static Config, pub config: &'static Config,
server_config: &'static WorkerServerConfig, pub server_config: &'static WorkerServerConfig,
client: Client, pub secret: String,
secret: String, pub client: Client,
pub current_run: Arc<Mutex<Option<RunInProgress>>>,
} }
impl Server { impl Server {
pub fn new(
name: String,
config: &'static Config,
server_config: &'static WorkerServerConfig,
client: Client,
) -> Self {
Self {
name,
config,
server_config,
client,
secret: id::random_worker_secret(),
}
}
// TODO Limit status requests to one in flight at a time (per server) // TODO Limit status requests to one in flight at a time (per server)
pub async fn post_status( pub async fn post_status(
&self, &self,
@ -99,8 +90,23 @@ impl Server {
async fn ping(&self) -> somehow::Result<()> { async fn ping(&self) -> somehow::Result<()> {
debug!("Pinging server"); debug!("Pinging server");
// TODO Use actual status let status = match &*self.current_run.lock().unwrap() {
let status = WorkerStatus::Idle; Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun {
run: run.run.clone(),
last_output: run
.output
.lock()
.unwrap()
.iter()
.rev()
.take(SCROLLBACK)
.rev()
.cloned()
.collect(),
}),
Some(_) => WorkerStatus::Busy,
None => WorkerStatus::Idle,
};
let response = self.post_status(status, false, None).await?; let response = self.post_status(status, false, None).await?;