Prevent status update race condition
This commit is contained in:
parent
e644f2be65
commit
eaca373a6a
2 changed files with 20 additions and 0 deletions
|
|
@ -5,6 +5,7 @@ mod tree;
|
|||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use reqwest::Client;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::{config::Config, id, worker::server::Server};
|
||||
|
|
@ -33,6 +34,7 @@ impl Worker {
|
|||
secret: id::random_worker_secret(),
|
||||
client: client.clone(),
|
||||
current_run: current_run.clone(),
|
||||
status_lock: Arc::new(AsyncMutex::new(())),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::sync::{Arc, Mutex};
|
|||
|
||||
use reqwest::Client;
|
||||
use tempfile::TempDir;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::{
|
||||
|
|
@ -21,8 +22,23 @@ pub struct Server {
|
|||
pub config: &'static Config,
|
||||
pub server_config: &'static WorkerServerConfig,
|
||||
pub secret: String,
|
||||
|
||||
pub client: Client,
|
||||
pub current_run: Arc<Mutex<Option<RunInProgress>>>,
|
||||
|
||||
/// You must hold this lock while sending status updates to the server and
|
||||
/// while processing the response.
|
||||
///
|
||||
/// This lock prevents the following race condition that would lead to
|
||||
/// multiple runners receiving runs for the same commit in unlucky
|
||||
/// circumstances:
|
||||
///
|
||||
/// 1. The main task requests a run
|
||||
/// 2. The ping task sends a status update where the worker is idle
|
||||
/// 3. The server receives 1, reserves a run and replies
|
||||
/// 4. The server receives 2 and clears the reservatio
|
||||
/// 5. Another worker requests a run before this worker's next ping
|
||||
pub status_lock: Arc<AsyncMutex<()>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
|
|
@ -89,6 +105,7 @@ impl Server {
|
|||
|
||||
async fn ping(&self) -> somehow::Result<()> {
|
||||
debug!("Pinging server");
|
||||
let guard = self.status_lock.lock().await;
|
||||
|
||||
let status = match &*self.current_run.lock().unwrap() {
|
||||
Some(run) if run.server_name == self.name => WorkerStatus::Working(UnfinishedRun {
|
||||
|
|
@ -112,6 +129,7 @@ impl Server {
|
|||
|
||||
// TODO Signal that run should be aborted
|
||||
|
||||
drop(guard);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue