diff --git a/.sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json b/.sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json deleted file mode 100644 index 53c6f0c..0000000 --- a/.sqlx/query-40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "INSERT INTO runs ( id, hash, start, end, exit_code ) VALUES (?, ?, ?, ?, ?) ", - "describe": { - "columns": [], - "parameters": { - "Right": 5 - }, - "nullable": [] - }, - "hash": "40f965913a8a3ec16da66dd79c12710279ba817bc5f50661f592371297efd651" -} diff --git a/.sqlx/query-d1e7da8b6a2018e621e3fd6d7a74668a82fabb1d83bdfc8bf763bff733b3388c.json b/.sqlx/query-d1e7da8b6a2018e621e3fd6d7a74668a82fabb1d83bdfc8bf763bff733b3388c.json new file mode 100644 index 0000000..f6b891f --- /dev/null +++ b/.sqlx/query-d1e7da8b6a2018e621e3fd6d7a74668a82fabb1d83bdfc8bf763bff733b3388c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO runs ( id, hash, bench_method, start, end, exit_code ) VALUES (?, ?, ?, ?, ?, ?) ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "d1e7da8b6a2018e621e3fd6d7a74668a82fabb1d83bdfc8bf763bff733b3388c" +} diff --git a/migrations/20230809141314_runs.sql b/migrations/20230809141314_runs.sql index f58ec65..a123830 100644 --- a/migrations/20230809141314_runs.sql +++ b/migrations/20230809141314_runs.sql @@ -1,9 +1,10 @@ CREATE TABLE runs ( - id TEXT NOT NULL PRIMARY KEY, - hash TEXT NOT NULL, - start TEXT NOT NULL, - end TEXT NOT NULL, - exit_code INT NOT NULL, + id TEXT NOT NULL PRIMARY KEY, + hash TEXT NOT NULL, + bench_method TEXT NOT NULL, + start TEXT NOT NULL, + end TEXT NOT NULL, + exit_code INT NOT NULL, FOREIGN KEY (hash) REFERENCES commits (hash) ON DELETE CASCADE ) STRICT; diff --git a/src/server/web/api.rs b/src/server/web/api.rs index 77fe33b..5d04b4e 100644 --- a/src/server/web/api.rs +++ b/src/server/web/api.rs @@ -28,31 +28,39 @@ use crate::{ somehow, }; -async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> { +async fn save_work(finished: FinishedRun, db: &SqlitePool) -> somehow::Result<()> { let mut tx = db.begin().await?; let conn = tx.acquire().await?; + let end = OffsetDateTime::now_utc(); + let bench_method = match finished.run.bench_method { + BenchMethod::Internal => "internal".to_string(), + BenchMethod::Repo { hash } => format!("bench repo, hash {hash}"), + }; + sqlx::query!( "\ INSERT INTO runs ( \ id, \ hash, \ + bench_method, \ start, \ end, \ exit_code \ ) \ - VALUES (?, ?, ?, ?, ?) \ + VALUES (?, ?, ?, ?, ?, ?) \ ", - run.id, - run.hash, - run.start, - run.end, - run.exit_code, + finished.run.id, + finished.run.hash, + bench_method, + finished.run.start, + end, + finished.exit_code, ) .execute(&mut *conn) .await?; - for (name, measurement) in run.measurements { + for (name, measurement) in finished.measurements { sqlx::query!( "\ INSERT INTO run_measurements ( \ @@ -65,7 +73,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> { ) \ VALUES (?, ?, ?, ?, ?, ?) \ ", - run.id, + finished.run.id, name, measurement.value, measurement.stddev, @@ -76,7 +84,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> { .await?; } - for (idx, (source, text)) in run.output.into_iter().enumerate() { + for (idx, (source, text)) in finished.output.into_iter().enumerate() { // Hopefully we won't need more than 4294967296 output chunks per run :P let idx = idx as u32; sqlx::query!( @@ -89,7 +97,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> { ) \ VALUES (?, ?, ?, ?) \ ", - run.id, + finished.run.id, idx, source, text, @@ -99,7 +107,7 @@ async fn save_work(run: FinishedRun, db: &SqlitePool) -> somehow::Result<()> { } // The thing has been done :D - sqlx::query!("DELETE FROM queue WHERE hash = ?", run.hash) + sqlx::query!("DELETE FROM queue WHERE hash = ?", finished.run.hash) .execute(&mut *conn) .await?; @@ -120,7 +128,7 @@ async fn post_status( Err(response) => return Ok(response), }; - if let Some(run) = request.submit_work { + if let Some(run) = request.submit_run { save_work(run, &db).await?; } @@ -153,8 +161,8 @@ async fn post_status( name.clone(), WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status), ); - let work = match request.request_work { - true => guard.find_work(&name, &queue, bench_method), + let work = match request.request_run { + true => guard.find_and_reserve_run(&name, &queue, bench_method), false => None, }; let abort_work = guard.should_abort_work(&name, &queue); @@ -162,7 +170,11 @@ async fn post_status( }; debug!("Received status update from {name}"); - Ok(Json(ServerResponse { work, abort_work }).into_response()) + Ok(Json(ServerResponse { + run: work, + abort_run: abort_work, + }) + .into_response()) } fn stream_response(repo: Arc, id: ObjectId) -> impl IntoResponse { diff --git a/src/server/web/queue.rs b/src/server/web/queue.rs index a198031..ac07e5a 100644 --- a/src/server/web/queue.rs +++ b/src/server/web/queue.rs @@ -64,12 +64,19 @@ async fn get_workers( let status = match &info.status { WorkerStatus::Idle => Status::Idle, WorkerStatus::Busy => Status::Busy, - WorkerStatus::Working(run) => { - let message = - sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash) - .fetch_one(db) - .await?; - Status::Working(RunLink::new(base, run.id.clone(), &run.hash, &message)) + WorkerStatus::Working(unfinished) => { + let message = sqlx::query_scalar!( + "SELECT message FROM commits WHERE hash = ?", + unfinished.run.hash + ) + .fetch_one(db) + .await?; + Status::Working(RunLink::new( + base, + unfinished.run.id.clone(), + &unfinished.run.hash, + &message, + )) } }; @@ -89,9 +96,9 @@ async fn get_queue( // Group workers by commit hash let mut workers_by_commit: HashMap> = HashMap::new(); for (name, info) in workers { - if let WorkerStatus::Working(run) = &info.status { + if let WorkerStatus::Working(unfinished) = &info.status { workers_by_commit - .entry(run.hash.clone()) + .entry(unfinished.run.hash.clone()) .or_default() .push(WorkerLink::new(base, name.clone())); } diff --git a/src/server/workers.rs b/src/server/workers.rs index b3410e0..386c971 100644 --- a/src/server/workers.rs +++ b/src/server/workers.rs @@ -5,7 +5,7 @@ use time::OffsetDateTime; use crate::{ config::Config, id, - shared::{BenchMethod, UnfinishedRun, Work, WorkerStatus}, + shared::{BenchMethod, Run, UnfinishedRun, WorkerStatus}, }; #[derive(Clone)] @@ -54,42 +54,49 @@ impl Workers { self.workers.insert(name, info); } - /// Find and reserve work for a worker. - pub fn find_work(&mut self, name: &str, queue: &[String], bench: BenchMethod) -> Option { + pub fn find_and_reserve_run( + &mut self, + name: &str, + queue: &[String], + bench_method: BenchMethod, + ) -> Option { let covered = self .workers .values() .filter_map(|info| match &info.status { WorkerStatus::Idle | WorkerStatus::Busy => None, - WorkerStatus::Working(run) => Some(&run.hash), + WorkerStatus::Working(unfinished) => Some(&unfinished.run.hash), }) .collect::>(); // Find work not already covered by another worker let hash = queue.iter().find(|hash| !covered.contains(hash))?.clone(); let id = id::random_run_id(); - let work = Work { id, hash, bench }; + let run = Run { + id, + hash, + bench_method, + start: OffsetDateTime::now_utc(), + }; // Reserve work so other workers don't choose it if let Some(info) = self.workers.get_mut(name) { info.status = WorkerStatus::Working(UnfinishedRun { - id: work.id.clone(), - hash: work.hash.clone(), - start: OffsetDateTime::now_utc(), + run: run.clone(), last_output: vec![], }); } - Some(work) + Some(run) } pub fn should_abort_work(&self, name: &str, queue: &[String]) -> bool { // A runner should abort work if... let Some(info) = self.workers.get(name) else { return false; }; - let WorkerStatus::Working (run) = &info.status else { return false; }; + let WorkerStatus::Working (unfinished) = &info.status else { return false; }; // The commit isn't in the queue - if !queue.contains(&run.hash) { + if !queue.contains(&unfinished.run.hash) { return true; } @@ -98,7 +105,9 @@ impl Workers { .workers .iter() .filter_map(|(name, info)| match &info.status { - WorkerStatus::Working(r) if r.hash == run.hash => Some((name, r.start)), + WorkerStatus::Working(u) if u.run.hash == unfinished.run.hash => { + Some((name, u.run.start)) + } _ => None, }) .max_by_key(|(_, start)| *start) diff --git a/src/shared.rs b/src/shared.rs index 1d4d857..7fb751e 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -10,6 +10,14 @@ fn is_false(b: &bool) -> bool { !b } +#[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)] +#[repr(u8)] +pub enum Source { + // Stdin would be fd 0 + Stdout = 1, + Stderr = 2, +} + #[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)] #[repr(i8)] pub enum Direction { @@ -29,34 +37,40 @@ pub struct Measurement { pub direction: Option, } -#[derive(Clone, Serialize_repr, Deserialize_repr, sqlx::Type)] -#[repr(u8)] -pub enum Source { - // Stdin would be fd 0 - Stdout = 1, - Stderr = 2, +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type")] +pub enum BenchMethod { + Internal, + Repo { hash: String }, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct Run { + pub id: String, + pub hash: String, + pub bench_method: BenchMethod, + pub start: OffsetDateTime, } #[derive(Clone, Serialize, Deserialize)] pub struct UnfinishedRun { - pub id: String, - pub hash: String, - pub start: OffsetDateTime, + #[serde(flatten)] + pub run: Run, #[serde(default)] pub last_output: Vec<(Source, String)>, } #[derive(Clone, Serialize, Deserialize)] pub struct FinishedRun { - pub id: String, - pub hash: String, - pub start: OffsetDateTime, - pub end: OffsetDateTime, + #[serde(flatten)] + pub run: Run, #[serde(default)] pub exit_code: i32, - pub measurements: HashMap, #[serde(default)] pub output: Vec<(Source, String)>, + #[serde(default)] + pub measurements: HashMap, } #[derive(Clone, Serialize, Deserialize)] @@ -84,48 +98,28 @@ pub struct WorkerRequest { /// What the worker is currently working on. pub status: WorkerStatus, - /// Whether the worker wants new work from the server. + /// The worker wants a new run from the server. /// /// If the server has a commit available, it should respond with a non-null - /// [`Response::work`]. + /// [`ServerResponse::work`]. #[serde(default, skip_serializing_if = "is_false")] - pub request_work: bool, + pub request_run: bool, /// The worker has finished a run and wants to submit the results. #[serde(skip_serializing_if = "Option::is_none")] - pub submit_work: Option, -} - -#[derive(Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "type")] -pub enum BenchMethod { - /// Use internal (deterministic) benchmarking code. - Internal, - /// Use a commit from a bench repo. - Repo { hash: String }, -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct Work { - /// Id of the run. - pub id: String, - /// Hash of commit to benchmark. - pub hash: String, - /// How to benchmark the commit. - pub bench: BenchMethod, + pub submit_run: Option, } #[derive(Serialize, Deserialize)] pub struct ServerResponse { - /// Work the worker requested using [`Request::request_work]. + /// Run the worker requested using [`RunnerRequest::request_run`]. /// - /// The worker may ignore this work and do something else. However, until - /// the next update request sent by the worker, the server will consider the + /// The worker may ignore this run and do something else. However, until the + /// next update request sent by the worker, the server will consider the /// worker as preparing to work on the commit, and will not give out the /// same commit to other workers. #[serde(skip_serializing_if = "Option::is_none")] - pub work: Option, + pub run: Option, /// The worker should abort the current run. /// @@ -133,5 +127,5 @@ pub struct ServerResponse { /// the same commit as another worker and has broken the tie in favor of the /// other worker. The worker may continue the run despite this flag. #[serde(default, skip_serializing_if = "is_false")] - pub abort_work: bool, + pub abort_run: bool, } diff --git a/src/worker/server.rs b/src/worker/server.rs index 1fcede5..0c36b13 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -47,8 +47,8 @@ impl Server { info: None, secret: self.secret.clone(), status, - request_work, - submit_work, + request_run: request_work, + submit_run: submit_work, }; let response = self