Launch idle thread for each server

This commit is contained in:
Joscha 2023-08-12 16:27:53 +02:00
parent 4f63b02509
commit 53be0338f2
3 changed files with 107 additions and 182 deletions

View file

@ -94,26 +94,25 @@ async fn open_in_browser(config: &Config) {
}
/// Launches `amount` local workers that connect to the web server at
/// `http://{web_address}{web_base}`, authenticating with `web_worker_token`.
///
/// Worker `i` runs with a clone of `config` whose `worker_name` is
/// `"{worker_name}-{i}"`. Each per-worker config is leaked with `Box::leak` so
/// that a reference to it can be handed to the worker.
//
// NOTE(review): this listing appears to interleave removed and added lines of
// a diff, so two variants of the loop body are visible. Confirm against the
// actual file before relying on this description.
async fn launch_local_workers(config: &'static Config, amount: u8) {
let server_name = "localhost";
let server_config = Box::leak(Box::new(WorkerServerConfig {
url: format!("http://{}{}", config.web_address, config.web_base),
token: config.web_worker_token.clone(),
}));
// Wait a bit to ensure the server is ready to serve requests.
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 0..amount {
let mut worker_config = config.clone();
worker_config.worker_name = format!("{}-{i}", config.worker_name);
let worker_config = Box::leak(Box::new(worker_config));
info!("Launching local worker {}", worker_config.worker_name);
worker::launch_standalone_server_task(
worker_config,
server_name.to_string(),
server_config,
let mut config = config.clone();
config.worker_name = format!("{}-{i}", config.worker_name);
// The local worker should only know about this one server, so drop any
// servers cloned from the original config before inserting "localhost".
config.worker_servers.clear();
config.worker_servers.insert(
"localhost".to_string(),
WorkerServerConfig {
url: format!("http://{}{}", config.web_address, config.web_base),
token: config.web_worker_token.clone(),
},
);
let config = Box::leak(Box::new(config));
info!("Launching local worker {}", config.worker_name);
let worker = Worker::new(config);
// The join handle is dropped, so the worker task runs detached.
tokio::spawn(async move { worker.run().await });
}
}

View file

@ -1,8 +1,10 @@
mod server;
mod tree;
use reqwest::Client;
use tracing::error;
use crate::config::Config;
use crate::{config::Config, worker::server::Server};
pub struct Worker {
config: &'static Config,
@ -14,11 +16,33 @@ impl Worker {
}
/// Runs the worker.
///
/// Creates one `Server` per entry in `config.worker_servers`, each receiving a
/// clone of a single `reqwest::Client`, and spawns a `ping_periodically` task
/// for every server. It then logs an error if no servers are configured, and
/// otherwise hands over to `single_server_mode` or `many_server_mode`.
//
// NOTE(review): this listing appears to interleave removed and added lines of
// a diff (the early `is_empty` check and the `todo!()` look like the removed
// side). Confirm against the actual file.
pub async fn run(&self) {
if self.config.worker_servers.is_empty() {
error!("No servers specified in config");
return;
let client = Client::new();
let mut servers = self
.config
.worker_servers
.iter()
.map(|(name, server_config)| {
Server::new(name.clone(), self.config, server_config, client.clone())
})
.collect::<Vec<_>>();
// Each ping task gets its own clone of the server; the join handles are
// dropped, so the tasks run detached.
for server in &servers {
tokio::spawn(server.clone().ping_periodically());
}
todo!()
match servers.len() {
0 => error!("No servers specified in config"),
// Exactly one element here, so `pop` cannot return `None`.
1 => self.single_server_mode(servers.pop().unwrap()).await,
_ => self.many_server_mode(servers).await,
}
}
/// Placeholder for the mode `run` selects when exactly one server is
/// configured. Currently does nothing with `server`.
async fn single_server_mode(&self, server: Server) {
// TODO Implement
}
/// Placeholder for the mode `run` selects when more than one server is
/// configured. Currently does nothing with `servers`.
async fn many_server_mode(&self, servers: Vec<Server>) {
// TODO Implement
}
}

View file

@ -1,32 +1,22 @@
use std::sync::{Arc, Mutex};
use reqwest::Client;
use tokio::sync::mpsc;
use tracing::{debug, info_span, warn, Instrument};
use tempfile::TempDir;
use tracing::{debug, warn};
use crate::{
config::{Config, WorkerServerConfig},
id,
shared::{FinishedRun, ServerResponse, WorkerRequest, WorkerStatus},
somehow,
worker::run::{self, FullRunStatus},
};
use super::{
coordinator::{ActiveInfo, Coordinator},
run::Run,
worker::tree,
};
/// State for talking to one server on behalf of this worker.
//
// NOTE(review): this listing appears to interleave removed and added lines of
// a diff, so some of the fields below may no longer exist in the actual file.
#[derive(Clone)]
pub struct Server {
/// Name identifying this server.
name: String,
/// The worker's configuration (e.g. `worker_name`, `worker_ping_delay`).
config: &'static Config,
/// Connection details for this server: its base `url` and auth `token`.
server_config: &'static WorkerServerConfig,
/// Coordinator behind an `Arc<Mutex<_>>`; the server registers itself with
/// it and asks it whether this server is currently active.
coordinator: Arc<Mutex<Coordinator>>,
/// HTTP client used for requests to this server.
client: Client,
/// Random secret generated in the constructor via `id::random_worker_secret()`.
secret: String,
// TODO Cache bench dir
/// The current run, if any, together with the sender used to signal that
/// the run should be aborted.
run: Option<(Arc<Mutex<Run>>, mpsc::UnboundedSender<()>)>,
}
impl Server {
@ -34,163 +24,19 @@ impl Server {
name: String,
config: &'static Config,
server_config: &'static WorkerServerConfig,
coordinator: Arc<Mutex<Coordinator>>,
client: Client,
) -> Self {
Self {
name,
config,
server_config,
coordinator,
client: Client::new(),
client,
secret: id::random_worker_secret(),
run: None,
}
}
pub async fn run(&mut self) {
// Register with coordinator
let (poke_tx, mut poke_rx) = mpsc::unbounded_channel();
self.coordinator
.lock()
.unwrap()
.register(self.name.clone(), poke_tx.clone());
// Main loop
let name = self.name.clone();
async {
loop {
match self.ping(&poke_tx).await {
Ok(()) => {}
Err(e) => warn!("Error talking to server:\n{e:?}"),
}
self.wait_until_next_ping(&mut poke_rx).await;
}
}
.instrument(info_span!("worker", name))
.await;
}
/// Sleeps until the next ping is due: either a poke arrives on `poke_rx` or
/// `worker_ping_delay` elapses, whichever happens first.
async fn wait_until_next_ping(&self, poke_rx: &mut mpsc::UnboundedReceiver<()>) {
    // A poke that arrived while we were busy pinging is already queued, in
    // which case this returns right away and the next ping follows at once.
    let delay = self.config.worker_ping_delay;
    let _ = tokio::time::timeout(delay, poke_rx.recv()).await;

    // Several pokes may have piled up (e.g. during a slow ping); discard the
    // rest so they don't each trigger a ping of their own.
    while let Ok(()) = poke_rx.try_recv() {}
}
/// Pings the server once, choosing the active or inactive ping depending on
/// what the coordinator reports for this server.
///
/// Returns any error produced by the chosen ping.
async fn ping(&mut self, poke_tx: &mpsc::UnboundedSender<()>) -> somehow::Result<()> {
    debug!("Pinging server");

    // The lock guard is a temporary and is released at the end of this
    // statement, before any `.await`.
    let info = self.coordinator.lock().unwrap().active(&self.name);

    if !info.active {
        return self.ping_inactive(info).await;
    }
    self.ping_active(info, poke_tx).await
}
/// Sends a status-only ping while this server is not the active one.
///
/// Reports `Busy` or `Idle` according to `info.busy`, never requests work and
/// never submits a finished run. Panics if a run is present, since an
/// inactive server is not expected to have one.
async fn ping_inactive(&self, info: ActiveInfo) -> somehow::Result<()> {
    assert!(self.run.is_none());

    let status = if info.busy {
        WorkerStatus::Busy
    } else {
        WorkerStatus::Idle
    };

    self.request(status, false, None).await?;
    Ok(())
}
/// Pings the server while it is the active one, reporting run progress,
/// submitting finished work and possibly starting a new run.
///
/// The steps are, in order:
/// 1. Snapshot the current run's status (no run counts as `Aborted`).
/// 2. Send the status, asking for work only while inside the batch window
///    and not in the middle of an unfinished run.
/// 3. If the request fails and there is nothing in progress, move on to the
///    next server and return the error.
/// 4. Otherwise drop a run that is no longer unfinished, forward an abort
///    request from the server, start a new run if work was handed out, and
///    finally move on to the next server if no run remains.
///
/// `poke_tx` is cloned into the spawned run task.
async fn ping_active(
&mut self,
info: ActiveInfo,
poke_tx: &mpsc::UnboundedSender<()>,
) -> somehow::Result<()> {
// Take a snapshot of the run state; the lock is only held for the clone.
let run = self
.run
.as_ref()
.map(|(r, _)| r.lock().unwrap().clone().into_full_status())
.unwrap_or(FullRunStatus::Aborted);
// Remember these before `run` is consumed by the match below.
let unfinished = matches!(run, FullRunStatus::Unfinished(_));
let aborted = matches!(run, FullRunStatus::Aborted);
let in_batch = info.in_batch(self.config.worker_batch_duration);
let (status, submit_work) = match run {
FullRunStatus::Unfinished(run) => (WorkerStatus::Working(run), None),
FullRunStatus::Finished(run) => (WorkerStatus::Idle, Some(run)),
FullRunStatus::Aborted => (WorkerStatus::Idle, None),
};
let request_work = in_batch && !unfinished;
// Deliberately not using `?` yet: the error case needs special handling.
let response = self.request(status, request_work, submit_work).await;
if response.is_err() && aborted {
// We have nothing important going on, let's defer to the next
// server and hope this one will respond again soon.
self.coordinator
.lock()
.unwrap()
.move_to_next_server(&self.name);
// Return explicitly to ensure we don't continue to the rest of the
// function in the false belief that we're active. Oh, and don't
// swallow the error.
response?;
return Ok(());
}
let response = response?;
// Clean up self.run if we no longer need it
if !unfinished {
// We can get rid of finished runs since we just successfully sent
// the server the results.
self.run = None;
}
// Abort run if server says so
if response.abort_work {
if let Some((_, abort_tx)) = &self.run {
// A send error is ignored on purpose.
let _ = abort_tx.send(());
}
}
// Start work (but only if we requested it)
if let Some(work) = response.work.filter(|_| request_work) {
assert!(!unfinished);
assert!(self.run.is_none());
let run = Arc::new(Mutex::new(Run::new(work.id, work.hash)));
let (abort_tx, abort_rx) = mpsc::unbounded_channel();
self.run = Some((run.clone(), abort_tx));
self.coordinator.lock().unwrap().look_busy(&self.name);
tokio::spawn(run::run(
self.server_config,
poke_tx.clone(),
run,
work.bench,
abort_rx,
));
}
// Finally, advance to the next server if it makes sense to do so
if self.run.is_none() {
self.coordinator
.lock()
.unwrap()
.move_to_next_server(&self.name);
}
Ok(())
}
async fn request(
// TODO Limit status requests to one in flight at a time (per server)
pub async fn post_status(
&self,
status: WorkerStatus,
request_work: bool,
@ -217,4 +63,60 @@ impl Server {
Ok(response)
}
/// Requests `api/worker/repo/{hash}/tree.tar.gz` from this server and hands
/// the response to `tree::download`, returning the resulting `TempDir`.
///
/// The request uses HTTP basic auth with the worker name as user and the
/// server's token as password. Errors from sending the request or from
/// `tree::download` are returned to the caller.
pub async fn download_repo(&self, hash: &str) -> somehow::Result<TempDir> {
    let base = &self.server_config.url;
    let url = format!("{}api/worker/repo/{hash}/tree.tar.gz", base);

    let request = self
        .client
        .get(url)
        .basic_auth(&self.config.worker_name, Some(&self.server_config.token));
    let response = request.send().await?;

    tree::download(response).await
}
/// Requests `api/worker/bench_repo/{hash}/tree.tar.gz` from this server and
/// hands the response to `tree::download`, returning the resulting `TempDir`.
///
/// The request uses HTTP basic auth with the worker name as user and the
/// server's token as password. Errors from sending the request or from
/// `tree::download` are returned to the caller.
pub async fn download_bench_repo(&self, hash: &str) -> somehow::Result<TempDir> {
    let base = &self.server_config.url;
    let url = format!("{}api/worker/bench_repo/{hash}/tree.tar.gz", base);

    let request = self
        .client
        .get(url)
        .basic_auth(&self.config.worker_name, Some(&self.server_config.token));
    let response = request.send().await?;

    tree::download(response).await
}
/// Sends a single status ping to the server.
///
/// For now this always reports `Idle`, never requests work and never submits
/// a finished run; the server's response is not acted upon yet.
async fn ping(&self) -> somehow::Result<()> {
    debug!("Pinging server");

    // TODO Use actual status
    let response = self
        .post_status(WorkerStatus::Idle, false, None)
        .await?;

    // TODO Signal that run should be aborted
    Ok(())
}
pub async fn ping_periodically(self) {
loop {
match self.ping().await {
Ok(()) => {}
Err(e) => warn!("Error talking to server:\n{e:?}"),
}
tokio::time::sleep(self.config.worker_ping_delay).await;
}
}
}