diff --git a/Cargo.lock b/Cargo.lock index b4b3ce2..a4241b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -887,7 +887,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ce5c049b1afcae9bb9e10c0f6dd8eb1335e8647fb7fd34732a66133ca3b9886" dependencies = [ "gix-actor", - "gix-archive", "gix-attributes", "gix-commitgraph", "gix-config", @@ -945,19 +944,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "gix-archive" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc89a798842b519048e947339a9c9f3cfd8fb9c2d9b66b6ebcb0c3cc8fe5874d" -dependencies = [ - "bstr", - "gix-date", - "gix-object", - "gix-worktree-stream", - "thiserror", -] - [[package]] name = "gix-attributes" version = "0.16.0" @@ -3112,6 +3098,7 @@ dependencies = [ "axum", "clap", "directories", + "flate2", "futures", "gethostname", "gix", @@ -3125,8 +3112,10 @@ dependencies = [ "serde", "serde_repr", "sqlx", + "tar", "time", "tokio", + "tokio-stream", "toml", "tracing", "tracing-subscriber", @@ -3134,6 +3123,16 @@ dependencies = [ "walkdir", ] +[[package]] +name = "tar" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +dependencies = [ + "filetime", + "libc", +] + [[package]] name = "tempfile" version = "3.7.0" diff --git a/Cargo.toml b/Cargo.toml index 9a1875d..aaaefda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ askama_axum = "0.3.0" axum = { version = "0.6.19", features = ["macros", "headers"] } clap = { version = "4.3.19", features = ["derive", "deprecated"] } directories = "5.0.1" +flate2 = "1.0.26" futures = "0.3.28" gethostname = "0.4.3" humantime = "2.1.0" @@ -21,7 +22,9 @@ rust-embed = { version = "6.8.1", features = ["interpolate-folder-path"] } serde = { version = "1.0.181", features = ["derive"] } serde_repr = "0.1.16" sqlx = { version = "0.7.1", features = ["runtime-tokio", "sqlite", "time"] } +tar = { version = "0.4.40", default-features = false } tokio = { version = "1.29.1", features = ["full"] } +tokio-stream = "0.1.14" toml = "0.7.6" tracing = "0.1.37" tracing-subscriber = "0.3.17" @@ -29,7 +32,7 @@ tracing-subscriber = "0.3.17" [dependencies.gix] version = "0.51.0" default-features = false -features = ["max-performance-safe", "extras"] +features = ["max-performance-safe", "worktree-stream"] [dependencies.reqwest] version = "0.11.18" diff --git a/DESIGN.md b/DESIGN.md index 00514d0..7f1702e 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -85,9 +85,9 @@ like session ids. - If so, server may respond with a commit hash and bench method - Worker may include current work - If so, server may respond with request to abort the work -- GET `/api/worker/repo//tar` +- GET `/api/worker/repo//tree.tar.gz` - Get tar-ed commit from the server's repo, if any exists -- GET `/api/worker/bench-repo//tar` +- GET `/api/worker/bench-repo//tree.tar.gz` - Get tar-ed commit from the server's bench repo, if any exist ## CLI Args diff --git a/src/server/web/api.rs b/src/server/web/api.rs index 32dbcad..a79cdd9 100644 --- a/src/server/web/api.rs +++ b/src/server/web/api.rs @@ -1,15 +1,19 @@ mod auth; +mod stream; use std::sync::{Arc, Mutex}; -use askama_axum::{IntoResponse, Response}; use axum::{ - extract::State, + body::StreamBody, + extract::{Path, State}, headers::{authorization::Basic, Authorization}, http::StatusCode, - routing::post, + http::{header, HeaderValue}, + response::{IntoResponse, Response}, + routing::{get, post}, Json, Router, TypedHeader, }; +use gix::{ObjectId, ThreadSafeRepository}; use sqlx::SqlitePool; use time::OffsetDateTime; use tracing::debug; @@ -18,18 +22,18 @@ use crate::{ config::Config, server::{ workers::{WorkerInfo, Workers}, - BenchRepo, Server, + BenchRepo, Repo, Server, }, shared::{BenchMethod, ServerResponse, Work, WorkerRequest}, somehow, }; async fn post_status( - auth: Option>>, State(config): State<&'static Config>, State(db): State, State(bench_repo): State>, State(workers): State>>, + auth: Option>>, Json(request): Json, ) -> somehow::Result { let name = match auth::authenticate(config, auth) { @@ -84,12 +88,70 @@ async fn post_status( Ok(Json(ServerResponse { work, abort_work }).into_response()) } +fn stream_response(repo: Arc, id: ObjectId) -> impl IntoResponse { + ( + [ + ( + header::CONTENT_TYPE, + HeaderValue::from_static("application/gzip"), + ), + ( + header::CONTENT_DISPOSITION, + HeaderValue::from_static("attachment; filename=\"tree.tar.gz\""), + ), + ], + StreamBody::new(stream::tar_and_gzip(repo, id)), + ) +} + +async fn get_repo( + State(config): State<&'static Config>, + State(repo): State>, + auth: Option>>, + Path(hash): Path, +) -> somehow::Result { + let _name = match auth::authenticate(config, auth) { + Ok(name) => name, + Err(response) => return Ok(response), + }; + + let Some(repo) = repo else { + return Ok(StatusCode::NOT_FOUND.into_response()); + }; + + let id = hash.parse::()?; + Ok(stream_response(repo.0, id).into_response()) +} + +async fn get_bench_repo( + State(config): State<&'static Config>, + State(bench_repo): State>, + auth: Option>>, + Path(hash): Path, +) -> somehow::Result { + let _name = match auth::authenticate(config, auth) { + Ok(name) => name, + Err(response) => return Ok(response), + }; + + let Some(bench_repo) = bench_repo else { + return Ok(StatusCode::NOT_FOUND.into_response()); + }; + + let id = hash.parse::()?; + Ok(stream_response(bench_repo.0, id).into_response()) +} + pub fn router(server: &Server) -> Router { if server.repo.is_none() { return Router::new(); } - // TODO Get repo tar - // TODO Get bench repo tar - Router::new().route("/api/worker/status", post(post_status)) + Router::new() + .route("/api/worker/status", post(post_status)) + .route("/api/worker/repo/:hash/tree.tar.gz", get(get_repo)) + .route( + "/api/worker/bench_repo/:hash/tree.tar.gz", + get(get_bench_repo), + ) } diff --git a/src/server/web/api/auth.rs b/src/server/web/api/auth.rs index 9e936ca..425387a 100644 --- a/src/server/web/api/auth.rs +++ b/src/server/web/api/auth.rs @@ -1,8 +1,9 @@ -use askama_axum::IntoResponse; +//! Verify worker basic authentication headers. + use axum::{ headers::{authorization::Basic, Authorization}, http::{header, HeaderValue, StatusCode}, - response::Response, + response::{IntoResponse, Response}, TypedHeader, }; @@ -36,7 +37,7 @@ pub fn authenticate( StatusCode::UNAUTHORIZED, [( header::WWW_AUTHENTICATE, - HeaderValue::from_str("Basic realm=\"worker api\"").unwrap(), + HeaderValue::from_static("Basic realm=\"worker api\""), )], "invalid credentials", ) diff --git a/src/server/web/api/stream.rs b/src/server/web/api/stream.rs new file mode 100644 index 0000000..f0da01b --- /dev/null +++ b/src/server/web/api/stream.rs @@ -0,0 +1,122 @@ +//! Stream gzipped tar-ed repository worktrees. + +use std::{ + io::{self, BufWriter, Read}, + sync::Arc, +}; + +use axum::{body::Bytes, BoxError}; +use flate2::{write::GzEncoder, Compression}; +use futures::TryStream; +use gix::{ + bstr::ByteSlice, objs::tree::EntryMode, prelude::ObjectIdExt, worktree::stream::Entry, + ObjectId, ThreadSafeRepository, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{debug, warn}; + +const BLOCK_SIZE: usize = 1024 * 1024; +const COMPRESSION_LEVEL: Compression = Compression::fast(); + +struct SenderWriter(mpsc::Sender>); + +impl SenderWriter { + fn new(tx: mpsc::Sender>) -> Self { + Self(tx) + } + + fn finish(self) {} +} + +impl io::Write for SenderWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + match self.0.blocking_send(Ok(Bytes::copy_from_slice(buf))) { + Ok(()) => Ok(buf.len()), + Err(_) => Err(io::ErrorKind::ConnectionAborted.into()), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +fn write_entry( + mut entry: Entry<'_>, + writer: &mut tar::Builder, +) -> Result<(), BoxError> { + let mut header = tar::Header::new_gnu(); + header.set_entry_type(match entry.mode { + EntryMode::Tree | EntryMode::Commit => tar::EntryType::Directory, + EntryMode::Blob | EntryMode::BlobExecutable => tar::EntryType::Regular, + EntryMode::Link => tar::EntryType::Symlink, + }); + header.set_mode(match entry.mode { + EntryMode::BlobExecutable => 0o755, // rwxr-xr-x + _ => 0o644, // rw-r--r-- + }); + + if entry.mode == EntryMode::Link { + let mut buf = vec![]; + entry.read_to_end(&mut buf)?; + header.set_size(0); + let path = gix::path::from_bstr(entry.relative_path()); + let target = gix::path::from_bstr(buf.as_bstr()); + writer.append_link(&mut header, path, target)?; + } else { + header.set_size(entry.bytes_remaining().unwrap_or(0) as u64); + let path = gix::path::from_bstr(entry.relative_path()).to_path_buf(); + writer.append_data(&mut header, path, entry)?; + } + + Ok(()) +} + +fn write_worktree( + repo: Arc, + commit_id: ObjectId, + tx: mpsc::Sender>, +) -> Result<(), BoxError> { + let repo = repo.to_thread_local(); + let tree_id = commit_id + .attach(&repo) + .object()? + .try_into_commit()? + .tree_id()?; + let (mut stream, _) = repo.worktree_stream(tree_id)?; + + let writer = SenderWriter::new(tx); + let writer = BufWriter::with_capacity(BLOCK_SIZE, writer); + let writer = GzEncoder::new(writer, COMPRESSION_LEVEL); + let mut writer = tar::Builder::new(writer); + + while let Some(entry) = stream.next_entry()? { + write_entry(entry, &mut writer)?; + } + + writer + .into_inner()? + .finish()? + .into_inner() + .map_err(|e| e.into_error())? + .finish(); + + Ok(()) +} + +pub fn tar_and_gzip( + repo: Arc, + id: ObjectId, +) -> impl TryStream { + let (tx, rx) = mpsc::channel(1); + tokio::task::spawn_blocking(move || { + if let Err(e) = write_worktree(repo, id, tx.clone()) { + warn!("Error while streaming tar:\n{e:?}"); + let _ = tx.blocking_send(Err(e)); + } else { + debug!("Tar streamed successfully"); + } + }); + ReceiverStream::new(rx) +}