diff --git a/src/runner.rs b/src/runner.rs index 18f8133..3cce79b 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,9 +1,15 @@ mod coordinator; +mod server; -use tracing::error; +use std::sync::{Arc, Mutex}; + +use tokio::task::JoinSet; +use tracing::{debug, error}; use crate::config::Config; +use self::{coordinator::Coordinator, server::Server}; + pub struct Runner { config: &'static Config, } @@ -14,6 +20,26 @@ impl Runner { } pub async fn run(&self) { - error!("Runner not yet implemented"); + if self.config.runner_servers.is_empty() { + error!("No servers specified in config"); + return; + } + + let names = self.config.runner_servers.keys().cloned().collect(); + let coordinator = Arc::new(Mutex::new(Coordinator::new(names))); + + let mut tasks = JoinSet::new(); + for (name, config) in self.config.runner_servers.iter() { + debug!("Launching task for server {name}"); + let mut server = Server::new( + name.clone(), + config, + self.config.runner_ping_delay, + coordinator.clone(), + ); + tasks.spawn(async move { server.run().await }); + } + + while tasks.join_next().await.is_some() {} } } diff --git a/src/runner/server.rs b/src/runner/server.rs new file mode 100644 index 0000000..7fe45c1 --- /dev/null +++ b/src/runner/server.rs @@ -0,0 +1,69 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use time::OffsetDateTime; +use tracing::{debug, info_span, warn, Instrument}; + +use crate::{config::RunnerServerConfig, somehow}; + +use super::coordinator::Coordinator; + +enum RunState { + Preparing, + Running, + Finished, // TODO Include run results here +} + +struct Run { + id: String, + hash: String, + start: OffsetDateTime, + state: RunState, +} + +pub struct Server { + name: String, + config: &'static RunnerServerConfig, + ping_delay: Duration, + coordinator: Arc>, + run: Option>>, +} + +impl Server { + pub fn new( + name: String, + config: &'static RunnerServerConfig, + ping_delay: Duration, + coordinator: Arc>, + ) -> Self { + Self { + name, + config, + ping_delay, + coordinator, + run: None, + } + } + + pub async fn run(&mut self) { + let name = self.name.clone(); + async { + loop { + match self.ping().await { + Ok(()) => {} + Err(e) => warn!("Error talking to server:\n{e:?}"), + } + tokio::time::sleep(self.ping_delay).await; + } + } + .instrument(info_span!("runner", name)) + .await; + } + + async fn ping(&mut self) -> somehow::Result<()> { + debug!("Pinging"); + Ok(()) + } +}