Poke server tasks in coordinator

This commit is contained in:
Joscha 2023-08-11 01:11:53 +02:00
parent 7911a67906
commit 33607779b0
3 changed files with 82 additions and 41 deletions

View file

@ -25,16 +25,15 @@ impl Runner {
return; return;
} }
let names = self.config.runner_servers.keys().cloned().collect(); let coordinator = Arc::new(Mutex::new(Coordinator::new()));
let coordinator = Arc::new(Mutex::new(Coordinator::new(names)));
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
for (name, config) in self.config.runner_servers.iter() { for (name, server_config) in self.config.runner_servers.iter() {
debug!("Launching task for server {name}"); debug!("Launching task for server {name}");
let mut server = Server::new( let mut server = Server::new(
name.clone(), name.clone(),
config, self.config,
self.config.runner_ping_delay, server_config,
coordinator.clone(), coordinator.clone(),
); );
tasks.spawn(async move { server.run().await }); tasks.spawn(async move { server.run().await });

View file

@ -1,26 +1,68 @@
//! Coordinate performing runs across servers. //! Coordinate performing runs across servers.
use tokio::sync::mpsc;
struct Server {
name: String,
poke: mpsc::UnboundedSender<()>,
}
pub struct Coordinator { pub struct Coordinator {
names: Vec<String>, servers: Vec<Server>,
current: usize, current: usize,
} }
impl Coordinator { impl Coordinator {
pub fn new(mut names: Vec<String>) -> Self { pub fn new() -> Self {
assert!(!names.is_empty()); Self {
names.sort_unstable(); servers: vec![],
Self { names, current: 0 } current: 0,
}
}
pub fn register(&mut self, name: String, poke: mpsc::UnboundedSender<()>) {
self.servers.push(Server { name, poke });
} }
pub fn active(&self, name: &str) -> bool { pub fn active(&self, name: &str) -> bool {
self.names[self.current] == name if let Some(current) = self.servers.get(self.current) {
name == current.name
} else {
false
}
} }
pub fn next(&mut self, name: &str) { pub fn next(&mut self, name: &str) {
// Check just to prevent weird shenanigans // Check just to prevent weird shenanigans
if self.active(name) { if !self.active(name) {
self.current += 1; return;
self.current %= self.names.len(); }
// At least one server (the current one) must be registered according to
// the previous check
assert!(!self.servers.is_empty());
self.current += 1;
self.current %= self.servers.len();
// When the runner seeks work and a queue is idle, the next server
// should be queried immediately. Otherwise, we'd introduce lots of
// delay in the multi-server case where most queues are empty.
//
// However, if all servers' queues were empty, this would generate a
// slippery cycle of requests that the runner sends as quickly as
// possible, only limited by the roundtrip time. Because we don't want
// this, we let the first task wait its full timeout. Effectively, this
// results in iterations starting at least the ping delay apart, which
// is pretty much what we want.
//
// The way this is implemented currently is sub-optimal however: If the
// chain takes even a fraction longer than the previous iteration, tasks
// will send two requests back-to-back: The first because their ping
// timeout ran out, and the second because they were poked. So far, I
// haven't been able to think of an elegant solution for this.
if self.current > 0 {
let _ = self.servers[self.current].poke.send(());
} }
} }
} }

View file

@ -1,53 +1,44 @@
use std::{ use std::sync::{Arc, Mutex};
sync::{Arc, Mutex},
time::Duration,
};
use time::OffsetDateTime; use tokio::sync::mpsc;
use tracing::{debug, info_span, warn, Instrument}; use tracing::{debug, info_span, warn, Instrument};
use crate::{config::RunnerServerConfig, somehow}; use crate::{
config::{Config, RunnerServerConfig},
somehow,
};
use super::coordinator::Coordinator; use super::coordinator::Coordinator;
enum RunState {
Preparing,
Running,
Finished, // TODO Include run results here
}
struct Run {
id: String,
hash: String,
start: OffsetDateTime,
state: RunState,
}
pub struct Server { pub struct Server {
name: String, name: String,
config: &'static RunnerServerConfig, config: &'static Config,
ping_delay: Duration, server_config: &'static RunnerServerConfig,
coordinator: Arc<Mutex<Coordinator>>, coordinator: Arc<Mutex<Coordinator>>,
run: Option<Arc<Mutex<Run>>>,
} }
impl Server { impl Server {
pub fn new( pub fn new(
name: String, name: String,
config: &'static RunnerServerConfig, config: &'static Config,
ping_delay: Duration, server_config: &'static RunnerServerConfig,
coordinator: Arc<Mutex<Coordinator>>, coordinator: Arc<Mutex<Coordinator>>,
) -> Self { ) -> Self {
Self { Self {
name, name,
config, config,
ping_delay, server_config,
coordinator, coordinator,
run: None,
} }
} }
pub async fn run(&mut self) { pub async fn run(&mut self) {
let (poke_tx, mut poke_rx) = mpsc::unbounded_channel();
self.coordinator
.lock()
.unwrap()
.register(self.name.clone(), poke_tx.clone());
let name = self.name.clone(); let name = self.name.clone();
async { async {
loop { loop {
@ -55,7 +46,16 @@ impl Server {
Ok(()) => {} Ok(()) => {}
Err(e) => warn!("Error talking to server:\n{e:?}"), Err(e) => warn!("Error talking to server:\n{e:?}"),
} }
tokio::time::sleep(self.ping_delay).await;
// Wait for poke or until the ping delay elapses. If we get
// poked while pinging the server, this will not wait and we'll
// immediately do another ping.
let _ = tokio::time::timeout(self.config.runner_ping_delay, poke_rx.recv()).await;
// Empty queue in case we were poked more than once. This can
// happen for example if we get poked multiple times while
// pinging the server.
while poke_rx.try_recv().is_ok() {}
} }
} }
.instrument(info_span!("runner", name)) .instrument(info_span!("runner", name))