From a54e8424781faa0e03bf89ea21ba3dba2d9a41c1 Mon Sep 17 00:00:00 2001 From: Joscha Date: Thu, 17 Aug 2023 00:28:58 +0200 Subject: [PATCH] Restructure config file --- src/config.rs | 312 ++++++++++++++++-------------- src/main.rs | 51 +++-- src/server.rs | 9 +- src/server/recurring.rs | 2 +- src/server/web.rs | 2 +- src/server/web/admin/queue.rs | 10 +- src/server/web/api/worker.rs | 8 +- src/server/web/api/worker/auth.rs | 8 +- src/server/web/base.rs | 6 +- src/server/web/pages/commit.rs | 4 +- src/server/web/pages/graph.rs | 4 +- src/server/web/pages/index.rs | 4 +- src/server/web/pages/queue.rs | 8 +- src/server/web/pages/run.rs | 6 +- src/server/web/pages/worker.rs | 4 +- src/server/workers.rs | 8 +- src/worker.rs | 16 +- src/worker/server.rs | 12 +- 18 files changed, 258 insertions(+), 216 deletions(-) diff --git a/src/config.rs b/src/config.rs index e7922e9..b29bd05 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,87 +1,99 @@ //! Configuration from a file. -use std::{ - collections::HashMap, - fs, - io::ErrorKind, - net::SocketAddr, - path::{Path, PathBuf}, - time::Duration, -}; +use std::{collections::HashMap, fs, net::SocketAddr, path::PathBuf, time::Duration}; use directories::ProjectDirs; use serde::Deserialize; use tracing::{debug, info}; use crate::{ - args::{Args, Command, ServerCommand}, + args::{Args, Command}, id, somehow, }; #[derive(Debug, Deserialize)] #[serde(default)] -struct Web { - base: String, - address: SocketAddr, - worker_token: Option, - worker_timeout: Duration, - worker_max_upload: usize, +struct RawServerRepo { + name: Option, + update: Duration, + fetch_refs: Vec, + fetch_url: Option, } -impl Default for Web { +impl Default for RawServerRepo { fn default() -> Self { Self { - base: "/".to_string(), - address: "[::1]:8221".parse().unwrap(), // Port chosen by fair dice roll - worker_token: None, - worker_timeout: Duration::from_secs(60), - worker_max_upload: 1024 * 1024 * 8, // 8 MiB + name: None, + update: Duration::from_secs(60), + fetch_refs: vec!["+refs/*:refs/*".to_string()], + fetch_url: None, } } } #[derive(Debug, Deserialize)] #[serde(default)] -struct Repo { - name: Option, - update_delay: Duration, +struct RawServerWeb { + address: SocketAddr, + base: String, } -impl Default for Repo { +impl Default for RawServerWeb { fn default() -> Self { Self { - name: None, - update_delay: Duration::from_secs(60), + address: "[::1]:8221".parse().unwrap(), // Port chosen by fair dice roll + base: "/".to_string(), } } } #[derive(Debug, Deserialize)] -struct WorkerServer { +#[serde(default)] +struct RawServerWorker { + token: Option, + timeout: Duration, + upload: usize, +} + +impl Default for RawServerWorker { + fn default() -> Self { + Self { + token: None, + timeout: Duration::from_secs(60), + upload: 1024 * 1024 * 8, + } + } +} + +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +struct RawServer { + repo: RawServerRepo, + web: RawServerWeb, + worker: RawServerWorker, +} + +#[derive(Debug, Deserialize)] +struct RawWorkerServer { url: String, token: String, } #[derive(Debug, Deserialize)] #[serde(default)] -struct Worker { +struct RawWorker { name: Option, - - #[serde(with = "humantime_serde")] - ping_delay: Duration, - - #[serde(with = "humantime_serde")] - batch_duration: Duration, - - servers: HashMap, + ping: Duration, + batch: Duration, + servers: HashMap, } -impl Default for Worker { +impl Default for RawWorker { fn default() -> Self { Self { name: None, - ping_delay: Duration::from_secs(10), - batch_duration: Duration::from_secs(60 * 10), + ping: Duration::from_secs(10), + batch: Duration::from_secs(60), servers: HashMap::new(), } } @@ -89,86 +101,79 @@ impl Default for Worker { #[derive(Debug, Default, Deserialize)] #[serde(default)] -struct ConfigFile { - web: Web, - repo: Repo, - worker: Worker, +struct RawConfig { + server: RawServer, + worker: RawWorker, } -impl ConfigFile { - fn load(path: &Path) -> somehow::Result { - let config = match fs::read_to_string(path) { - Ok(str) => toml::from_str(&str)?, - Err(e) if e.kind() == ErrorKind::NotFound => { - info!("No config file found, using default config"); - Self::default() +#[derive(Debug)] +pub struct ServerConfig { + pub repo_name: String, + pub repo_update: Duration, + pub repo_fetch_refs: Vec, + pub repo_fetch_url: Option, + pub web_address: SocketAddr, + /// Always ends without a `/`. + /// + /// This means that you can prefix the base onto an absolute path and get + /// another absolute path. You could also use an url here if you have a + /// weird reason to do so. + pub web_base: String, + pub worker_token: String, + pub worker_timeout: Duration, + pub worker_upload: usize, +} + +impl ServerConfig { + fn repo_name(args: &Args) -> String { + if let Command::Server(cmd) = &args.command { + if let Some(path) = &cmd.repo { + if let Ok(path) = path.canonicalize() { + if let Some(name) = path.file_name() { + let name = name.to_string_lossy(); + let name = name.strip_suffix(".git").unwrap_or(&name).to_string(); + return name; + } + } } - Err(e) => Err(e)?, + } + + "unnamed repo".to_string() + } + + fn from_raw_server(raw: RawServer, args: &Args) -> Self { + let repo_name = match raw.repo.name { + Some(name) => name, + None => Self::repo_name(args), }; - Ok(config) - } - - fn web_base(&self) -> String { - self.web + let web_base = raw + .web .base .strip_suffix('/') - .unwrap_or(&self.web.base) - .to_string() - } + .unwrap_or(&raw.web.base) + .to_string(); - fn web_worker_token(&self) -> String { - self.web - .worker_token - .clone() - .unwrap_or_else(id::random_worker_token) - } + let worker_token = match raw.worker.token { + Some(token) => token, + None => id::random_worker_token(), + }; - fn repo_name(&self, args: &Args) -> somehow::Result { - if let Some(name) = &self.repo.name { - return Ok(name.clone()); + Self { + repo_name, + repo_update: raw.repo.update, + repo_fetch_refs: raw.repo.fetch_refs, + repo_fetch_url: raw.repo.fetch_url, + web_address: raw.web.address, + web_base, + worker_token, + worker_timeout: raw.worker.timeout, + worker_upload: raw.worker.upload, } - - if let Command::Server(ServerCommand { - repo: Some(path), .. - }) = &args.command - { - if let Some(name) = path.canonicalize()?.file_name() { - let name = name.to_string_lossy(); - let name = name.strip_suffix(".git").unwrap_or(&name).to_string(); - return Ok(name); - } - } - - Ok("unnamed repo".to_string()) - } - - fn worker_name(&self) -> String { - if let Some(name) = &self.worker.name { - return name.clone(); - } - - gethostname::gethostname().to_string_lossy().to_string() - } - - fn worker_servers(&self) -> HashMap { - self.worker - .servers - .iter() - .map(|(name, server)| { - let url = server - .url - .strip_suffix('/') - .unwrap_or(&server.url) - .to_string(); - let token = server.token.to_string(); - (name.to_string(), WorkerServerConfig { url, token }) - }) - .collect() } } -#[derive(Clone)] +#[derive(Debug)] pub struct WorkerServerConfig { /// Always ends without a `/`. /// @@ -178,27 +183,59 @@ pub struct WorkerServerConfig { pub token: String, } -#[derive(Clone)] +impl WorkerServerConfig { + fn from_raw_worker_server(raw: RawWorkerServer) -> Self { + Self { + url: raw.url.strip_suffix('/').unwrap_or(&raw.url).to_string(), + token: raw.token, + } + } +} + +#[derive(Debug)] +pub struct WorkerConfig { + pub name: String, + pub ping: Duration, + pub batch: Duration, + pub servers: HashMap, +} + +impl WorkerConfig { + fn from_raw_worker(raw: RawWorker) -> Self { + let name = match raw.name { + Some(name) => name, + None => gethostname::gethostname().to_string_lossy().to_string(), + }; + + let servers = raw + .servers + .into_iter() + .map(|(k, v)| (k, WorkerServerConfig::from_raw_worker_server(v))) + .collect(); + + Self { + name, + ping: raw.ping, + batch: raw.batch, + servers, + } + } +} + +#[derive(Debug)] pub struct Config { - /// Always ends without a `/` (prioritizing the latter). - /// - /// This means that you can prefix the base onto an absolute path and get - /// another absolute path. You could also use an url here if you have a - /// weird reason to do so. - pub web_base: String, - pub web_address: SocketAddr, - pub web_worker_token: String, - pub web_worker_timeout: Duration, - pub web_worker_max_upload: usize, - pub repo_name: String, - pub repo_update_delay: Duration, - pub worker_name: String, - pub worker_ping_delay: Duration, - pub worker_batch_duration: Duration, - pub worker_servers: HashMap, + pub server: ServerConfig, + pub worker: WorkerConfig, } impl Config { + fn from_raw_config(raw: RawConfig, args: &Args) -> Self { + Self { + server: ServerConfig::from_raw_server(raw.server, args), + worker: WorkerConfig::from_raw_worker(raw.worker), + } + } + fn path(args: &Args) -> PathBuf { if let Some(path) = &args.config { return path.clone(); @@ -213,27 +250,12 @@ impl Config { pub fn load(args: &Args) -> somehow::Result { let path = Self::path(args); info!(path = %path.display(), "Loading config"); - let config_file = ConfigFile::load(&path)?; - debug!("Loaded config file:\n{config_file:#?}"); - let web_base = config_file.web_base(); - let web_worker_token = config_file.web_worker_token(); - let repo_name = config_file.repo_name(args)?; - let worker_name = config_file.worker_name(); - let worker_servers = config_file.worker_servers(); - - Ok(Self { - web_base, - web_address: config_file.web.address, - web_worker_token, - web_worker_timeout: config_file.web.worker_timeout, - web_worker_max_upload: config_file.web.worker_max_upload, - repo_name, - repo_update_delay: config_file.repo.update_delay, - worker_name, - worker_ping_delay: config_file.worker.ping_delay, - worker_batch_duration: config_file.worker.batch_duration, - worker_servers, - }) + let raw = fs::read_to_string(path)?; + let raw = toml::from_str::(&raw)?; + debug!("Loaded raw config: {raw:#?}"); + let config = Self::from_raw_config(raw, args); + debug!("Loaded config: {config:#?}"); + Ok(config) } } diff --git a/src/main.rs b/src/main.rs index edfae80..045fef4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,10 +10,10 @@ mod shared; mod somehow; mod worker; -use std::{io, process, time::Duration}; +use std::{collections::HashMap, io, net::IpAddr, process, time::Duration}; use clap::Parser; -use config::WorkerServerConfig; +use config::ServerConfig; use tokio::{select, signal::unix::SignalKind}; use tracing::{debug, error, info, Level}; use tracing_subscriber::{ @@ -22,7 +22,7 @@ use tracing_subscriber::{ use crate::{ args::{Args, Command, NAME, VERSION}, - config::Config, + config::{Config, WorkerConfig, WorkerServerConfig}, server::Server, worker::Worker, }; @@ -87,11 +87,25 @@ async fn die_on_signal() -> io::Result<()> { process::exit(1); } -async fn open_in_browser(config: &Config) { +fn local_url(config: &ServerConfig) -> String { + let host = match config.web_address.ip() { + IpAddr::V4(_) => "127.0.0.1", + IpAddr::V6(_) => "[::1]", + }; + let port = config.web_address.port(); + let base = &config.web_base; + if base.starts_with('/') { + format!("http://{host}:{port}{base}") + } else { + format!("http://{host}:{port}/{base}") + } +} + +async fn open_in_browser(config: &ServerConfig) { // Wait a bit to ensure the server is ready to serve requests. tokio::time::sleep(Duration::from_millis(100)).await; - let url = format!("http://{}{}", config.web_address, config.web_base); + let url = local_url(config); if let Err(e) = open::that_detached(&url) { error!("Error opening {url} in browser: {e:?}"); } @@ -102,20 +116,23 @@ async fn launch_local_workers(config: &'static Config, amount: u8) { tokio::time::sleep(Duration::from_millis(100)).await; for i in 0..amount { - let mut config = config.clone(); - config.worker_name = format!("{}-{i}", config.worker_name); - config.worker_servers.clear(); - config.worker_servers.insert( + let mut worker_config = WorkerConfig { + name: format!("{}-{i}", config.worker.name), + ping: config.worker.ping, + batch: config.worker.batch, + servers: HashMap::new(), + }; + worker_config.servers.insert( "localhost".to_string(), WorkerServerConfig { - url: format!("http://{}{}", config.web_address, config.web_base), - token: config.web_worker_token.clone(), + url: local_url(&config.server), + token: config.server.worker_token.clone(), }, ); - let config = Box::leak(Box::new(config)); + let worker_config = Box::leak(Box::new(worker_config)); - info!("Launching local worker {}", config.worker_name); - let worker = Worker::new(config); + info!("Launching local worker {}", worker_config.name); + let worker = Worker::new(worker_config); tokio::spawn(async move { worker.run().await }); } } @@ -131,14 +148,14 @@ async fn run() -> somehow::Result<()> { match args.command { Command::Server(command) => { if command.open { - tokio::task::spawn(open_in_browser(config)); + tokio::task::spawn(open_in_browser(&config.server)); } if command.local_worker > 0 { tokio::task::spawn(launch_local_workers(config, command.local_worker)); } - let server = Server::new(config, command).await?; + let server = Server::new(&config.server, command).await?; select! { _ = wait_for_signal() => {} _ = server.run() => {} @@ -159,7 +176,7 @@ async fn run() -> somehow::Result<()> { } } Command::Worker => { - let worker = Worker::new(config); + let worker = Worker::new(&config.worker); select! { _ = wait_for_signal() => {} diff --git a/src/server.rs b/src/server.rs index 69050a3..b514324 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,7 +18,7 @@ use sqlx::{ use tokio::select; use tracing::{debug, info}; -use crate::{args::ServerCommand, config::Config, somehow}; +use crate::{args::ServerCommand, config::ServerConfig, somehow}; use self::workers::Workers; @@ -64,7 +64,7 @@ pub struct BenchRepo(Arc); #[derive(Clone, FromRef)] pub struct Server { - config: &'static Config, + config: &'static ServerConfig, db: SqlitePool, repo: Option, bench_repo: Option, @@ -72,7 +72,10 @@ pub struct Server { } impl Server { - pub async fn new(config: &'static Config, command: ServerCommand) -> somehow::Result { + pub async fn new( + config: &'static ServerConfig, + command: ServerCommand, + ) -> somehow::Result { let repo = if let Some(path) = command.repo.as_ref() { info!(path = %path.display(), "Opening repo"); let repo = ThreadSafeRepository::open(path)?; diff --git a/src/server/recurring.rs b/src/server/recurring.rs index 84301ef..8c62ab0 100644 --- a/src/server/recurring.rs +++ b/src/server/recurring.rs @@ -31,6 +31,6 @@ async fn recurring_task(state: &Server, repo: Repo) { pub(super) async fn run(server: Server, repo: Repo) { loop { recurring_task(&server, repo.clone()).await; - tokio::time::sleep(server.config.repo_update_delay).await; + tokio::time::sleep(server.config.repo_update).await; } } diff --git a/src/server/web.rs b/src/server/web.rs index 1040e93..bc3ddbb 100644 --- a/src/server/web.rs +++ b/src/server/web.rs @@ -37,7 +37,7 @@ pub async fn run(server: Server) -> somehow::Result<()> { let post_api_worker_status = Router::new() .typed_post(post_api_worker_status) - .layer(DefaultBodyLimit::max(server.config.web_worker_max_upload)); + .layer(DefaultBodyLimit::max(server.config.worker_upload)); let app = Router::new() .typed_get(get_api_worker_bench_repo_by_hash_tree_tar_gz) diff --git a/src/server/web/admin/queue.rs b/src/server/web/admin/queue.rs index 6361656..771ce49 100644 --- a/src/server/web/admin/queue.rs +++ b/src/server/web/admin/queue.rs @@ -8,7 +8,7 @@ use sqlx::SqlitePool; use time::OffsetDateTime; use crate::{ - config::Config, + config::ServerConfig, server::web::{ base::Base, paths::{ @@ -28,7 +28,7 @@ pub struct FormAdminQueueAdd { pub async fn post_admin_queue_add( _path: PathAdminQueueAdd, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, Form(form): Form, ) -> somehow::Result { @@ -57,7 +57,7 @@ pub struct FormAdminQueueDelete { pub async fn post_admin_queue_delete( _path: PathAdminQueueDelete, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, Form(form): Form, ) -> somehow::Result { @@ -76,7 +76,7 @@ pub struct FormAdminQueueIncrease { pub async fn post_admin_queue_increase( _path: PathAdminQueueIncrease, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, Form(form): Form, ) -> somehow::Result { @@ -98,7 +98,7 @@ pub struct FormAdminQueueDecrease { pub async fn post_admin_queue_decrease( _path: PathAdminQueueDecrease, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, Form(form): Form, ) -> somehow::Result { diff --git a/src/server/web/api/worker.rs b/src/server/web/api/worker.rs index e99fc47..80f373f 100644 --- a/src/server/web/api/worker.rs +++ b/src/server/web/api/worker.rs @@ -18,7 +18,7 @@ use time::OffsetDateTime; use tracing::debug; use crate::{ - config::Config, + config::ServerConfig, server::{ web::paths::{ PathApiWorkerBenchRepoByHashTreeTarGz, PathApiWorkerRepoByHashTreeTarGz, @@ -125,7 +125,7 @@ async fn save_work( pub async fn post_api_worker_status( _path: PathApiWorkerStatus, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, State(bench_repo): State>, State(workers): State>>, @@ -204,7 +204,7 @@ fn stream_response(repo: Arc, id: ObjectId) -> impl IntoRe pub async fn get_api_worker_repo_by_hash_tree_tar_gz( path: PathApiWorkerRepoByHashTreeTarGz, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(repo): State>, auth: Option>>, ) -> somehow::Result { @@ -223,7 +223,7 @@ pub async fn get_api_worker_repo_by_hash_tree_tar_gz( pub async fn get_api_worker_bench_repo_by_hash_tree_tar_gz( path: PathApiWorkerBenchRepoByHashTreeTarGz, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(bench_repo): State>, auth: Option>>, ) -> somehow::Result { diff --git a/src/server/web/api/worker/auth.rs b/src/server/web/api/worker/auth.rs index 425387a..b4325fe 100644 --- a/src/server/web/api/worker/auth.rs +++ b/src/server/web/api/worker/auth.rs @@ -7,7 +7,7 @@ use axum::{ TypedHeader, }; -use crate::config::Config; +use crate::config::ServerConfig; fn is_username_valid(username: &str) -> bool { if username.is_empty() { @@ -19,12 +19,12 @@ fn is_username_valid(username: &str) -> bool { .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.') } -fn is_password_valid(password: &str, config: &'static Config) -> bool { - password == config.web_worker_token +fn is_password_valid(password: &str, config: &'static ServerConfig) -> bool { + password == config.worker_token } pub fn authenticate( - config: &'static Config, + config: &'static ServerConfig, auth: Option>>, ) -> Result { if let Some(auth) = auth { diff --git a/src/server/web/base.rs b/src/server/web/base.rs index 5b29a20..85fe23c 100644 --- a/src/server/web/base.rs +++ b/src/server/web/base.rs @@ -1,6 +1,6 @@ use std::fmt; -use crate::config::Config; +use crate::config::ServerConfig; use super::{ paths::{PathGraph, PathIndex, PathQueue}, @@ -27,7 +27,7 @@ pub struct Base { } impl Base { - pub fn new(config: &Config, tab: Tab) -> Self { + pub fn new(config: &ServerConfig, tab: Tab) -> Self { let tab = match tab { Tab::None => "", Tab::Index => "index", @@ -53,7 +53,7 @@ impl Base { Link(format!("{base}{to}")) } - pub fn link_with_config(config: &Config, to: P) -> Link { + pub fn link_with_config(config: &ServerConfig, to: P) -> Link { Self::link_with_base(&config.web_base, to) } diff --git a/src/server/web/pages/commit.rs b/src/server/web/pages/commit.rs index 22134ac..41202d2 100644 --- a/src/server/web/pages/commit.rs +++ b/src/server/web/pages/commit.rs @@ -8,7 +8,7 @@ use futures::TryStreamExt; use sqlx::SqlitePool; use crate::{ - config::Config, + config::ServerConfig, server::{ util, web::{ @@ -41,7 +41,7 @@ struct Page { pub async fn get_commit_by_hash( path: PathCommitByHash, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, ) -> somehow::Result { let base = Base::new(config, Tab::None); diff --git a/src/server/web/pages/graph.rs b/src/server/web/pages/graph.rs index dec4a12..d8fe438 100644 --- a/src/server/web/pages/graph.rs +++ b/src/server/web/pages/graph.rs @@ -12,7 +12,7 @@ use time::OffsetDateTime; use tracing::debug; use crate::{ - config::Config, + config::ServerConfig, server::web::{ base::{Base, Link, Tab}, paths::{PathGraph, PathGraphData}, @@ -112,7 +112,7 @@ struct Page { pub async fn get_graph( _path: PathGraph, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, ) -> somehow::Result { let metrics = diff --git a/src/server/web/pages/index.rs b/src/server/web/pages/index.rs index ed9176a..ef66a0c 100644 --- a/src/server/web/pages/index.rs +++ b/src/server/web/pages/index.rs @@ -4,7 +4,7 @@ use futures::TryStreamExt; use sqlx::SqlitePool; use crate::{ - config::Config, + config::ServerConfig, server::web::{ base::{Base, Tab}, link::LinkCommit, @@ -29,7 +29,7 @@ struct IndexTemplate { pub async fn get_index( _path: PathIndex, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, ) -> somehow::Result { let base = Base::new(config, Tab::Index); diff --git a/src/server/web/pages/queue.rs b/src/server/web/pages/queue.rs index 5fb3701..693ee18 100644 --- a/src/server/web/pages/queue.rs +++ b/src/server/web/pages/queue.rs @@ -13,7 +13,7 @@ use futures::TryStreamExt; use sqlx::SqlitePool; use crate::{ - config::Config, + config::ServerConfig, server::{ util, web::{ @@ -161,7 +161,7 @@ struct PageInner { pub async fn get_queue_inner( _path: PathQueueInner, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, State(workers): State>>, ) -> somehow::Result { @@ -182,7 +182,7 @@ struct Page { pub async fn get_queue( _path: PathQueue, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, State(workers): State>>, ) -> somehow::Result { @@ -210,7 +210,7 @@ struct PageDelete { pub async fn get_queue_delete( path: PathQueueDelete, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, ) -> somehow::Result { let base = Base::new(config, Tab::Queue); diff --git a/src/server/web/pages/run.rs b/src/server/web/pages/run.rs index d4c967a..5e6980b 100644 --- a/src/server/web/pages/run.rs +++ b/src/server/web/pages/run.rs @@ -8,7 +8,7 @@ use futures::TryStreamExt; use sqlx::SqlitePool; use crate::{ - config::Config, + config::ServerConfig, server::{ util, web::{ @@ -52,7 +52,7 @@ struct PageFinished { async fn from_finished_run( id: &str, - config: &'static Config, + config: &'static ServerConfig, db: &SqlitePool, ) -> somehow::Result> { let Some(run) = sqlx::query!( @@ -146,7 +146,7 @@ async fn from_finished_run( pub async fn get_run_by_id( path: PathRunById, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, ) -> somehow::Result { if let Some(response) = from_finished_run(&path.id, config, &db).await? { diff --git a/src/server/web/pages/worker.rs b/src/server/web/pages/worker.rs index a2cd835..6e34bef 100644 --- a/src/server/web/pages/worker.rs +++ b/src/server/web/pages/worker.rs @@ -9,7 +9,7 @@ use axum::{ use sqlx::SqlitePool; use crate::{ - config::Config, + config::ServerConfig, server::{ util, web::{ @@ -58,7 +58,7 @@ async fn status(status: &WorkerStatus, db: &SqlitePool, base: &Base) -> somehow: pub async fn get_worker_by_name( path: PathWorkerByName, - State(config): State<&'static Config>, + State(config): State<&'static ServerConfig>, State(db): State, State(workers): State>>, ) -> somehow::Result { diff --git a/src/server/workers.rs b/src/server/workers.rs index a350348..b5a22f4 100644 --- a/src/server/workers.rs +++ b/src/server/workers.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet}; use time::OffsetDateTime; use crate::{ - config::Config, + config::ServerConfig, id, shared::{BenchMethod, Rfc3339Time, Run, UnfinishedRun, WorkerStatus}, }; @@ -28,12 +28,12 @@ impl WorkerInfo { } pub struct Workers { - config: &'static Config, + config: &'static ServerConfig, workers: HashMap, } impl Workers { - pub fn new(config: &'static Config) -> Self { + pub fn new(config: &'static ServerConfig) -> Self { Self { config, workers: HashMap::new(), @@ -43,7 +43,7 @@ impl Workers { pub fn clean(&mut self) -> &mut Self { let now = OffsetDateTime::now_utc(); self.workers - .retain(|_, v| now <= v.last_seen + self.config.web_worker_timeout); + .retain(|_, v| now <= v.last_seen + self.config.worker_timeout); self } diff --git a/src/worker.rs b/src/worker.rs index c5ad060..c32ce76 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -10,7 +10,7 @@ use tokio::sync::Mutex as AsyncMutex; use tracing::{error, info, warn}; use crate::{ - config::Config, + config::WorkerConfig, id, shared::{FinishedRun, Run}, worker::server::Server, @@ -19,11 +19,11 @@ use crate::{ use self::run::RunInProgress; pub struct Worker { - config: &'static Config, + config: &'static WorkerConfig, } impl Worker { - pub fn new(config: &'static Config) -> Self { + pub fn new(config: &'static WorkerConfig) -> Self { Self { config } } @@ -33,7 +33,7 @@ impl Worker { let mut servers = self .config - .worker_servers + .servers .iter() .map(|(name, server_config)| Server { name: name.clone(), @@ -61,7 +61,7 @@ impl Worker { async fn single_server_mode(&self, server: Server) { loop { while self.perform_run(&server).await {} - tokio::time::sleep(self.config.worker_ping_delay).await; + tokio::time::sleep(self.config.ping).await; } } @@ -69,14 +69,14 @@ impl Worker { loop { for server in &servers { let batch_start = OffsetDateTime::now_utc(); - let batch_end = batch_start + self.config.worker_batch_duration; + let batch_end = batch_start + self.config.batch; while OffsetDateTime::now_utc() <= batch_end { if !self.perform_run(server).await { break; } } } - tokio::time::sleep(self.config.worker_ping_delay).await; + tokio::time::sleep(self.config.ping).await; } } @@ -98,7 +98,7 @@ impl Worker { let guard = server.status_lock.lock().await; *server.current_run.lock().unwrap() = None; while !self.submit_run(server, run.clone()).await { - tokio::time::sleep(self.config.worker_ping_delay).await; + tokio::time::sleep(self.config.ping).await; } drop(guard); diff --git a/src/worker/server.rs b/src/worker/server.rs index 921ba3a..efb0203 100644 --- a/src/worker/server.rs +++ b/src/worker/server.rs @@ -6,7 +6,7 @@ use tokio::sync::Mutex as AsyncMutex; use tracing::{debug, warn}; use crate::{ - config::{Config, WorkerServerConfig}, + config::{WorkerConfig, WorkerServerConfig}, server::web::paths::{ PathApiWorkerBenchRepoByHashTreeTarGz, PathApiWorkerRepoByHashTreeTarGz, PathApiWorkerStatus, @@ -21,7 +21,7 @@ use super::run::RunInProgress; #[derive(Clone)] pub struct Server { pub name: String, - pub config: &'static Config, + pub config: &'static WorkerConfig, pub server_config: &'static WorkerServerConfig, pub secret: String, @@ -72,7 +72,7 @@ impl Server { let response = self .client .post(url) - .basic_auth(&self.config.worker_name, Some(&self.server_config.token)) + .basic_auth(&self.config.name, Some(&self.server_config.token)) .json(&request) .send() .await? @@ -94,7 +94,7 @@ impl Server { let response = self .client .get(url) - .basic_auth(&self.config.worker_name, Some(&self.server_config.token)) + .basic_auth(&self.config.name, Some(&self.server_config.token)) .send() .await?; @@ -113,7 +113,7 @@ impl Server { let response = self .client .get(url) - .basic_auth(&self.config.worker_name, Some(&self.server_config.token)) + .basic_auth(&self.config.name, Some(&self.server_config.token)) .send() .await?; @@ -139,7 +139,7 @@ impl Server { Err(e) => warn!("Error talking to server:\n{e:?}"), } - tokio::time::sleep(self.config.worker_ping_delay).await; + tokio::time::sleep(self.config.ping).await; } } }