Rename runners to workers

Joscha 2023-08-11 02:03:32 +02:00
parent 78f945647c
commit 6f4793bcf2
20 changed files with 233 additions and 237 deletions


@ -16,9 +16,9 @@ think them through.
- Runs and their measurements
- Queue of commits
- The in-memory state also contains...
- Connected runners and their state
- Connected workers and their state
- From this follows the list of in-progress runs
- Runners...
- Workers...
- Should be robust
- No one wants to lose a run a few hours in, for any reason
- Explicitly design for loss of connection, server restarts
@ -43,7 +43,7 @@ think them through.
- Change scope interactively
- Change metrics interactively
- GET `/queue/`
- List of runners and their state
- List of workers and their state
- List of unfinished runs
- "What's the state of the infrastructure?"
- GET `/commit/<hash>/`
@ -63,46 +63,46 @@ think them through.
- Show changes from rid2 to rid1
- Resolve refs, branch names and commits to their latest runs -> redirect
## Runner interaction
## Worker interaction
Runner interaction happens via endpoints located at `/api/runner/`. To access
any endpoint, the runner must use Basic authentication. The username is the name
of the runner and the password is the server's runner token. When the runner
presents the correct token, the server trusts the data the runner sends,
Worker interaction happens via endpoints located at `/api/worker/`. To access
any endpoint, the worker must use Basic authentication. The username is the name
of the worker and the password is the server's worker token. When the worker
presents the correct token, the server trusts the data the worker sends,
including the name, current state, and run ids.
On the server side, runners are identified by the runner's self-reported name.
This allows more human-readable and permanent links to runners than something
On the server side, workers are identified by the worker's self-reported name.
This allows more human-readable and permanent links to workers than something
like session ids.
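As a sketch of the authentication scheme above: the worker sends a standard Basic `Authorization` header, with its name as the username and the server's worker token as the password. The worker name `bench-01` and the token value below are made-up examples; Rust's std has no base64 encoder, so a minimal one is inlined here.

```rust
// Minimal base64 encoder (RFC 4648 alphabet), inlined because std has none.
fn base64(data: &[u8]) -> String {
    const TABLE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in data.chunks(3) {
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = ((b[0] as u32) << 16) | ((b[1] as u32) << 8) | (b[2] as u32);
        out.push(TABLE[(n >> 18) as usize & 63] as char);
        out.push(TABLE[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { TABLE[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { TABLE[(n & 63) as usize] as char } else { '=' });
    }
    out
}

// Username is the worker's self-reported name, password is the worker token.
fn basic_auth_header(worker_name: &str, token: &str) -> String {
    format!("Basic {}", base64(format!("{worker_name}:{token}").as_bytes()))
}

fn main() {
    println!("{}", basic_auth_header("bench-01", "t0exampletoken"));
}
```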
- POST `/api/runner/status`
- Main endpoint for runner/server coordination
- Runner periodically sends current status to server
- Includes a secret randomly chosen by the runner
- POST `/api/worker/status`
- Main endpoint for worker/server coordination
- Worker periodically sends current status to server
- Includes a secret randomly chosen by the worker
- Subsequent requests must include exactly the same secret
- Protects against the case where multiple runners share the same name
- Runner may include request for new work
- Protects against the case where multiple workers share the same name
- Worker may include request for new work
- If so, server may respond with a commit hash and bench method
- Runner may include current work
- Worker may include current work
- If so, server may respond with request to abort the work
- GET `/api/runner/repo/<hash>/tar`
- GET `/api/worker/repo/<hash>/tar`
- Get tar-ed commit from the server's repo, if any exists
- GET `/api/runner/bench-repo/<hash>/tar`
- GET `/api/worker/bench-repo/<hash>/tar`
- Get tar-ed commit from the server's bench repo, if any exists
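The name-collision protection of the status endpoint can be sketched as follows. This is a simplified model under stated assumptions, not the repo's actual `Workers` type: the first status request under a name registers its secret, and later requests under that name must present the identical secret. Timeout-based cleanup of stale entries is omitted.

```rust
use std::collections::HashMap;

// Maps each self-reported worker name to the secret it first presented.
struct WorkerRegistry {
    secrets: HashMap<String, String>,
}

impl WorkerRegistry {
    fn new() -> Self {
        Self { secrets: HashMap::new() }
    }

    /// Returns whether the request is accepted, registering the secret
    /// if the name was previously unknown.
    fn verify_and_update(&mut self, name: &str, secret: &str) -> bool {
        match self.secrets.get(name) {
            // Another worker already claimed this name with a different secret.
            Some(known) if known != secret => false,
            _ => {
                self.secrets.insert(name.to_string(), secret.to_string());
                true
            }
        }
    }
}

fn main() {
    let mut registry = WorkerRegistry::new();
    assert!(registry.verify_and_update("bench-01", "s-first"));
    assert!(!registry.verify_and_update("bench-01", "s-other"));
}
```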
## CLI Args
tablejohn can be run in one of two modes: Server mode, and runner mode.
tablejohn can be run in one of two modes: server mode and worker mode.
- server
- Run a web server that serves the contents of a db
- Optionally, specify repo to update the db from
- Optionally, launch local runner (only if repo is specified)
- When local runner is enabled, it ignores the runner section of the config
- Instead, a runner section is generated from the server config
- This approach should make `--local-runner` more fool-proof
- runner
- Run only as runner (when using external machine for runners)
- Optionally, launch local worker (only if repo is specified)
- When local worker is enabled, it ignores the worker section of the config
- Instead, a worker section is generated from the server config
- This approach should make `--local-worker` more fool-proof
- worker
- Run only as worker (when using external machine for workers)
- Same config file format as server, just uses different parts
## Config file and options
@ -110,17 +110,17 @@ tablejohn can be run in one of two modes: Server mode, and runner mode.
Regardless of the mode, the config file is always loaded the same way and has
the same format. It is split into these chunks:
- web (ignored in runner mode)
- web (ignored in worker mode)
- Everything to do with the web server
- What address and port to bind on
- What url the site is being served under
- repo (ignored in runner mode)
- repo (ignored in worker mode)
- Everything to do with the repo the server is inspecting
- Name (derived from repo path if not specified here)
- How frequently to update the db from the repo
- A remote URL to update the repo from
- Whether to clone the repo if it doesn't yet exist
- runner (ignored in server mode)
- worker (ignored in server mode)
- Name (uses system name by default)
- Custom bench dir path (creates temporary dir by default)
- List of servers, each of which has...
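A config file following the chunks above might look like this. The values are illustrative, and the exact key names are assumptions based on these notes and the defaults in the code, not verbatim from the implementation:

```toml
# Used by `tablejohn server` (ignored in worker mode)
[web]
address = "[::1]:8221"
worker_token = "t0exampletoken"  # randomly generated if omitted
worker_timeout = "60s"

[repo]
name = "my-project"              # derived from the repo path if omitted
update_delay = "60s"

# Used by `tablejohn worker` (ignored in server mode)
[worker]
name = "bench-01"                # defaults to the system's hostname
ping_delay = "10s"

[worker.servers.main]
url = "https://example.com/tablejohn/"
token = "t0exampletoken"
```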


@ -19,17 +19,17 @@ pub struct ServerCommand {
#[arg(long, short)]
pub open: bool,
/// Start one or more local runners for this server.
/// Start one or more local workers for this server.
#[arg(long, short, action = clap::ArgAction::Count)]
pub local_runner: u8,
pub local_worker: u8,
}
#[derive(Debug, clap::Parser)]
pub enum Command {
/// Start a tablejohn server.
Server(ServerCommand),
/// Start a tablejohn benchmark runner.
Runner,
/// Start a tablejohn worker.
Worker,
// TODO bench script command?
}


@ -30,7 +30,7 @@ mod default {
"[::1]:8221".parse().unwrap()
}
pub fn web_runner_timeout() -> Duration {
pub fn web_worker_timeout() -> Duration {
Duration::from_secs(60)
}
@ -38,7 +38,7 @@ mod default {
Duration::from_secs(60)
}
pub fn runner_ping_delay() -> Duration {
pub fn worker_ping_delay() -> Duration {
Duration::from_secs(10)
}
}
@ -51,10 +51,10 @@ struct Web {
#[serde(default = "default::web_address")]
address: SocketAddr,
runner_token: Option<String>,
worker_token: Option<String>,
#[serde(default = "default::web_runner_timeout")]
runner_timeout: Duration,
#[serde(default = "default::web_worker_timeout")]
worker_timeout: Duration,
}
impl Default for Web {
@ -62,8 +62,8 @@ impl Default for Web {
Self {
base: default::web_base(),
address: default::web_address(),
runner_token: None,
runner_timeout: default::web_runner_timeout(),
worker_token: None,
worker_timeout: default::web_worker_timeout(),
}
}
}
@ -86,26 +86,26 @@ impl Default for Repo {
}
#[derive(Debug, Deserialize)]
struct RunnerServer {
struct WorkerServer {
url: String,
token: String,
}
#[derive(Debug, Deserialize)]
struct Runner {
struct Worker {
name: Option<String>,
#[serde(default = "default::runner_ping_delay", with = "humantime_serde")]
#[serde(default = "default::worker_ping_delay", with = "humantime_serde")]
ping_delay: Duration,
servers: HashMap<String, RunnerServer>,
servers: HashMap<String, WorkerServer>,
}
impl Default for Runner {
impl Default for Worker {
fn default() -> Self {
Self {
name: None,
ping_delay: default::runner_ping_delay(),
ping_delay: default::worker_ping_delay(),
servers: HashMap::new(),
}
}
@ -120,7 +120,7 @@ struct ConfigFile {
repo: Repo,
#[serde(default)]
runner: Runner,
worker: Worker,
}
impl ConfigFile {
@ -148,11 +148,11 @@ impl ConfigFile {
base
}
fn web_runner_token(&self) -> String {
fn web_worker_token(&self) -> String {
self.web
.runner_token
.worker_token
.clone()
.unwrap_or_else(id::random_runner_token)
.unwrap_or_else(id::random_worker_token)
}
fn repo_name(&self, args: &Args) -> somehow::Result<String> {
@ -174,16 +174,16 @@ impl ConfigFile {
Ok("unnamed repo".to_string())
}
fn runner_name(&self) -> String {
if let Some(name) = &self.runner.name {
fn worker_name(&self) -> String {
if let Some(name) = &self.worker.name {
return name.clone();
}
gethostname::gethostname().to_string_lossy().to_string()
}
fn runner_servers(&self) -> HashMap<String, RunnerServerConfig> {
self.runner
fn worker_servers(&self) -> HashMap<String, WorkerServerConfig> {
self.worker
.servers
.iter()
.map(|(name, server)| {
@ -192,14 +192,14 @@ impl ConfigFile {
url.push('/');
}
let token = server.token.to_string();
(name.to_string(), RunnerServerConfig { url, token })
(name.to_string(), WorkerServerConfig { url, token })
})
.collect()
}
}
#[derive(Clone)]
pub struct RunnerServerConfig {
pub struct WorkerServerConfig {
/// Always ends with a `/`.
pub url: String,
pub token: String,
@ -210,13 +210,13 @@ pub struct Config {
/// Always starts and ends with a `/`.
pub web_base: String,
pub web_address: SocketAddr,
pub web_runner_token: String,
pub web_runner_timeout: Duration,
pub web_worker_token: String,
pub web_worker_timeout: Duration,
pub repo_name: String,
pub repo_update_delay: Duration,
pub runner_name: String,
pub runner_ping_delay: Duration,
pub runner_servers: HashMap<String, RunnerServerConfig>,
pub worker_name: String,
pub worker_ping_delay: Duration,
pub worker_servers: HashMap<String, WorkerServerConfig>,
}
impl Config {
@ -238,21 +238,21 @@ impl Config {
debug!("Loaded config file:\n{config_file:#?}");
let web_base = config_file.web_base();
let web_runner_token = config_file.web_runner_token();
let web_worker_token = config_file.web_worker_token();
let repo_name = config_file.repo_name(args)?;
let runner_name = config_file.runner_name();
let runner_servers = config_file.runner_servers();
let worker_name = config_file.worker_name();
let worker_servers = config_file.worker_servers();
Ok(Self {
web_base,
web_address: config_file.web.address,
web_runner_token,
web_runner_timeout: config_file.web.runner_timeout,
web_worker_token,
web_worker_timeout: config_file.web.worker_timeout,
repo_name,
repo_update_delay: config_file.repo.update_delay,
runner_name,
runner_ping_delay: config_file.runner.ping_delay,
runner_servers,
worker_name,
worker_ping_delay: config_file.worker.ping_delay,
worker_servers,
})
}
}


@ -9,10 +9,10 @@ fn random_id(prefix: &str, length: usize) -> String {
.collect()
}
pub fn random_runner_token() -> String {
pub fn random_worker_token() -> String {
random_id("t", 30)
}
pub fn random_runner_secret() -> String {
pub fn random_worker_secret() -> String {
random_id("s", 30)
}


@ -1,15 +1,15 @@
mod args;
mod config;
mod id;
mod runner;
mod server;
mod shared;
mod somehow;
mod worker;
use std::{io, process, time::Duration};
use clap::Parser;
use config::RunnerServerConfig;
use config::WorkerServerConfig;
use tokio::{select, signal::unix::SignalKind};
use tracing::{debug, error, info, Level};
use tracing_subscriber::{
@ -19,8 +19,8 @@ use tracing_subscriber::{
use crate::{
args::{Args, Command, NAME, VERSION},
config::Config,
runner::Runner,
server::Server,
worker::Worker,
};
fn set_up_logging(verbose: u8) {
@ -93,24 +93,24 @@ async fn open_in_browser(config: &Config) {
}
}
async fn launch_local_runners(config: &'static Config, amount: u8) {
async fn launch_local_workers(config: &'static Config, amount: u8) {
let server_name = "localhost";
let server_config = Box::leak(Box::new(RunnerServerConfig {
let server_config = Box::leak(Box::new(WorkerServerConfig {
url: format!("http://{}{}", config.web_address, config.web_base),
token: config.web_runner_token.clone(),
token: config.web_worker_token.clone(),
}));
// Wait a bit to ensure the server is ready to serve requests.
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 0..amount {
let mut runner_config = config.clone();
runner_config.runner_name = format!("{}-{i}", config.runner_name);
let runner_config = Box::leak(Box::new(runner_config));
let mut worker_config = config.clone();
worker_config.worker_name = format!("{}-{i}", config.worker_name);
let worker_config = Box::leak(Box::new(worker_config));
info!("Launching local runner {}", runner_config.runner_name);
runner::launch_standalone_server_task(
runner_config,
info!("Launching local worker {}", worker_config.worker_name);
worker::launch_standalone_server_task(
worker_config,
server_name.to_string(),
server_config,
);
@ -131,8 +131,8 @@ async fn run() -> somehow::Result<()> {
tokio::task::spawn(open_in_browser(config));
}
if command.local_runner > 0 {
tokio::task::spawn(launch_local_runners(config, command.local_runner));
if command.local_worker > 0 {
tokio::task::spawn(launch_local_workers(config, command.local_worker));
}
let server = Server::new(config, command).await?;
@ -155,12 +155,12 @@ async fn run() -> somehow::Result<()> {
_ = server.shut_down() => {}
}
}
Command::Runner => {
let runner = Runner::new(config);
Command::Worker => {
let worker = Worker::new(config);
select! {
_ = wait_for_signal() => {}
_ = runner.run() => {}
_ = worker.run() => {}
}
}
}


@ -1,7 +1,7 @@
mod recurring;
mod runners;
mod util;
mod web;
mod workers;
use std::{
path::Path,
@ -18,13 +18,9 @@ use sqlx::{
use tokio::select;
use tracing::{debug, info};
use crate::{
args::ServerCommand,
config::{Config, RunnerServerConfig},
runner, somehow,
};
use crate::{args::ServerCommand, config::Config, somehow};
use self::runners::Runners;
use self::workers::Workers;
async fn open_db(db_path: &Path) -> sqlx::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
@ -72,7 +68,7 @@ pub struct Server {
db: SqlitePool,
repo: Option<Repo>,
bench_repo: Option<BenchRepo>,
runners: Arc<Mutex<Runners>>,
workers: Arc<Mutex<Workers>>,
}
impl Server {
@ -98,7 +94,7 @@ impl Server {
db: open_db(&command.db).await?,
repo,
bench_repo,
runners: Arc::new(Mutex::new(Runners::new(config))),
workers: Arc::new(Mutex::new(Workers::new(config))),
})
}


@ -3,8 +3,8 @@ mod commit;
mod index;
mod link;
mod queue;
mod runner;
mod r#static;
mod worker;
use axum::{routing::get, Router};
@ -46,7 +46,7 @@ pub async fn run(server: Server) -> somehow::Result<()> {
let app = Router::new()
.route("/", get(index::get))
.route("/commit/:hash", get(commit::get))
.route("/runner/:name", get(runner::get))
.route("/worker/:name", get(worker::get))
.route("/queue/", get(queue::get))
.route("/queue/inner", get(queue::get_inner))
.merge(api::router(&server))


@ -17,10 +17,10 @@ use tracing::debug;
use crate::{
config::Config,
server::{
runners::{RunnerInfo, Runners},
workers::{WorkerInfo, Workers},
BenchRepo, Server,
},
shared::{BenchMethod, RunnerRequest, ServerResponse, Work},
shared::{BenchMethod, ServerResponse, Work, WorkerRequest},
somehow,
};
@ -29,8 +29,8 @@ async fn post_status(
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
State(bench_repo): State<Option<BenchRepo>>,
State(runners): State<Arc<Mutex<Runners>>>,
Json(request): Json<RunnerRequest>,
State(workers): State<Arc<Mutex<Workers>>>,
Json(request): Json<WorkerRequest>,
) -> somehow::Result<Response> {
let name = match auth::authenticate(config, auth) {
Ok(name) => name,
@ -46,14 +46,14 @@ async fn post_status(
.fetch_all(&db)
.await?;
let mut guard = runners.lock().unwrap();
let mut guard = workers.lock().unwrap();
guard.clean();
if !guard.verify(&name, &request.secret) {
return Ok((StatusCode::UNAUTHORIZED, "invalid secret").into_response());
}
guard.update(
name.clone(),
RunnerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status),
WorkerInfo::new(request.secret, OffsetDateTime::now_utc(), request.status),
);
let work = match request.request_work {
true => guard.find_free_work(&queue),
@ -91,5 +91,5 @@ pub fn router(server: &Server) -> Router<Server> {
// TODO Get repo tar
// TODO Get bench repo tar
Router::new().route("/api/runner/status", post(post_status))
Router::new().route("/api/worker/status", post(post_status))
}


@ -19,7 +19,7 @@ fn is_username_valid(username: &str) -> bool {
}
fn is_password_valid(password: &str, config: &'static Config) -> bool {
password == config.web_runner_token
password == config.web_worker_token
}
pub fn authenticate(
@ -36,7 +36,7 @@ pub fn authenticate(
StatusCode::UNAUTHORIZED,
[(
header::WWW_AUTHENTICATE,
HeaderValue::from_str("Basic realm=\"runner api\"").unwrap(),
HeaderValue::from_str("Basic realm=\"worker api\"").unwrap(),
)],
"invalid credentials",
)


@ -62,17 +62,17 @@ impl RunLink {
#[template(
ext = "html",
source = "\
<a href=\"{{ root }}runner/{{ name }}\">
<a href=\"{{ root }}worker/{{ name }}\">
{{ name }}
</a>
"
)]
pub struct RunnerLink {
pub struct WorkerLink {
root: String,
name: String,
}
impl RunnerLink {
impl WorkerLink {
pub fn new(base: &Base, name: String) -> Self {
Self {
root: base.root.clone(),


@ -11,15 +11,15 @@ use sqlx::SqlitePool;
use crate::{
config::Config,
server::{
runners::{RunnerInfo, Runners},
util,
workers::{WorkerInfo, Workers},
},
shared::RunnerStatus,
shared::WorkerStatus,
somehow,
};
use super::{
link::{CommitLink, RunLink, RunnerLink},
link::{CommitLink, RunLink, WorkerLink},
Base, Tab,
};
@ -29,8 +29,8 @@ enum Status {
Working(RunLink),
}
struct Runner {
link: RunnerLink,
struct Worker {
link: WorkerLink,
status: Status,
}
@ -38,33 +38,33 @@ struct Task {
commit: CommitLink,
since: String,
priority: i64,
runners: Vec<RunnerLink>,
workers: Vec<WorkerLink>,
odd: bool,
}
fn sorted_runners(runners: &Mutex<Runners>) -> Vec<(String, RunnerInfo)> {
let mut runners = runners
fn sorted_workers(workers: &Mutex<Workers>) -> Vec<(String, WorkerInfo)> {
let mut workers = workers
.lock()
.unwrap()
.clean()
.get_all()
.into_iter()
.collect::<Vec<_>>();
runners.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
runners
workers.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
workers
}
async fn get_runners(
async fn get_workers(
db: &SqlitePool,
runners: &[(String, RunnerInfo)],
workers: &[(String, WorkerInfo)],
base: &Base,
) -> somehow::Result<Vec<Runner>> {
) -> somehow::Result<Vec<Worker>> {
let mut result = vec![];
for (name, info) in runners {
for (name, info) in workers {
let status = match &info.status {
RunnerStatus::Idle => Status::Idle,
RunnerStatus::Busy => Status::Busy,
RunnerStatus::Working(run) => {
WorkerStatus::Idle => Status::Idle,
WorkerStatus::Busy => Status::Busy,
WorkerStatus::Working(run) => {
let message =
sqlx::query_scalar!("SELECT message FROM commits WHERE hash = ?", run.hash)
.fetch_one(db)
@ -73,8 +73,8 @@ async fn get_runners(
}
};
result.push(Runner {
link: RunnerLink::new(base, name.clone()),
result.push(Worker {
link: WorkerLink::new(base, name.clone()),
status,
})
}
@ -83,17 +83,17 @@ async fn get_runners(
async fn get_queue(
db: &SqlitePool,
runners: &[(String, RunnerInfo)],
workers: &[(String, WorkerInfo)],
base: &Base,
) -> somehow::Result<Vec<Task>> {
// Group runners by commit hash
let mut runners_by_commit: HashMap<String, Vec<RunnerLink>> = HashMap::new();
for (name, info) in runners {
if let RunnerStatus::Working(run) = &info.status {
runners_by_commit
// Group workers by commit hash
let mut workers_by_commit: HashMap<String, Vec<WorkerLink>> = HashMap::new();
for (name, info) in workers {
if let WorkerStatus::Working(run) = &info.status {
workers_by_commit
.entry(run.hash.clone())
.or_default()
.push(RunnerLink::new(base, name.clone()));
.push(WorkerLink::new(base, name.clone()));
}
}
@ -112,7 +112,7 @@ async fn get_queue(
)
.fetch(db)
.map_ok(|r| Task {
runners: runners_by_commit.remove(&r.hash).unwrap_or_default(),
workers: workers_by_commit.remove(&r.hash).unwrap_or_default(),
commit: CommitLink::new(base, r.hash, &r.message, r.reachable),
since: util::format_delta_from_now(r.date),
priority: r.priority,
@ -137,20 +137,20 @@ async fn get_queue(
#[derive(Template)]
#[template(path = "queue_inner.html")]
struct QueueInnerTemplate {
runners: Vec<Runner>,
workers: Vec<Worker>,
tasks: Vec<Task>,
}
pub async fn get_inner(
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
State(runners): State<Arc<Mutex<Runners>>>,
State(workers): State<Arc<Mutex<Workers>>>,
) -> somehow::Result<impl IntoResponse> {
let base = Base::new(config, Tab::Queue);
let sorted_runners = sorted_runners(&runners);
let runners = get_runners(&db, &sorted_runners, &base).await?;
let tasks = get_queue(&db, &sorted_runners, &base).await?;
Ok(QueueInnerTemplate { runners, tasks })
let sorted_workers = sorted_workers(&workers);
let workers = get_workers(&db, &sorted_workers, &base).await?;
let tasks = get_queue(&db, &sorted_workers, &base).await?;
Ok(QueueInnerTemplate { workers, tasks })
}
#[derive(Template)]
#[template(path = "queue.html")]
@ -162,14 +162,14 @@ struct QueueTemplate {
pub async fn get(
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
State(runners): State<Arc<Mutex<Runners>>>,
State(workers): State<Arc<Mutex<Workers>>>,
) -> somehow::Result<impl IntoResponse> {
let base = Base::new(config, Tab::Queue);
let sorted_runners = sorted_runners(&runners);
let runners = get_runners(&db, &sorted_runners, &base).await?;
let tasks = get_queue(&db, &sorted_runners, &base).await?;
let sorted_workers = sorted_workers(&workers);
let workers = get_workers(&db, &sorted_workers, &base).await?;
let tasks = get_queue(&db, &sorted_workers, &base).await?;
Ok(QueueTemplate {
base,
inner: QueueInnerTemplate { runners, tasks },
inner: QueueInnerTemplate { workers, tasks },
})
}


@ -9,15 +9,15 @@ use axum::{
use crate::{
config::Config,
server::{runners::Runners, util},
server::{util, workers::Workers},
somehow,
};
use super::{Base, Tab};
#[derive(Template)]
#[template(path = "runner.html")]
struct RunnerTemplate {
#[template(path = "worker.html")]
struct WorkerTemplate {
base: Base,
name: String,
last_seen: String,
@ -27,14 +27,14 @@ struct RunnerTemplate {
pub async fn get(
Path(name): Path<String>,
State(config): State<&'static Config>,
State(runners): State<Arc<Mutex<Runners>>>,
State(workers): State<Arc<Mutex<Workers>>>,
) -> somehow::Result<Response> {
let info = runners.lock().unwrap().clean().get(&name);
let info = workers.lock().unwrap().clean().get(&name);
let Some(info) = info else {
return Ok(StatusCode::NOT_FOUND.into_response());
};
Ok(RunnerTemplate {
Ok(WorkerTemplate {
base: Base::new(config, Tab::None),
name,
last_seen: util::format_time(info.last_seen),


@ -3,17 +3,17 @@ use std::collections::HashMap;
use gix::hashtable::HashSet;
use time::OffsetDateTime;
use crate::{config::Config, shared::RunnerStatus};
use crate::{config::Config, shared::WorkerStatus};
#[derive(Clone)]
pub struct RunnerInfo {
pub struct WorkerInfo {
pub secret: String,
pub last_seen: OffsetDateTime,
pub status: RunnerStatus,
pub status: WorkerStatus,
}
impl RunnerInfo {
pub fn new(secret: String, last_seen: OffsetDateTime, status: RunnerStatus) -> Self {
impl WorkerInfo {
pub fn new(secret: String, last_seen: OffsetDateTime, status: WorkerStatus) -> Self {
Self {
secret,
last_seen,
@ -22,40 +22,40 @@ impl RunnerInfo {
}
}
pub struct Runners {
pub struct Workers {
config: &'static Config,
runners: HashMap<String, RunnerInfo>,
workers: HashMap<String, WorkerInfo>,
}
impl Runners {
impl Workers {
pub fn new(config: &'static Config) -> Self {
Self {
config,
runners: HashMap::new(),
workers: HashMap::new(),
}
}
pub fn clean(&mut self) -> &mut Self {
let now = OffsetDateTime::now_utc();
self.runners
.retain(|_, v| now <= v.last_seen + self.config.web_runner_timeout);
self.workers
.retain(|_, v| now <= v.last_seen + self.config.web_worker_timeout);
self
}
pub fn verify(&self, name: &str, secret: &str) -> bool {
let Some(runner) = self.runners.get(name) else { return true; };
runner.secret == secret
let Some(worker) = self.workers.get(name) else { return true; };
worker.secret == secret
}
pub fn update(&mut self, name: String, info: RunnerInfo) {
self.runners.insert(name, info);
pub fn update(&mut self, name: String, info: WorkerInfo) {
self.workers.insert(name, info);
}
fn oldest_working_on(&self, hash: &str) -> Option<&str> {
self.runners
self.workers
.iter()
.filter_map(|(name, info)| match &info.status {
RunnerStatus::Working(run) if run.hash == hash => Some((name, run.start)),
WorkerStatus::Working(run) if run.hash == hash => Some((name, run.start)),
_ => None,
})
.max_by_key(|(_, since)| *since)
@ -63,18 +63,18 @@ impl Runners {
}
pub fn should_abort_work(&self, name: &str) -> bool {
let Some(info) = self.runners.get(name) else { return false; };
let RunnerStatus::Working ( run) = &info.status else { return false; };
let Some(info) = self.workers.get(name) else { return false; };
let WorkerStatus::Working(run) = &info.status else { return false; };
let Some(oldest) = self.oldest_working_on(&run.hash) else { return false; };
name != oldest
}
pub fn find_free_work<'a>(&self, hashes: &'a [String]) -> Option<&'a str> {
let covered = self
.runners
.workers
.values()
.filter_map(|info| match &info.status {
RunnerStatus::Working(run) => Some(&run.hash),
WorkerStatus::Working(run) => Some(&run.hash),
_ => None,
})
.collect::<HashSet<_>>();
@ -85,11 +85,11 @@ impl Runners {
.map(|hash| hash as &str)
}
pub fn get(&self, name: &str) -> Option<RunnerInfo> {
self.runners.get(name).cloned()
pub fn get(&self, name: &str) -> Option<WorkerInfo> {
self.workers.get(name).cloned()
}
pub fn get_all(&self) -> HashMap<String, RunnerInfo> {
self.runners.clone()
pub fn get_all(&self) -> HashMap<String, WorkerInfo> {
self.workers.clone()
}
}


@ -1,4 +1,4 @@
//! Data structures modelling the communication between server and runner.
//! Data structures modelling the communication between server and worker.
use std::collections::HashMap;
@ -62,36 +62,36 @@ pub struct FinishedRun {
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum RunnerStatus {
/// The runner is not performing any work.
pub enum WorkerStatus {
/// The worker is not performing any work.
Idle,
/// The runner is performing work for another server.
/// The worker is performing work for another server.
Busy,
/// The runner is performing work for the current server.
/// The worker is performing work for the current server.
Working(UnfinishedRun),
}
#[derive(Clone, Serialize, Deserialize)]
pub struct RunnerRequest {
/// Additional free-form info about the runner.
pub struct WorkerRequest {
/// Additional free-form info about the worker.
///
/// This could for example be used to describe the runner's system specs.
/// This could for example be used to describe the worker's system specs.
pub info: Option<String>,
/// Secret for preventing name collisions.
pub secret: String,
/// What the runner is currently working on.
pub status: RunnerStatus,
/// What the worker is currently working on.
pub status: WorkerStatus,
/// Whether the runner wants new work from the server.
/// Whether the worker wants new work from the server.
///
/// If the server has a commit available, it should respond with a non-null
/// [`ServerResponse::work`].
#[serde(default, skip_serializing_if = "is_false")]
pub request_work: bool,
/// The runner has finished a run and wants to submit the results.
/// The worker has finished a run and wants to submit the results.
#[serde(skip_serializing_if = "Option::is_none")]
pub submit_work: Option<FinishedRun>,
}
@ -116,20 +116,20 @@ pub struct Work {
#[derive(Serialize, Deserialize)]
pub struct ServerResponse {
/// Work the runner requested using [`Request::request_work].
/// Work the worker requested using [`WorkerRequest::request_work`].
///
/// The runner may ignore this work and do something else. However, until
/// the next update request sent by the runner, the server will consider the
/// runner as preparing to work on the commit, and will not give out the
/// same commit to other runners.
/// The worker may ignore this work and do something else. However, until
/// the next update request sent by the worker, the server will consider the
/// worker as preparing to work on the commit, and will not give out the
/// same commit to other workers.
#[serde(skip_serializing_if = "Option::is_none")]
pub work: Option<Work>,
/// The runner should abort the current run.
/// The worker should abort the current run.
///
/// The server may send this because it detected the runner is benchmarking
/// the same commit as another runner and has broken the tie in favor of the
/// other runner. The runner may continue the run despite this flag.
/// The server may send this because it detected the worker is benchmarking
/// the same commit as another worker and has broken the tie in favor of the
/// other worker. The worker may continue the run despite this flag.
#[serde(default, skip_serializing_if = "is_false")]
pub abort_work: bool,
}


@ -6,21 +6,21 @@ use std::sync::{Arc, Mutex};
use tokio::task::JoinSet;
use tracing::{debug, error};
use crate::config::{Config, RunnerServerConfig};
use crate::config::{Config, WorkerServerConfig};
use self::{coordinator::Coordinator, server::Server};
pub struct Runner {
pub struct Worker {
config: &'static Config,
}
impl Runner {
impl Worker {
pub fn new(config: &'static Config) -> Self {
Self { config }
}
pub async fn run(&self) {
if self.config.runner_servers.is_empty() {
if self.config.worker_servers.is_empty() {
error!("No servers specified in config");
return;
}
@ -28,7 +28,7 @@ impl Runner {
let coordinator = Arc::new(Mutex::new(Coordinator::new()));
let mut tasks = JoinSet::new();
for (name, server_config) in self.config.runner_servers.iter() {
for (name, server_config) in self.config.worker_servers.iter() {
debug!("Launching task for server {name}");
let mut server = Server::new(
name.clone(),
@ -46,7 +46,7 @@ impl Runner {
pub fn launch_standalone_server_task(
config: &'static Config,
server_name: String,
server_config: &'static RunnerServerConfig,
server_config: &'static WorkerServerConfig,
) {
let coordinator = Arc::new(Mutex::new(Coordinator::new()));
let mut server = Server::new(server_name, config, server_config, coordinator);


@ -45,12 +45,12 @@ impl Coordinator {
self.current += 1;
self.current %= self.servers.len();
// When the runner seeks work and a queue is idle, the next server
// When the worker seeks work and a queue is idle, the next server
// should be queried immediately. Otherwise, we'd introduce lots of
// delay in the multi-server case where most queues are empty.
//
// However, if all servers' queues were empty, this would generate a
// slippery cycle of requests that the runner sends as quickly as
// slippery cycle of requests that the worker sends as quickly as
// possible, only limited by the roundtrip time. Because we don't want
// this, we let the first task wait its full timeout. Effectively, this
// results in iterations starting at least the ping delay apart, which


@ -5,9 +5,9 @@ use tokio::sync::mpsc;
use tracing::{debug, info_span, warn, Instrument};
use crate::{
config::{Config, RunnerServerConfig},
config::{Config, WorkerServerConfig},
id,
shared::{RunnerRequest, RunnerStatus},
shared::{WorkerRequest, WorkerStatus},
somehow,
};
@ -16,7 +16,7 @@ use super::coordinator::Coordinator;
pub struct Server {
name: String,
config: &'static Config,
server_config: &'static RunnerServerConfig,
server_config: &'static WorkerServerConfig,
coordinator: Arc<Mutex<Coordinator>>,
client: Client,
secret: String,
@ -26,7 +26,7 @@ impl Server {
pub fn new(
name: String,
config: &'static Config,
server_config: &'static RunnerServerConfig,
server_config: &'static WorkerServerConfig,
coordinator: Arc<Mutex<Coordinator>>,
) -> Self {
Self {
@ -35,7 +35,7 @@ impl Server {
server_config,
coordinator,
client: Client::new(),
secret: id::random_runner_secret(),
secret: id::random_worker_secret(),
}
}
@ -57,7 +57,7 @@ impl Server {
// Wait for poke or until the ping delay elapses. If we get
// poked while pinging the server, this will not wait and we'll
// immediately do another ping.
let _ = tokio::time::timeout(self.config.runner_ping_delay, poke_rx.recv()).await;
let _ = tokio::time::timeout(self.config.worker_ping_delay, poke_rx.recv()).await;
// Empty queue in case we were poked more than once. This can
// happen for example if we get poked multiple times while
@ -65,23 +65,23 @@ impl Server {
while poke_rx.try_recv().is_ok() {}
}
}
.instrument(info_span!("runner", name))
.instrument(info_span!("worker", name))
.await;
}
async fn ping(&self) -> somehow::Result<()> {
debug!("Pinging");
let request = RunnerRequest {
let request = WorkerRequest {
info: None,
secret: self.secret.clone(),
status: RunnerStatus::Idle,
status: WorkerStatus::Idle,
request_work: false,
submit_work: None,
};
let url = format!("{}api/runner/status", self.server_config.url);
let url = format!("{}api/worker/status", self.server_config.url);
self.client
.post(url)
.basic_auth(&self.config.runner_name, Some(&self.server_config.token))
.basic_auth(&self.config.worker_name, Some(&self.server_config.token))
.json(&request)
.send()
.await?;


@ -127,19 +127,19 @@ nav a:hover {
white-space: pre-wrap;
}
/* Runner */
/* Worker */
.runner * {
.worker * {
margin: 0;
}
.runner dl {
.worker dl {
display: grid;
grid: auto-flow / max-content 1fr;
column-gap: 1ch;
}
.runner .name {
.worker .name {
color: #380;
font-weight: bold;
}


@ -1,21 +1,21 @@
{% import "util.html" as util %}
<h2>Runners</h2>
{% if runners.is_empty() %}
<p>No runners connected</p>
<h2>Workers</h2>
{% if workers.is_empty() %}
<p>No workers connected</p>
{% else %}
<table>
<thead>
<tr>
<th>runner</th>
<th>worker</th>
<th>status</th>
</tr>
</thead>
<tbody>
{% for runner in runners %}
{% for worker in workers %}
<tr>
<td>{{ runner.link|safe }}</td>
{% match runner.status %}
<td>{{ worker.link|safe }}</td>
{% match worker.status %}
{% when Status::Idle %}
<td>idle</td>
{% when Status::Busy %}
@ -36,7 +36,7 @@
<th>commit</th>
<th>since</th>
<th>prio</th>
<th>runner</th>
<th>worker</th>
</tr>
</thead>
<tbody>
@ -45,10 +45,10 @@
<td>{{ task.commit|safe }}</td>
<td>{{ task.since }}</td>
<td>{{ task.priority }}</td>
{% if task.runners.is_empty() %}
{% if task.workers.is_empty() %}
<td>-</td>
{% else %}
<td>{{ task.runners|join(", ")|safe }}</td>
<td>{{ task.workers|join(", ")|safe }}</td>
{% endif %}
</tr>
{% endfor %}


@ -3,9 +3,9 @@
{% block title %}{{ name }}{% endblock %}
{% block body %}
<h2>Runner</h2>
<div class="runner">
<span class="name">runner {{ name }}</span>
<h2>Worker</h2>
<div class="worker">
<span class="name">worker {{ name }}</span>
<dl>
<dt>Last seen:</dt>
<dd>{{ last_seen }}</dd>