From cc87171f621482e8473cb686382b976f15ebb1fa Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 11 Aug 2023 20:55:20 +0200 Subject: [PATCH] Download and unpack tars --- Cargo.lock | 20 +++++++++-- Cargo.toml | 3 +- src/worker.rs | 1 + src/worker/tree.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 src/worker/tree.rs diff --git a/Cargo.lock b/Cargo.lock index a4241b2..6e99fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2413,10 +2413,12 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg", ] @@ -3113,6 +3115,7 @@ dependencies = [ "serde_repr", "sqlx", "tar", + "tempfile", "time", "tokio", "tokio-stream", @@ -3135,9 +3138,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand", @@ -3620,6 +3623,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" diff --git a/Cargo.toml b/Cargo.toml index aaaefda..ee67e19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ serde = { version = "1.0.181", features = ["derive"] } serde_repr = "0.1.16" sqlx = { version = "0.7.1", features = ["runtime-tokio", "sqlite", "time"] } tar = { version = "0.4.40", default-features = false } +tempfile = "3.7.1" tokio = { version = "1.29.1", features = ["full"] } tokio-stream = "0.1.14" toml = "0.7.6" @@ -37,7 +38,7 @@ features = ["max-performance-safe", "worktree-stream"] [dependencies.reqwest] version = "0.11.18" default-features = false -features = ["json", "rustls-tls-native-roots"] +features = ["json", "stream", "rustls-tls-native-roots"] [dependencies.time] version = "0.3.25" diff --git a/src/worker.rs b/src/worker.rs index b3a077b..5ce531f 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,5 +1,6 @@ mod coordinator; mod server; +mod tree; use std::sync::{Arc, Mutex}; diff --git a/src/worker/tree.rs b/src/worker/tree.rs new file mode 100644 index 0000000..e8daa9b --- /dev/null +++ b/src/worker/tree.rs @@ -0,0 +1,87 @@ +//! Download and unpack repo worktrees into temporary directories. + +use std::{io, path::PathBuf}; + +use axum::body::Bytes; +use flate2::read::GzDecoder; +use futures::{Stream, StreamExt}; +use tempfile::TempDir; +use tokio::{select, sync::mpsc}; +use tracing::debug; + +use crate::somehow; + +struct ReceiverReader { + rx: mpsc::Receiver, + rest: Bytes, +} + +impl ReceiverReader { + fn new(rx: mpsc::Receiver) -> Self { + Self { + rx, + rest: Bytes::new(), + } + } +} + +impl io::Read for ReceiverReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if self.rest.is_empty() { + if let Some(bytes) = self.rx.blocking_recv() { + self.rest = bytes; + } + } + + let mut slice = &*self.rest; + let result = slice.read(buf); + let _ = self.rest.split_to(self.rest.len() - slice.len()); + + result + } +} + +pub struct UnpackedTree { + pub hash: String, + pub dir: TempDir, +} + +impl UnpackedTree { + async fn stream( + mut stream: impl Stream> + Unpin, + tx: mpsc::Sender, + ) -> somehow::Result<()> { + while let Some(bytes) = stream.next().await { + tx.send(bytes?).await?; + } + Ok(()) + } + + fn unpack(rx: mpsc::Receiver, path: PathBuf) -> somehow::Result<()> { + let reader = ReceiverReader::new(rx); + let reader = GzDecoder::new(reader); + let mut reader = tar::Archive::new(reader); + reader.unpack(path)?; + Ok(()) + } + + async fn download(url: &str, hash: String) -> somehow::Result { + let dir = TempDir::new()?; + debug!( + "Downloading and unpacking {url} to {}", + dir.path().display() + ); + let (tx, rx) = mpsc::channel(1); + let stream = reqwest::get(url).await?.bytes_stream(); + + let path = dir.path().to_path_buf(); + let unpack_task = tokio::task::spawn_blocking(move || Self::unpack(rx, path)); + + select! { + r = Self::stream(stream, tx) => r?, + r = unpack_task => r??, + } + + Ok(Self { hash, dir }) + } +}