From c713abc5d37805281acdfe8dcd31565177ef4eee Mon Sep 17 00:00:00 2001 From: Joscha Date: Thu, 10 Aug 2023 18:47:44 +0200 Subject: [PATCH] Implement /api/runner/status --- ...626df4efd1d2e78bc6055054cd05d80671314.json | 20 ++++ Cargo.lock | 36 +++++++- Cargo.toml | 2 +- DESIGN.md | 16 ++-- src/server.rs | 13 ++- src/server/recurring.rs | 4 +- src/server/runners.rs | 84 +++++++++++++++++ src/server/web.rs | 2 + src/server/web/api.rs | 91 +++++++++++++++++++ src/server/web/api/auth.rs | 41 +++++++++ src/shared.rs | 16 +--- 11 files changed, 296 insertions(+), 29 deletions(-) create mode 100644 .sqlx/query-a1ffd52b6d9dc0c24e67db5a98f626df4efd1d2e78bc6055054cd05d80671314.json create mode 100644 src/server/runners.rs create mode 100644 src/server/web/api.rs create mode 100644 src/server/web/api/auth.rs diff --git a/.sqlx/query-a1ffd52b6d9dc0c24e67db5a98f626df4efd1d2e78bc6055054cd05d80671314.json b/.sqlx/query-a1ffd52b6d9dc0c24e67db5a98f626df4efd1d2e78bc6055054cd05d80671314.json new file mode 100644 index 0000000..08c1855 --- /dev/null +++ b/.sqlx/query-a1ffd52b6d9dc0c24e67db5a98f626df4efd1d2e78bc6055054cd05d80671314.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "SELECT hash FROM queue ORDER BY priority DESC, unixepoch(date) DESC, hash ASC ", + "describe": { + "columns": [ + { + "name": "hash", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false + ] + }, + "hash": "a1ffd52b6d9dc0c24e67db5a98f626df4efd1d2e78bc6055054cd05d80671314" +} diff --git a/Cargo.lock b/Cargo.lock index 1827aca..682e602 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,6 +186,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", + "headers", "http", "http-body", "hyper", @@ -251,6 +252,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.2" @@ -1557,6 +1564,31 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "headers" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +dependencies = [ + "base64 0.13.1", + "bitflags 1.3.2", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.4.1" @@ -2653,7 +2685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" dependencies = [ "atoi", - "base64", + "base64 0.21.2", "bitflags 2.3.3", "byteorder", "bytes", @@ -2696,7 +2728,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" dependencies = [ "atoi", - "base64", + "base64 0.21.2", "bitflags 2.3.3", "byteorder", "crc", diff --git a/Cargo.toml b/Cargo.toml index d427ed5..8250aa6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" anyhow = "1.0.72" askama = { version = "0.12.0", features = ["with-axum"] } askama_axum = "0.3.0" -axum = { version = "0.6.19", features = ["macros"] } +axum = { version = "0.6.19", features = ["macros", "headers"] } clap = { version = "4.3.19", features = ["derive", "deprecated"] } directories = "5.0.1" futures = "0.3.28" diff --git a/DESIGN.md b/DESIGN.md index 889861c..236cc7a 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -65,15 +65,15 @@ think them through. ## Runner interaction -Runner interaction happens via endpoints located at `/api/runner/`. All of these -are behind BASIC authentication. The username is `runner` and the password must -be the server's runner token. When the runner presents the correct token, the -server trusts the data the runner sends, including the name, current state, and -run ids. +Runner interaction happens via endpoints located at `/api/runner/`. To access +any endpoint, the runner must use Basic authentication. The username is the name +of the runner and the password is the server's runner token. When the runner +presents the correct token, the server trusts the data the runner sends, +including the name, current state, and run ids. -On the server side, runners are identified by the runner's self-reported -identifier. This allows more human-readable and permanent links to runners than -something like session ids. +On the server side, runners are identified by the runner's self-reported name. +This allows more human-readable and permanent links to runners than something +like session ids. - POST `/api/runner/status` - Main endpoint for runner/server coordination diff --git a/src/server.rs b/src/server.rs index ae0553d..c11328e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,13 @@ mod recurring; +mod runners; mod util; mod web; -use std::{path::Path, sync::Arc, time::Duration}; +use std::{ + path::Path, + sync::{Arc, Mutex}, + time::Duration, +}; use axum::extract::FromRef; use gix::ThreadSafeRepository; @@ -15,6 +20,8 @@ use tracing::{debug, info}; use crate::{args::ServerCommand, config::Config, somehow}; +use self::runners::Runners; + async fn open_db(db_path: &Path) -> sqlx::Result { let options = SqliteConnectOptions::new() // https://www.sqlite.org/pragma.html#pragma_journal_mode @@ -61,6 +68,7 @@ pub struct Server { db: SqlitePool, repo: Option, bench_repo: Option, + runners: Arc>, } impl Server { @@ -86,6 +94,7 @@ impl Server { db: open_db(&command.db).await?, repo, bench_repo, + runners: Arc::new(Mutex::new(Runners::new(config))), }) } @@ -93,7 +102,7 @@ impl Server { if let Some(repo) = self.repo.clone() { select! { e = web::run(self.clone()) => e, - () = recurring::run(self.clone(), repo, self.bench_repo.clone()) => Ok(()), + () = recurring::run(self.clone(), repo) => Ok(()), } } else { web::run(self.clone()).await diff --git a/src/server/recurring.rs b/src/server/recurring.rs index 567fde8..84301ef 100644 --- a/src/server/recurring.rs +++ b/src/server/recurring.rs @@ -8,7 +8,7 @@ mod repo; use tracing::{debug_span, error, Instrument}; -use super::{BenchRepo, Repo, Server}; +use super::{Repo, Server}; async fn recurring_task(state: &Server, repo: Repo) { async { @@ -28,7 +28,7 @@ async fn recurring_task(state: &Server, repo: Repo) { .await; } -pub(super) async fn run(server: Server, repo: Repo, bench_repo: Option) { +pub(super) async fn run(server: Server, repo: Repo) { loop { recurring_task(&server, repo.clone()).await; tokio::time::sleep(server.config.repo_update_delay).await; diff --git a/src/server/runners.rs b/src/server/runners.rs new file mode 100644 index 0000000..1680efe --- /dev/null +++ b/src/server/runners.rs @@ -0,0 +1,84 @@ +use std::collections::HashMap; + +use gix::hashtable::HashSet; +use time::OffsetDateTime; + +use crate::{config::Config, shared::RunnerStatus}; + +pub struct RunnerInfo { + pub secret: String, + pub last_seen: OffsetDateTime, + pub status: RunnerStatus, +} + +impl RunnerInfo { + pub fn new(secret: String, last_seen: OffsetDateTime, status: RunnerStatus) -> Self { + Self { + secret, + last_seen, + status, + } + } +} + +pub struct Runners { + config: &'static Config, + runners: HashMap, +} + +impl Runners { + pub fn new(config: &'static Config) -> Self { + Self { + config, + runners: HashMap::new(), + } + } + + pub fn clean(&mut self, now: OffsetDateTime) { + self.runners + .retain(|_, v| now <= v.last_seen + self.config.web_runner_timeout) + } + + pub fn verify(&self, name: &str, secret: &str) -> bool { + let Some(runner) = self.runners.get(name) else { return true; }; + runner.secret == secret + } + + pub fn update(&mut self, name: String, info: RunnerInfo) { + self.runners.insert(name, info); + } + + fn oldest_working_on(&self, hash: &str) -> Option<&str> { + self.runners + .iter() + .filter_map(|(name, info)| match &info.status { + RunnerStatus::Working { hash: h, since, .. } if h == hash => Some((name, *since)), + _ => None, + }) + .max_by_key(|(_, since)| *since) + .map(|(name, _)| name as &str) + } + + pub fn should_abort_work(&self, name: &str) -> bool { + let Some(info) = self.runners.get(name) else { return false; }; + let RunnerStatus::Working { hash, .. } = &info.status else { return false; }; + let Some(oldest) = self.oldest_working_on(hash) else { return false; }; + name != oldest + } + + pub fn find_free_work<'a>(&self, hashes: &'a [String]) -> Option<&'a str> { + let covered = self + .runners + .values() + .filter_map(|info| match &info.status { + RunnerStatus::Working { hash, .. } => Some(hash), + _ => None, + }) + .collect::>(); + + hashes + .iter() + .find(|hash| !covered.contains(hash)) + .map(|hash| hash as &str) + } +} diff --git a/src/server/web.rs b/src/server/web.rs index 4b1736d..8a45b12 100644 --- a/src/server/web.rs +++ b/src/server/web.rs @@ -1,3 +1,4 @@ +mod api; mod commit; mod commit_hash; mod index; @@ -47,6 +48,7 @@ pub async fn run(server: Server) -> somehow::Result<()> { .route("/commit/:hash", get(commit_hash::get)) .route("/queue/", get(queue::get)) .route("/queue/table", get(queue::get_table)) + .merge(api::router(&server)) .fallback(get(r#static::static_handler)) .with_state(server.clone()); diff --git a/src/server/web/api.rs b/src/server/web/api.rs new file mode 100644 index 0000000..8939857 --- /dev/null +++ b/src/server/web/api.rs @@ -0,0 +1,91 @@ +mod auth; + +use std::sync::{Arc, Mutex}; + +use askama_axum::{IntoResponse, Response}; +use axum::{ + extract::State, + headers::{authorization::Basic, Authorization}, + http::StatusCode, + routing::post, + Json, Router, TypedHeader, +}; +use sqlx::SqlitePool; +use time::OffsetDateTime; + +use crate::{ + config::Config, + server::{ + runners::{RunnerInfo, Runners}, + BenchRepo, Server, + }, + shared::{BenchMethod, RunnerRequest, ServerResponse, Work}, + somehow, +}; + +async fn post_status( + TypedHeader(auth): TypedHeader>, + State(config): State<&'static Config>, + State(db): State, + State(bench_repo): State>, + State(runners): State>>, + Json(request): Json, +) -> somehow::Result { + let name = match auth::authenticate(config, auth) { + Ok(name) => name, + Err(response) => return Ok(response), + }; + + let now = OffsetDateTime::now_utc(); + let queue = sqlx::query_scalar!( + "\ + SELECT hash FROM queue \ + ORDER BY priority DESC, unixepoch(date) DESC, hash ASC \ + " + ) + .fetch_all(&db) + .await?; + + let mut guard = runners.lock().unwrap(); + guard.clean(now); + if !guard.verify(&name, &request.secret) { + return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response()); + } + guard.update( + name.clone(), + RunnerInfo::new(request.secret, now, request.status), + ); + let work = match request.request_work { + true => guard.find_free_work(&queue), + false => None, + }; + let abort_work = guard.should_abort_work(&name); + drop(guard); + + // Find new work + let work = if let Some(hash) = work { + let bench = match bench_repo { + Some(bench_repo) => BenchMethod::BenchRepo { + hash: bench_repo.0.to_thread_local().head_id()?.to_string(), + }, + None => BenchMethod::Internal, + }; + Some(Work { + hash: hash.to_string(), + bench, + }) + } else { + None + }; + + Ok(Json(ServerResponse { work, abort_work }).into_response()) +} + +pub fn router(server: &Server) -> Router { + if server.repo.is_none() { + return Router::new().route("/api/runner/status", post(post_status)); + } + + // TODO Add routes + Router::new() +} diff --git a/src/server/web/api/auth.rs b/src/server/web/api/auth.rs new file mode 100644 index 0000000..bf32fb0 --- /dev/null +++ b/src/server/web/api/auth.rs @@ -0,0 +1,41 @@ +use askama_axum::IntoResponse; +use axum::{ + headers::{authorization::Basic, Authorization}, + http::{header, HeaderValue, StatusCode}, + response::Response, +}; + +use crate::config::Config; + +fn is_username_valid(username: &str) -> bool { + if username.is_empty() { + return false; + } + + username + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.') +} + +fn is_password_valid(password: &str, config: &'static Config) -> bool { + password == config.web_runner_token +} + +pub fn authenticate( + config: &'static Config, + auth: Authorization, +) -> Result { + if is_username_valid(auth.username()) && is_password_valid(auth.password(), config) { + return Ok(auth.username().to_string()); + } + + Err(( + StatusCode::UNAUTHORIZED, + [( + header::WWW_AUTHENTICATE, + HeaderValue::from_str("Basic realm=\"runner api\"").unwrap(), + )], + "invalid credentials", + ) + .into_response()) +} diff --git a/src/shared.rs b/src/shared.rs index a5112f8..20acd8c 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -58,19 +58,7 @@ pub enum RunnerStatus { } #[derive(Serialize, Deserialize)] -pub struct Request { - /// The runner's name. - /// - /// This name is shown to the user in the UI and used to identify the runner - /// in URLs. Because of this, only these characters are allowed: - /// - /// - Letters from `a` to `z`, both lowercase and uppercase - /// - Digits from `0` to `9` - /// - The hyphen `-`, underscore `_`, and dot `.` characters - /// - /// Additionally, the name must be at least one character long. - pub name: String, - +pub struct RunnerRequest { /// Additional free-form info about the runner. /// /// This could for example be used to describe the runner's system specs. @@ -111,7 +99,7 @@ pub struct Work { } #[derive(Serialize, Deserialize)] -pub struct Response { +pub struct ServerResponse { /// Work the runner requested using [`Request::request_work]. /// /// The runner may ignore this work and do something else. However, until