From 437be12f144b1d5bbb444ef8e3a61e09c3ca3608 Mon Sep 17 00:00:00 2001 From: Joscha Date: Mon, 13 May 2024 17:02:45 +0200 Subject: [PATCH] Abort run when signaled to by server --- src/worker/run.rs | 23 +++++++++++++++++------ src/worker/server.rs | 16 ++++++++++------ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/src/worker/run.rs b/src/worker/run.rs index 5843463..622ec5e 100644 --- a/src/worker/run.rs +++ b/src/worker/run.rs @@ -5,8 +5,8 @@ use std::{ sync::{Arc, Mutex}, }; -use log::error; -use tokio::sync::Notify; +use log::{error, warn}; +use tokio::{select, sync::Notify}; use crate::{ config::WorkerServerConfig, @@ -82,12 +82,19 @@ impl RunInProgress { pub async fn perform(&self, server: &Server) -> Option { // TODO Log system info - // TODO Handle aborts - let result = match &self.run.bench_method { + + let run_future = match &self.run.bench_method { BenchMethod::Internal => self.perform_internal(server), BenchMethod::Repo { hash } => todo!(), - } - .await; + }; + + let result = select! { + result = run_future => result, + _ = self.abort.notified() => { + warn!("Run for {} was aborted", server.name); + Ok(None) + }, + }; let run = match result { Ok(outcome) => outcome, @@ -116,4 +123,8 @@ impl RunInProgress { measurements: run.measurements, }) } + + pub fn abort(&self) { + self.abort.notify_one(); + } } diff --git a/src/worker/server.rs b/src/worker/server.rs index df4add6..5df9fa0 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -38,7 +38,7 @@ pub struct Server { /// 1. The main task requests a run /// 2. The ping task sends a status update where the worker is idle /// 3. The server receives 1, reserves a run and replies - /// 4. The server receives 2 and clears the reservatio + /// 4. The server receives 2 and clears the reservation /// 5. Another worker requests a run before this worker's next ping pub status_lock: Arc>, } @@ -80,6 +80,12 @@ impl Server { .json::() .await?; + if response.abort_run { + if let Some(current_run) = &*self.current_run.lock().unwrap() { + current_run.abort(); + } + } + Ok(response) } @@ -123,13 +129,11 @@ impl Server { async fn ping(&self) -> somehow::Result<()> { debug!("Pinging {}", self.name); + let guard = self.status_lock.lock().await; - - let response = self.post_status(false, None).await?; - - // TODO Signal that run should be aborted - + let _ = self.post_status(false, None).await?; drop(guard); + Ok(()) }