Fix and simplify tar downloads

This commit is contained in:
Joscha 2023-08-12 16:06:20 +02:00
parent 81328fcf04
commit 4f63b02509
2 changed files with 35 additions and 43 deletions

View file

@ -1,3 +1,5 @@
mod tree;
use tracing::error; use tracing::error;
use crate::config::Config; use crate::config::Config;

View file

@ -5,9 +5,9 @@ use std::{io, path::PathBuf};
use axum::body::Bytes; use axum::body::Bytes;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use reqwest::Response;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::{select, sync::mpsc}; use tokio::sync::mpsc;
use tracing::debug;
use crate::somehow; use crate::somehow;
@ -41,47 +41,37 @@ impl io::Read for ReceiverReader {
} }
} }
pub struct UnpackedTree { async fn receive_bytes(
pub hash: String, mut stream: impl Stream<Item = reqwest::Result<Bytes>> + Unpin,
pub dir: TempDir, tx: mpsc::Sender<Bytes>,
) -> somehow::Result<()> {
while let Some(bytes) = stream.next().await {
tx.send(bytes?).await?;
}
Ok(())
} }
impl UnpackedTree { fn unpack_archive(rx: mpsc::Receiver<Bytes>, path: PathBuf) -> somehow::Result<()> {
async fn stream( let reader = ReceiverReader::new(rx);
mut stream: impl Stream<Item = reqwest::Result<Bytes>> + Unpin, let reader = GzDecoder::new(reader);
tx: mpsc::Sender<Bytes>, let mut reader = tar::Archive::new(reader);
) -> somehow::Result<()> { reader.unpack(path)?;
while let Some(bytes) = stream.next().await { Ok(())
tx.send(bytes?).await?; }
}
Ok(()) pub async fn download(response: Response) -> somehow::Result<TempDir> {
} let stream = response.error_for_status()?.bytes_stream();
fn unpack(rx: mpsc::Receiver<Bytes>, path: PathBuf) -> somehow::Result<()> { let dir = TempDir::new()?;
let reader = ReceiverReader::new(rx); let path = dir.path().to_path_buf();
let reader = GzDecoder::new(reader); let (tx, rx) = mpsc::channel(1);
let mut reader = tar::Archive::new(reader);
reader.unpack(path)?; let (received, unpacked) = tokio::join!(
Ok(()) receive_bytes(stream, tx),
} tokio::task::spawn_blocking(move || unpack_archive(rx, path)),
);
pub async fn download(url: &str, hash: String) -> somehow::Result<Self> { received?;
let dir = TempDir::new()?; unpacked??;
debug!(
"Downloading and unpacking {url} to {}", Ok(dir)
dir.path().display()
);
let (tx, rx) = mpsc::channel(1);
let stream = reqwest::get(url).await?.bytes_stream();
let path = dir.path().to_path_buf();
let unpack_task = tokio::task::spawn_blocking(move || Self::unpack(rx, path));
select! {
r = Self::stream(stream, tx) => r?,
r = unpack_task => r??,
}
Ok(Self { hash, dir })
}
} }