diff --git a/src/worker.rs b/src/worker.rs index c957f95..cad0f1d 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -5,6 +5,7 @@ mod tree; use std::sync::{Arc, Mutex}; use reqwest::Client; +use tokio::sync::Mutex as AsyncMutex; use tracing::{error, info}; use crate::{config::Config, id, worker::server::Server}; @@ -33,6 +34,7 @@ impl Worker { secret: id::random_worker_secret(), client: client.clone(), current_run: current_run.clone(), + status_lock: Arc::new(AsyncMutex::new(())), }) .collect::>(); diff --git a/src/worker/server.rs b/src/worker/server.rs index 4208dc9..dfc0169 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, Mutex}; use reqwest::Client; use tempfile::TempDir; +use tokio::sync::Mutex as AsyncMutex; use tracing::{debug, warn}; use crate::{ @@ -21,8 +22,23 @@ pub struct Server { pub config: &'static Config, pub server_config: &'static WorkerServerConfig, pub secret: String, + pub client: Client, pub current_run: Arc>>, + + /// You must hold this lock while sending status updates to the server and + /// while processing the response. + /// + /// This lock prevents the following race condition that would lead to + /// multiple runners receiving runs for the same commit in unlucky + /// circumstances: + /// + /// 1. The main task requests a run + /// 2. The ping task sends a status update where the worker is idle + /// 3. The server receives 1, reserves a run and replies + /// 4. The server receives 2 and clears the reservation + /// 5. 
Another worker requests a run before this worker's next ping + pub status_lock: Arc>, } impl Server { @@ -89,6 +105,7 @@ impl Server { async fn ping(&self) -> somehow::Result<()> { debug!("Pinging server"); + let guard = self.status_lock.lock().await; let status = match &*self.current_run.lock().unwrap() { Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun { @@ -112,6 +129,7 @@ impl Server { // TODO Signal that run should be aborted + drop(guard); Ok(()) }