Stream repo and bench repo worktree tars

This commit is contained in:
Joscha 2023-08-11 14:20:15 +02:00
parent 6f4793bcf2
commit a9e08505bc
6 changed files with 215 additions and 28 deletions

View file

@ -1,15 +1,19 @@
mod auth;
mod stream;
use std::sync::{Arc, Mutex};
use askama_axum::{IntoResponse, Response};
use axum::{
extract::State,
body::StreamBody,
extract::{Path, State},
headers::{authorization::Basic, Authorization},
http::StatusCode,
routing::post,
http::{header, HeaderValue},
response::{IntoResponse, Response},
routing::{get, post},
Json, Router, TypedHeader,
};
use gix::{ObjectId, ThreadSafeRepository};
use sqlx::SqlitePool;
use time::OffsetDateTime;
use tracing::debug;
@ -18,18 +22,18 @@ use crate::{
config::Config,
server::{
workers::{WorkerInfo, Workers},
BenchRepo, Server,
BenchRepo, Repo, Server,
},
shared::{BenchMethod, ServerResponse, Work, WorkerRequest},
somehow,
};
async fn post_status(
auth: Option<TypedHeader<Authorization<Basic>>>,
State(config): State<&'static Config>,
State(db): State<SqlitePool>,
State(bench_repo): State<Option<BenchRepo>>,
State(workers): State<Arc<Mutex<Workers>>>,
auth: Option<TypedHeader<Authorization<Basic>>>,
Json(request): Json<WorkerRequest>,
) -> somehow::Result<Response> {
let name = match auth::authenticate(config, auth) {
@ -84,12 +88,70 @@ async fn post_status(
Ok(Json(ServerResponse { work, abort_work }).into_response())
}
/// Build a streaming HTTP response serving the gzipped tarball of the
/// worktree at commit `id` as an attachment named `tree.tar.gz`.
fn stream_response(repo: Arc<ThreadSafeRepository>, id: ObjectId) -> impl IntoResponse {
    let headers = [
        (
            header::CONTENT_TYPE,
            HeaderValue::from_static("application/gzip"),
        ),
        (
            header::CONTENT_DISPOSITION,
            HeaderValue::from_static("attachment; filename=\"tree.tar.gz\""),
        ),
    ];
    let body = StreamBody::new(stream::tar_and_gzip(repo, id));
    (headers, body)
}
/// Handler for `GET .../repo/:hash/tree.tar.gz`: stream the main repo's
/// worktree at commit `hash` as a gzipped tar. Requires basic auth and
/// responds 404 when no repo is configured.
async fn get_repo(
    State(config): State<&'static Config>,
    State(repo): State<Option<Repo>>,
    auth: Option<TypedHeader<Authorization<Basic>>>,
    Path(hash): Path<String>,
) -> somehow::Result<Response> {
    // Authenticate first; the worker name itself is not needed here.
    if let Err(response) = auth::authenticate(config, auth) {
        return Ok(response);
    }

    let repo = match repo {
        Some(repo) => repo,
        None => return Ok(StatusCode::NOT_FOUND.into_response()),
    };

    let id = hash.parse::<ObjectId>()?;
    Ok(stream_response(repo.0, id).into_response())
}
/// Handler for `GET .../bench_repo/:hash/tree.tar.gz`: stream the bench
/// repo's worktree at commit `hash` as a gzipped tar. Requires basic auth
/// and responds 404 when no bench repo is configured.
async fn get_bench_repo(
    State(config): State<&'static Config>,
    State(bench_repo): State<Option<BenchRepo>>,
    auth: Option<TypedHeader<Authorization<Basic>>>,
    Path(hash): Path<String>,
) -> somehow::Result<Response> {
    // Authenticate first; the worker name itself is not needed here.
    if let Err(response) = auth::authenticate(config, auth) {
        return Ok(response);
    }

    let bench_repo = match bench_repo {
        Some(bench_repo) => bench_repo,
        None => return Ok(StatusCode::NOT_FOUND.into_response()),
    };

    let id = hash.parse::<ObjectId>()?;
    Ok(stream_response(bench_repo.0, id).into_response())
}
pub fn router(server: &Server) -> Router<Server> {
if server.repo.is_none() {
return Router::new();
}
// TODO Get repo tar
// TODO Get bench repo tar
Router::new().route("/api/worker/status", post(post_status))
Router::new()
.route("/api/worker/status", post(post_status))
.route("/api/worker/repo/:hash/tree.tar.gz", get(get_repo))
.route(
"/api/worker/bench_repo/:hash/tree.tar.gz",
get(get_bench_repo),
)
}

View file

@ -1,8 +1,9 @@
use askama_axum::IntoResponse;
//! Verify worker basic authentication headers.
use axum::{
headers::{authorization::Basic, Authorization},
http::{header, HeaderValue, StatusCode},
response::Response,
response::{IntoResponse, Response},
TypedHeader,
};
@ -36,7 +37,7 @@ pub fn authenticate(
StatusCode::UNAUTHORIZED,
[(
header::WWW_AUTHENTICATE,
HeaderValue::from_str("Basic realm=\"worker api\"").unwrap(),
HeaderValue::from_static("Basic realm=\"worker api\""),
)],
"invalid credentials",
)

View file

@ -0,0 +1,122 @@
//! Stream gzipped tar-ed repository worktrees.
use std::{
io::{self, BufWriter, Read},
sync::Arc,
};
use axum::{body::Bytes, BoxError};
use flate2::{write::GzEncoder, Compression};
use futures::TryStream;
use gix::{
bstr::ByteSlice, objs::tree::EntryMode, prelude::ObjectIdExt, worktree::stream::Entry,
ObjectId, ThreadSafeRepository,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, warn};
const BLOCK_SIZE: usize = 1024 * 1024;
const COMPRESSION_LEVEL: Compression = Compression::fast();
/// Adapter that lets a synchronous `io::Write` pipeline push chunks into a
/// tokio mpsc channel as `Bytes`. Intended for use on a blocking thread,
/// since `write` uses `blocking_send`.
struct SenderWriter(mpsc::Sender<Result<Bytes, BoxError>>);

impl SenderWriter {
    fn new(tx: mpsc::Sender<Result<Bytes, BoxError>>) -> Self {
        Self(tx)
    }

    /// Consume the writer, dropping its sender and thereby closing the
    /// channel from the producing side.
    fn finish(self) {}
}

impl io::Write for SenderWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Copy the chunk out of the caller's buffer so it can outlive this
        // call, then forward it. A closed receiver means the consumer hung
        // up, which we surface as a connection abort.
        let chunk = Bytes::copy_from_slice(buf);
        if self.0.blocking_send(Ok(chunk)).is_ok() {
            Ok(buf.len())
        } else {
            Err(io::ErrorKind::ConnectionAborted.into())
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // Every chunk is forwarded immediately in `write`; nothing to flush.
        Ok(())
    }
}
/// Append a single worktree entry to the tar archive.
///
/// Trees and submodule commits become directory entries, blobs become
/// regular files, and symlinks are written as link entries whose blob
/// content supplies the target path.
fn write_entry(
    mut entry: Entry<'_>,
    writer: &mut tar::Builder<impl io::Write>,
) -> Result<(), BoxError> {
    let mut header = tar::Header::new_gnu();
    header.set_entry_type(match entry.mode {
        EntryMode::Tree | EntryMode::Commit => tar::EntryType::Directory,
        EntryMode::Blob | EntryMode::BlobExecutable => tar::EntryType::Regular,
        EntryMode::Link => tar::EntryType::Symlink,
    });
    header.set_mode(match entry.mode {
        // Directories need the execute (search) bit, or they are not
        // traversable after extraction; 0o644 here would produce unusable
        // directories.
        EntryMode::Tree | EntryMode::Commit => 0o755, // rwxr-xr-x
        EntryMode::BlobExecutable => 0o755,           // rwxr-xr-x
        _ => 0o644,                                   // rw-r--r--
    });
    if entry.mode == EntryMode::Link {
        // A symlink's blob content is the link target path.
        let mut buf = vec![];
        entry.read_to_end(&mut buf)?;
        header.set_size(0); // link entries carry no file data
        let path = gix::path::from_bstr(entry.relative_path());
        let target = gix::path::from_bstr(buf.as_bstr());
        writer.append_link(&mut header, path, target)?;
    } else {
        // Stream blob contents straight from the entry reader; size must be
        // set up front so the tar header is correct.
        header.set_size(entry.bytes_remaining().unwrap_or(0) as u64);
        let path = gix::path::from_bstr(entry.relative_path()).to_path_buf();
        writer.append_data(&mut header, path, entry)?;
    }
    Ok(())
}
/// Resolve `commit_id`'s tree in `repo` and stream it, tar-ed and gzipped,
/// into `tx` one chunk at a time.
///
/// Must run on a blocking thread: the underlying `SenderWriter` uses
/// `blocking_send`.
fn write_worktree(
    repo: Arc<ThreadSafeRepository>,
    commit_id: ObjectId,
    tx: mpsc::Sender<Result<Bytes, BoxError>>,
) -> Result<(), BoxError> {
    let repo = repo.to_thread_local();
    let commit = commit_id.attach(&repo).object()?.try_into_commit()?;
    let tree_id = commit.tree_id()?;
    let (mut stream, _) = repo.worktree_stream(tree_id)?;

    // Writer pipeline, innermost first: channel <- block buffer <- gzip <- tar.
    let sender = SenderWriter::new(tx);
    let buffered = BufWriter::with_capacity(BLOCK_SIZE, sender);
    let encoder = GzEncoder::new(buffered, COMPRESSION_LEVEL);
    let mut archive = tar::Builder::new(encoder);

    while let Some(entry) = stream.next_entry()? {
        write_entry(entry, &mut archive)?;
    }

    // Tear the pipeline down inside-out so every layer writes its trailer.
    let encoder = archive.into_inner()?; // finalizes the tar archive
    let buffered = encoder.finish()?; // writes the gzip trailer
    let sender = buffered.into_inner().map_err(|e| e.into_error())?;
    sender.finish();
    Ok(())
}
/// Produce a byte stream containing the gzipped tar of the worktree at `id`.
///
/// The archive is built on a blocking task; the bounded channel (capacity 1)
/// back-pressures the producer against the consumer.
pub fn tar_and_gzip(
    repo: Arc<ThreadSafeRepository>,
    id: ObjectId,
) -> impl TryStream<Ok = Bytes, Error = BoxError> {
    let (tx, rx) = mpsc::channel(1);
    tokio::task::spawn_blocking(move || match write_worktree(repo, id, tx.clone()) {
        Ok(()) => debug!("Tar streamed successfully"),
        Err(e) => {
            warn!("Error while streaming tar:\n{e:?}");
            // Best effort: forward the error to the consumer, who may
            // already have hung up.
            let _ = tx.blocking_send(Err(e));
        }
    });
    ReceiverStream::new(rx)
}