Implement /api/runner/status

This commit is contained in:
Joscha 2023-08-10 18:47:44 +02:00
parent 5c8c037417
commit c713abc5d3
11 changed files with 296 additions and 29 deletions

View file

@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT hash FROM queue ORDER BY priority DESC, unixepoch(date) DESC, hash ASC ",
"describe": {
"columns": [
{
"name": "hash",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [
false
]
},
"hash": "a1ffd52b6d9dc0c24e67db5a98f626df4efd1d2e78bc6055054cd05d80671314"
}

36
Cargo.lock generated
View file

@ -186,6 +186,7 @@ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"bytes", "bytes",
"futures-util", "futures-util",
"headers",
"http", "http",
"http-body", "http-body",
"hyper", "hyper",
@ -251,6 +252,12 @@ dependencies = [
"rustc-demangle", "rustc-demangle",
] ]
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.2" version = "0.21.2"
@ -1557,6 +1564,31 @@ dependencies = [
"hashbrown 0.14.0", "hashbrown 0.14.0",
] ]
[[package]]
name = "headers"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584"
dependencies = [
"base64 0.13.1",
"bitflags 1.3.2",
"bytes",
"headers-core",
"http",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.4.1" version = "0.4.1"
@ -2653,7 +2685,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64", "base64 0.21.2",
"bitflags 2.3.3", "bitflags 2.3.3",
"byteorder", "byteorder",
"bytes", "bytes",
@ -2696,7 +2728,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64", "base64 0.21.2",
"bitflags 2.3.3", "bitflags 2.3.3",
"byteorder", "byteorder",
"crc", "crc",

View file

@ -7,7 +7,7 @@ edition = "2021"
anyhow = "1.0.72" anyhow = "1.0.72"
askama = { version = "0.12.0", features = ["with-axum"] } askama = { version = "0.12.0", features = ["with-axum"] }
askama_axum = "0.3.0" askama_axum = "0.3.0"
axum = { version = "0.6.19", features = ["macros"] } axum = { version = "0.6.19", features = ["macros", "headers"] }
clap = { version = "4.3.19", features = ["derive", "deprecated"] } clap = { version = "4.3.19", features = ["derive", "deprecated"] }
directories = "5.0.1" directories = "5.0.1"
futures = "0.3.28" futures = "0.3.28"

View file

@ -65,15 +65,15 @@ think them through.
## Runner interaction ## Runner interaction
Runner interaction happens via endpoints located at `/api/runner/`. All of these Runner interaction happens via endpoints located at `/api/runner/`. To access
are behind BASIC authentication. The username is `runner` and the password must any endpoint, the runner must use Basic authentication. The username is the name
be the server's runner token. When the runner presents the correct token, the of the runner and the password is the server's runner token. When the runner
server trusts the data the runner sends, including the name, current state, and presents the correct token, the server trusts the data the runner sends,
run ids. including the name, current state, and run ids.
On the server side, runners are identified by the runner's self-reported On the server side, runners are identified by the runner's self-reported name.
identifier. This allows more human-readable and permanent links to runners than This allows more human-readable and permanent links to runners than something
something like session ids. like session ids.
- POST `/api/runner/status` - POST `/api/runner/status`
- Main endpoint for runner/server coordination - Main endpoint for runner/server coordination

View file

@ -1,8 +1,13 @@
mod recurring; mod recurring;
mod runners;
mod util; mod util;
mod web; mod web;
use std::{path::Path, sync::Arc, time::Duration}; use std::{
path::Path,
sync::{Arc, Mutex},
time::Duration,
};
use axum::extract::FromRef; use axum::extract::FromRef;
use gix::ThreadSafeRepository; use gix::ThreadSafeRepository;
@ -15,6 +20,8 @@ use tracing::{debug, info};
use crate::{args::ServerCommand, config::Config, somehow}; use crate::{args::ServerCommand, config::Config, somehow};
use self::runners::Runners;
async fn open_db(db_path: &Path) -> sqlx::Result<SqlitePool> { async fn open_db(db_path: &Path) -> sqlx::Result<SqlitePool> {
let options = SqliteConnectOptions::new() let options = SqliteConnectOptions::new()
// https://www.sqlite.org/pragma.html#pragma_journal_mode // https://www.sqlite.org/pragma.html#pragma_journal_mode
@ -61,6 +68,7 @@ pub struct Server {
db: SqlitePool, db: SqlitePool,
repo: Option<Repo>, repo: Option<Repo>,
bench_repo: Option<BenchRepo>, bench_repo: Option<BenchRepo>,
runners: Arc<Mutex<Runners>>,
} }
impl Server { impl Server {
@ -86,6 +94,7 @@ impl Server {
db: open_db(&command.db).await?, db: open_db(&command.db).await?,
repo, repo,
bench_repo, bench_repo,
runners: Arc::new(Mutex::new(Runners::new(config))),
}) })
} }
@ -93,7 +102,7 @@ impl Server {
if let Some(repo) = self.repo.clone() { if let Some(repo) = self.repo.clone() {
select! { select! {
e = web::run(self.clone()) => e, e = web::run(self.clone()) => e,
() = recurring::run(self.clone(), repo, self.bench_repo.clone()) => Ok(()), () = recurring::run(self.clone(), repo) => Ok(()),
} }
} else { } else {
web::run(self.clone()).await web::run(self.clone()).await

View file

@ -8,7 +8,7 @@ mod repo;
use tracing::{debug_span, error, Instrument}; use tracing::{debug_span, error, Instrument};
use super::{BenchRepo, Repo, Server}; use super::{Repo, Server};
async fn recurring_task(state: &Server, repo: Repo) { async fn recurring_task(state: &Server, repo: Repo) {
async { async {
@ -28,7 +28,7 @@ async fn recurring_task(state: &Server, repo: Repo) {
.await; .await;
} }
pub(super) async fn run(server: Server, repo: Repo, bench_repo: Option<BenchRepo>) { pub(super) async fn run(server: Server, repo: Repo) {
loop { loop {
recurring_task(&server, repo.clone()).await; recurring_task(&server, repo.clone()).await;
tokio::time::sleep(server.config.repo_update_delay).await; tokio::time::sleep(server.config.repo_update_delay).await;

84
src/server/runners.rs Normal file
View file

@ -0,0 +1,84 @@
use std::collections::HashMap;
use gix::hashtable::HashSet;
use time::OffsetDateTime;
use crate::{config::Config, shared::RunnerStatus};
pub struct RunnerInfo {
pub secret: String,
pub last_seen: OffsetDateTime,
pub status: RunnerStatus,
}
impl RunnerInfo {
pub fn new(secret: String, last_seen: OffsetDateTime, status: RunnerStatus) -> Self {
Self {
secret,
last_seen,
status,
}
}
}
pub struct Runners {
config: &'static Config,
runners: HashMap<String, RunnerInfo>,
}
impl Runners {
pub fn new(config: &'static Config) -> Self {
Self {
config,
runners: HashMap::new(),
}
}
pub fn clean(&mut self, now: OffsetDateTime) {
self.runners
.retain(|_, v| now <= v.last_seen + self.config.web_runner_timeout)
}
pub fn verify(&self, name: &str, secret: &str) -> bool {
let Some(runner) = self.runners.get(name) else { return true; };
runner.secret == secret
}
pub fn update(&mut self, name: String, info: RunnerInfo) {
self.runners.insert(name, info);
}
fn oldest_working_on(&self, hash: &str) -> Option<&str> {
self.runners
.iter()
.filter_map(|(name, info)| match &info.status {
RunnerStatus::Working { hash: h, since, .. } if h == hash => Some((name, *since)),
_ => None,
})
.max_by_key(|(_, since)| *since)
.map(|(name, _)| name as &str)
}
pub fn should_abort_work(&self, name: &str) -> bool {
let Some(info) = self.runners.get(name) else { return false; };
let RunnerStatus::Working { hash, .. } = &info.status else { return false; };
let Some(oldest) = self.oldest_working_on(hash) else { return false; };
name != oldest
}
pub fn find_free_work<'a>(&self, hashes: &'a [String]) -> Option<&'a str> {
let covered = self
.runners
.values()
.filter_map(|info| match &info.status {
RunnerStatus::Working { hash, .. } => Some(hash),
_ => None,
})
.collect::<HashSet<_>>();
hashes
.iter()
.find(|hash| !covered.contains(hash))
.map(|hash| hash as &str)
}
}

View file

@ -1,3 +1,4 @@
mod api;
mod commit; mod commit;
mod commit_hash; mod commit_hash;
mod index; mod index;
@ -47,6 +48,7 @@ pub async fn run(server: Server) -> somehow::Result<()> {
.route("/commit/:hash", get(commit_hash::get)) .route("/commit/:hash", get(commit_hash::get))
.route("/queue/", get(queue::get)) .route("/queue/", get(queue::get))
.route("/queue/table", get(queue::get_table)) .route("/queue/table", get(queue::get_table))
.merge(api::router(&server))
.fallback(get(r#static::static_handler)) .fallback(get(r#static::static_handler))
.with_state(server.clone()); .with_state(server.clone());

91
src/server/web/api.rs Normal file
View file

@ -0,0 +1,91 @@
mod auth;
use std::sync::{Arc, Mutex};
use askama_axum::{IntoResponse, Response};
use axum::{
extract::State,
headers::{authorization::Basic, Authorization},
http::StatusCode,
routing::post,
Json, Router, TypedHeader,
};
use sqlx::SqlitePool;
use time::OffsetDateTime;
use crate::{
config::Config,
server::{
runners::{RunnerInfo, Runners},
BenchRepo, Server,
},
shared::{BenchMethod, RunnerRequest, ServerResponse, Work},
somehow,
};
async fn post_status(
TypedHeader(auth): TypedHeader<Authorization<Basic>>,
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
State(bench_repo): State<Option<BenchRepo>>,
State(runners): State<Arc<Mutex<Runners>>>,
Json(request): Json<RunnerRequest>,
) -> somehow::Result<Response> {
let name = match auth::authenticate(config, auth) {
Ok(name) => name,
Err(response) => return Ok(response),
};
let now = OffsetDateTime::now_utc();
let queue = sqlx::query_scalar!(
"\
SELECT hash FROM queue \
ORDER BY priority DESC, unixepoch(date) DESC, hash ASC \
"
)
.fetch_all(&db)
.await?;
let mut guard = runners.lock().unwrap();
guard.clean(now);
if !guard.verify(&name, &request.secret) {
return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response());
}
guard.update(
name.clone(),
RunnerInfo::new(request.secret, now, request.status),
);
let work = match request.request_work {
true => guard.find_free_work(&queue),
false => None,
};
let abort_work = guard.should_abort_work(&name);
drop(guard);
// Find new work
let work = if let Some(hash) = work {
let bench = match bench_repo {
Some(bench_repo) => BenchMethod::BenchRepo {
hash: bench_repo.0.to_thread_local().head_id()?.to_string(),
},
None => BenchMethod::Internal,
};
Some(Work {
hash: hash.to_string(),
bench,
})
} else {
None
};
Ok(Json(ServerResponse { work, abort_work }).into_response())
}
pub fn router(server: &Server) -> Router<Server> {
if server.repo.is_none() {
return Router::new().route("/api/runner/status", post(post_status));
}
// TODO Add routes
Router::new()
}

View file

@ -0,0 +1,41 @@
use askama_axum::IntoResponse;
use axum::{
headers::{authorization::Basic, Authorization},
http::{header, HeaderValue, StatusCode},
response::Response,
};
use crate::config::Config;
fn is_username_valid(username: &str) -> bool {
if username.is_empty() {
return false;
}
username
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.')
}
fn is_password_valid(password: &str, config: &'static Config) -> bool {
password == config.web_runner_token
}
pub fn authenticate(
config: &'static Config,
auth: Authorization<Basic>,
) -> Result<String, Response> {
if is_username_valid(auth.username()) && is_password_valid(auth.password(), config) {
return Ok(auth.username().to_string());
}
Err((
StatusCode::UNAUTHORIZED,
[(
header::WWW_AUTHENTICATE,
HeaderValue::from_str("Basic realm=\"runner api\"").unwrap(),
)],
"invalid credentials",
)
.into_response())
}

View file

@ -58,19 +58,7 @@ pub enum RunnerStatus {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Request { pub struct RunnerRequest {
/// The runner's name.
///
/// This name is shown to the user in the UI and used to identify the runner
/// in URLs. Because of this, only these characters are allowed:
///
/// - Letters from `a` to `z`, both lowercase and uppercase
/// - Digits from `0` to `9`
/// - The hyphen `-`, underscore `_`, and dot `.` characters
///
/// Additionally, the name must be at least one character long.
pub name: String,
/// Additional free-form info about the runner. /// Additional free-form info about the runner.
/// ///
/// This could for example be used to describe the runner's system specs. /// This could for example be used to describe the runner's system specs.
@ -111,7 +99,7 @@ pub struct Work {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Response { pub struct ServerResponse {
/// Work the runner requested using [`Request::request_work]. /// Work the runner requested using [`Request::request_work].
/// ///
/// The runner may ignore this work and do something else. However, until /// The runner may ignore this work and do something else. However, until