From 4f63b0250969fdf51c8e4f7633d05cb3eed4eeb2 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 12 Aug 2023 16:06:20 +0200 Subject: [PATCH] Fix and simplify tar downloads --- src/worker.rs | 2 ++ src/worker/tree.rs | 76 ++++++++++++++++++++-------------------------- 2 files changed, 35 insertions(+), 43 deletions(-) diff --git a/src/worker.rs b/src/worker.rs index 6c172c8..8724e89 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,3 +1,5 @@ +mod tree; + use tracing::error; use crate::config::Config; diff --git a/src/worker/tree.rs b/src/worker/tree.rs index c23f706..378e44e 100644 --- a/src/worker/tree.rs +++ b/src/worker/tree.rs @@ -5,9 +5,9 @@ use std::{io, path::PathBuf}; use axum::body::Bytes; use flate2::read::GzDecoder; use futures::{Stream, StreamExt}; +use reqwest::Response; use tempfile::TempDir; -use tokio::{select, sync::mpsc}; -use tracing::debug; +use tokio::sync::mpsc; use crate::somehow; @@ -41,47 +41,37 @@ impl io::Read for ReceiverReader { } } -pub struct UnpackedTree { - pub hash: String, - pub dir: TempDir, +async fn receive_bytes( + mut stream: impl Stream> + Unpin, + tx: mpsc::Sender, +) -> somehow::Result<()> { + while let Some(bytes) = stream.next().await { + tx.send(bytes?).await?; + } + Ok(()) } -impl UnpackedTree { - async fn stream( - mut stream: impl Stream> + Unpin, - tx: mpsc::Sender, - ) -> somehow::Result<()> { - while let Some(bytes) = stream.next().await { - tx.send(bytes?).await?; - } - Ok(()) - } - - fn unpack(rx: mpsc::Receiver, path: PathBuf) -> somehow::Result<()> { - let reader = ReceiverReader::new(rx); - let reader = GzDecoder::new(reader); - let mut reader = tar::Archive::new(reader); - reader.unpack(path)?; - Ok(()) - } - - pub async fn download(url: &str, hash: String) -> somehow::Result { - let dir = TempDir::new()?; - debug!( - "Downloading and unpacking {url} to {}", - dir.path().display() - ); - let (tx, rx) = mpsc::channel(1); - let stream = reqwest::get(url).await?.bytes_stream(); - - let path = dir.path().to_path_buf(); - let unpack_task = tokio::task::spawn_blocking(move || Self::unpack(rx, path)); - - select! { - r = Self::stream(stream, tx) => r?, - r = unpack_task => r??, - } - - Ok(Self { hash, dir }) - } +fn unpack_archive(rx: mpsc::Receiver, path: PathBuf) -> somehow::Result<()> { + let reader = ReceiverReader::new(rx); + let reader = GzDecoder::new(reader); + let mut reader = tar::Archive::new(reader); + reader.unpack(path)?; + Ok(()) +} + +pub async fn download(response: Response) -> somehow::Result { + let stream = response.error_for_status()?.bytes_stream(); + + let dir = TempDir::new()?; + let path = dir.path().to_path_buf(); + let (tx, rx) = mpsc::channel(1); + + let (received, unpacked) = tokio::join!( + receive_bytes(stream, tx), + tokio::task::spawn_blocking(move || unpack_archive(rx, path)), + ); + received?; + unpacked??; + + Ok(dir) }