Ask servers for runs and perform them
This commit is contained in:
parent
eaca373a6a
commit
b7c0443005
3 changed files with 164 additions and 32 deletions
|
|
@ -5,10 +5,18 @@ mod tree;
|
|||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use reqwest::Client;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::{config::Config, id, worker::server::Server};
|
||||
use crate::{
|
||||
config::Config,
|
||||
id,
|
||||
shared::{FinishedRun, Run},
|
||||
worker::server::Server,
|
||||
};
|
||||
|
||||
use self::run::RunInProgress;
|
||||
|
||||
pub struct Worker {
|
||||
config: &'static Config,
|
||||
|
|
@ -51,10 +59,69 @@ impl Worker {
|
|||
}
|
||||
|
||||
async fn single_server_mode(&self, server: Server) {
|
||||
// TODO Implement
|
||||
loop {
|
||||
while self.perform_run(&server).await {}
|
||||
tokio::time::sleep(self.config.worker_ping_delay).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn many_server_mode(&self, servers: Vec<Server>) {
|
||||
// TODO Implement
|
||||
loop {
|
||||
for server in &servers {
|
||||
let batch_start = OffsetDateTime::now_utc();
|
||||
let batch_end = batch_start + self.config.worker_batch_duration;
|
||||
while OffsetDateTime::now_utc() <= batch_end {
|
||||
if !self.perform_run(server).await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(self.config.worker_ping_delay).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Ask a server for a run, do the run, send results to the server.
|
||||
///
|
||||
/// Returns whether a run was performed.
|
||||
async fn perform_run(&self, server: &Server) -> bool {
|
||||
// Request run
|
||||
let guard = server.status_lock.lock().await;
|
||||
let Some(run) = self.request_run(server).await else { return false; };
|
||||
let run = RunInProgress::new(server.name.clone(), run);
|
||||
*server.current_run.lock().unwrap() = Some(run.clone());
|
||||
drop(guard);
|
||||
|
||||
// Perform run
|
||||
let Some(run) = run.perform().await else { return false; };
|
||||
|
||||
// Submit run
|
||||
let guard = server.status_lock.lock().await;
|
||||
*server.current_run.lock().unwrap() = None;
|
||||
while !self.submit_run(server, run.clone()).await {
|
||||
tokio::time::sleep(self.config.worker_ping_delay).await;
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
async fn request_run(&self, server: &Server) -> Option<Run> {
|
||||
match server.post_status(true, None).await {
|
||||
Ok(response) => response.run,
|
||||
Err(e) => {
|
||||
warn!("Error requesting run:\n{e:?}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn submit_run(&self, server: &Server, run: FinishedRun) -> bool {
|
||||
match server.post_status(false, Some(run)).await {
|
||||
Ok(_) => true,
|
||||
Err(e) => {
|
||||
warn!("Error submitting run:\n{e:?}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,77 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::Notify;
|
||||
use tracing::error;
|
||||
|
||||
use crate::shared::{Run, Source};
|
||||
use crate::{
|
||||
shared::{BenchMethod, FinishedRun, Measurement, Run, Source},
|
||||
somehow,
|
||||
};
|
||||
|
||||
/// Result of executing a run, before it is combined with the captured
/// output into a `FinishedRun` (see `RunInProgress::perform`).
struct Finished {
|
||||
/// Exit code reported for the run; `perform` uses `-1` when the run failed
/// with an internal error.
exit_code: i32,
|
||||
/// Measurements collected during the run, keyed by a string
/// (presumably the metric name; confirm against the producer).
measurements: HashMap<String, Measurement>,
|
||||
}
|
||||
|
||||
// TODO Make fields private
|
||||
/// A run that has been handed out by a server and is currently being
/// worked on.
///
/// `output` and `abort` are wrapped in `Arc`, so clones of this value
/// share the same output buffer and the same abort handle.
#[derive(Clone)]
|
||||
pub struct RunInProgress {
|
||||
/// Name of the server the run was obtained from.
pub server_name: String,
|
||||
/// The run as described by the server.
pub run: Run,
|
||||
/// Output lines logged so far, each tagged with the stream it came from.
pub output: Arc<Mutex<Vec<(Source, String)>>>,
|
||||
// NOTE(review): two `abort` declarations follow; this looks like the
// removed and the added line of a diff rendered side by side - confirm
// that only the `Arc<Notify>` variant exists in the real file.
pub abort: mpsc::UnboundedSender<()>,
|
||||
/// Handle intended for signalling that the run should be aborted
/// (abort handling is still marked as TODO in `perform`).
pub abort: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl RunInProgress {
|
||||
pub fn new(server_name: String, run: Run) -> Self {
|
||||
Self {
|
||||
server_name,
|
||||
run,
|
||||
output: Arc::new(Mutex::new(vec![])),
|
||||
abort: Arc::new(Notify::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn log_stdout(&self, line: String) {
|
||||
self.output.lock().unwrap().push((Source::Stdout, line));
|
||||
}
|
||||
|
||||
pub fn log_stderr(&self, line: String) {
|
||||
self.output.lock().unwrap().push((Source::Stderr, line));
|
||||
}
|
||||
|
||||
pub async fn perform(&self) -> Option<FinishedRun> {
|
||||
// TODO Remove type annotations
|
||||
// TODO Handle aborts
|
||||
let result: somehow::Result<_> = match &self.run.bench_method {
|
||||
BenchMethod::Internal => todo!(),
|
||||
BenchMethod::Repo { hash } => todo!(),
|
||||
};
|
||||
|
||||
let finished = match result {
|
||||
Ok(outcome) => outcome,
|
||||
Err(e) => {
|
||||
error!("Error during run:\n{e:?}");
|
||||
self.log_stderr("Internal error:".to_string());
|
||||
self.log_stderr(format!("{e:?}"));
|
||||
Some(Finished {
|
||||
exit_code: -1,
|
||||
measurements: HashMap::new(),
|
||||
})
|
||||
}
|
||||
}?;
|
||||
|
||||
let mut output = vec![];
|
||||
std::mem::swap(&mut output, &mut *self.output.lock().unwrap());
|
||||
|
||||
Some(FinishedRun {
|
||||
run: self.run.clone(),
|
||||
exit_code: finished.exit_code,
|
||||
output,
|
||||
measurements: finished.measurements,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,17 +45,35 @@ impl Server {
|
|||
// TODO Limit status requests to one in flight at a time (per server)
|
||||
pub async fn post_status(
|
||||
&self,
|
||||
status: WorkerStatus,
|
||||
request_work: bool,
|
||||
submit_work: Option<FinishedRun>,
|
||||
request_run: bool,
|
||||
submit_run: Option<FinishedRun>,
|
||||
) -> somehow::Result<ServerResponse> {
|
||||
let url = format!("{}api/worker/status", self.server_config.url);
|
||||
|
||||
let status = match &*self.current_run.lock().unwrap() {
|
||||
Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun {
|
||||
run: run.run.clone(),
|
||||
last_output: run
|
||||
.output
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.rev()
|
||||
.take(SCROLLBACK)
|
||||
.rev()
|
||||
.cloned()
|
||||
.collect(),
|
||||
}),
|
||||
Some(_) => WorkerStatus::Busy,
|
||||
None => WorkerStatus::Idle,
|
||||
};
|
||||
|
||||
let request = WorkerRequest {
|
||||
info: None,
|
||||
secret: self.secret.clone(),
|
||||
status,
|
||||
request_run: request_work,
|
||||
submit_run: submit_work,
|
||||
request_run,
|
||||
submit_run,
|
||||
};
|
||||
|
||||
let response = self
|
||||
|
|
@ -107,25 +125,7 @@ impl Server {
|
|||
debug!("Pinging server");
|
||||
let guard = self.status_lock.lock().await;
|
||||
|
||||
let status = match &*self.current_run.lock().unwrap() {
|
||||
Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun {
|
||||
run: run.run.clone(),
|
||||
last_output: run
|
||||
.output
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.rev()
|
||||
.take(SCROLLBACK)
|
||||
.rev()
|
||||
.cloned()
|
||||
.collect(),
|
||||
}),
|
||||
Some(_) => WorkerStatus::Busy,
|
||||
None => WorkerStatus::Idle,
|
||||
};
|
||||
|
||||
let response = self.post_status(status, false, None).await?;
|
||||
let response = self.post_status(false, None).await?;
|
||||
|
||||
// TODO Signal that run should be aborted
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue