Add admin button to update repo

This commit is contained in:
Joscha 2023-08-17 18:19:41 +02:00
parent 6cf7a0b586
commit 4f2b0a0b88
9 changed files with 69 additions and 15 deletions

View file

@ -176,10 +176,10 @@ async fn run() -> somehow::Result<()> {
tokio::task::spawn(launch_local_workers(config, command.local_worker)); tokio::task::spawn(launch_local_workers(config, command.local_worker));
} }
let server = Server::new(&config.server, command).await?; let (server, recurring_rx) = Server::new(&config.server, command).await?;
select! { select! {
_ = wait_for_signal() => {} _ = wait_for_signal() => {}
_ = server.run() => {} _ = server.run(recurring_rx) => {}
} }
select! { select! {

View file

@ -18,7 +18,7 @@ use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous}, sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous},
SqlitePool, SqlitePool,
}; };
use tokio::select; use tokio::{select, sync::mpsc};
use crate::{args::ServerCommand, config::ServerConfig, somehow}; use crate::{args::ServerCommand, config::ServerConfig, somehow};
@ -117,13 +117,14 @@ pub struct Server {
repo: Option<Repo>, repo: Option<Repo>,
bench_repo: Option<BenchRepo>, bench_repo: Option<BenchRepo>,
workers: Arc<Mutex<Workers>>, workers: Arc<Mutex<Workers>>,
recurring_tx: Arc<mpsc::UnboundedSender<()>>,
} }
impl Server { impl Server {
pub async fn new( pub async fn new(
config: &'static ServerConfig, config: &'static ServerConfig,
command: ServerCommand, command: ServerCommand,
) -> somehow::Result<Self> { ) -> somehow::Result<(Self, mpsc::UnboundedReceiver<()>)> {
let repo = if let Some(path) = command.repo.as_ref() { let repo = if let Some(path) = command.repo.as_ref() {
let repo = open_repo(path, &config.repo_fetch_url, &config.repo_fetch_refspecs)?; let repo = open_repo(path, &config.repo_fetch_url, &config.repo_fetch_refspecs)?;
Some(Repo(Arc::new(repo))) Some(Repo(Arc::new(repo)))
@ -138,20 +139,24 @@ impl Server {
None None
}; };
Ok(Self { let (recurring_tx, recurring_rx) = mpsc::unbounded_channel();
let server = Self {
config, config,
db: open_db(&command.db).await?, db: open_db(&command.db).await?,
repo, repo,
bench_repo, bench_repo,
workers: Arc::new(Mutex::new(Workers::new(config))), workers: Arc::new(Mutex::new(Workers::new(config))),
}) recurring_tx: Arc::new(recurring_tx),
};
Ok((server, recurring_rx))
} }
pub async fn run(&self) -> somehow::Result<()> { pub async fn run(&self, recurring_rx: mpsc::UnboundedReceiver<()>) -> somehow::Result<()> {
if let Some(repo) = self.repo.clone() { if let Some(repo) = self.repo.clone() {
select! { select! {
e = web::run(self.clone()) => e, e = web::run(self.clone()) => e,
() = recurring::run(self.clone(), repo) => Ok(()), () = recurring::run(self.clone(), repo, recurring_rx) => Ok(()),
} }
} else { } else {
web::run(self.clone()).await web::run(self.clone()).await

View file

@ -4,14 +4,17 @@ mod fetch;
mod queue; mod queue;
mod repo; mod repo;
use tokio::sync::mpsc;
use super::{Repo, Server}; use super::{Repo, Server};
pub(super) async fn run(server: Server, repo: Repo) { pub(super) async fn run(server: Server, repo: Repo, mut recurring_rx: mpsc::UnboundedReceiver<()>) {
loop { loop {
fetch::update(server.config, repo.clone()).await; fetch::update(server.config, repo.clone()).await;
repo::update(&server.db, repo.clone()).await; repo::update(&server.db, repo.clone()).await;
queue::update(&server.db).await; queue::update(&server.db).await;
tokio::time::sleep(server.config.repo_update).await; let _ = tokio::time::timeout(server.config.repo_update, recurring_rx.recv()).await;
while let Ok(()) = recurring_rx.try_recv() {}
} }
} }

View file

@ -12,9 +12,12 @@ use axum_extra::routing::RouterExt;
use crate::somehow; use crate::somehow;
use self::{ use self::{
admin::queue::{ admin::{
post_admin_queue_add, post_admin_queue_decrease, post_admin_queue_delete, queue::{
post_admin_queue_increase, post_admin_queue_add, post_admin_queue_decrease, post_admin_queue_delete,
post_admin_queue_increase,
},
repo::post_admin_repo_update,
}, },
api::worker::{ api::worker::{
get_api_worker_bench_repo_by_hash_tree_tar_gz, get_api_worker_repo_by_hash_tree_tar_gz, get_api_worker_bench_repo_by_hash_tree_tar_gz, get_api_worker_repo_by_hash_tree_tar_gz,
@ -55,6 +58,7 @@ pub async fn run(server: Server) -> somehow::Result<()> {
.typed_post(post_admin_queue_decrease) .typed_post(post_admin_queue_decrease)
.typed_post(post_admin_queue_delete) .typed_post(post_admin_queue_delete)
.typed_post(post_admin_queue_increase) .typed_post(post_admin_queue_increase)
.typed_post(post_admin_repo_update)
.merge(post_api_worker_status) .merge(post_api_worker_status)
.fallback(get(r#static::static_handler)) .fallback(get(r#static::static_handler))
.with_state(server.clone()); .with_state(server.clone());

View file

@ -1 +1,2 @@
pub mod queue; pub mod queue;
pub mod repo;

View file

@ -0,0 +1,29 @@
use std::sync::Arc;
use axum::{
extract::State,
response::{IntoResponse, Redirect},
};
use log::info;
use tokio::sync::mpsc;
use crate::{
config::ServerConfig,
server::web::{
base::Base,
paths::{PathAdminRepoUpdate, PathIndex},
},
somehow,
};
pub async fn post_admin_repo_update(
_path: PathAdminRepoUpdate,
State(config): State<&'static ServerConfig>,
State(recurring_tx): State<Arc<mpsc::UnboundedSender<()>>>,
) -> somehow::Result<impl IntoResponse> {
let _ = recurring_tx.send(());
info!("Admin updated repo");
let link = Base::link_with_config(config, PathIndex {});
Ok(Redirect::to(&link.to_string()))
}

View file

@ -6,9 +6,9 @@ use sqlx::SqlitePool;
use crate::{ use crate::{
config::ServerConfig, config::ServerConfig,
server::web::{ server::web::{
base::{Base, Tab}, base::{Base, Link, Tab},
link::LinkCommit, link::LinkCommit,
paths::PathIndex, paths::{PathAdminRepoUpdate, PathIndex},
}, },
somehow, somehow,
}; };
@ -22,7 +22,9 @@ struct Ref {
#[derive(Template)] #[derive(Template)]
#[template(path = "pages/index.html")] #[template(path = "pages/index.html")]
struct IndexTemplate { struct IndexTemplate {
link_admin_repo_update: Link,
base: Base, base: Base,
tracked_refs: Vec<Ref>, tracked_refs: Vec<Ref>,
untracked_refs: Vec<Ref>, untracked_refs: Vec<Ref>,
} }
@ -62,7 +64,9 @@ pub async fn get_index(
} }
Ok(IndexTemplate { Ok(IndexTemplate {
link_admin_repo_update: base.link(PathAdminRepoUpdate {}),
base: Base::new(config, Tab::Index), base: Base::new(config, Tab::Index),
tracked_refs, tracked_refs,
untracked_refs, untracked_refs,
}) })

View file

@ -53,6 +53,10 @@ pub struct PathWorkerByName {
// Admin actions // // Admin actions //
/////////////////// ///////////////////
#[derive(Deserialize, TypedPath)]
#[typed_path("/admin/repo/update")]
pub struct PathAdminRepoUpdate {}
#[derive(Deserialize, TypedPath)] #[derive(Deserialize, TypedPath)]
#[typed_path("/admin/queue/add")] #[typed_path("/admin/queue/add")]
pub struct PathAdminQueueAdd {} pub struct PathAdminQueueAdd {}

View file

@ -6,6 +6,10 @@
<h2>Refs</h2> <h2>Refs</h2>
<form method="post" action="{{ link_admin_repo_update }}">
<button>Update</button>
</form>
<details open> <details open>
<summary>Tracked ({{ tracked_refs.len() }})</summary> <summary>Tracked ({{ tracked_refs.len() }})</summary>
<dl> <dl>