From 3dd7c7d57ac840595dc8f06a3e9e5c7eff5b0023 Mon Sep 17 00:00:00 2001 From: Jan Lukas Gernert Date: Sun, 16 Apr 2023 18:10:43 +0200 Subject: [PATCH] tmp: calc download size & print progress --- article_scraper/Cargo.toml | 3 ++- article_scraper/src/images/mod.rs | 37 ++++++++++++++++++++++++--- article_scraper/src/images/request.rs | 29 +++++++++++++-------- article_scraper_cli/Cargo.toml | 2 +- 4 files changed, 55 insertions(+), 16 deletions(-) diff --git a/article_scraper/Cargo.toml b/article_scraper/Cargo.toml index b46936d..30da478 100644 --- a/article_scraper/Cargo.toml +++ b/article_scraper/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://gitlab.com/news-flash/article_scraper" [dependencies] thiserror = "1.0" libxml = "0.3" -reqwest = { version = "0.11", features = ["json", "native-tls", "gzip", "brotli"] } +reqwest = { version = "0.11", features = ["json", "native-tls", "gzip", "brotli", "stream"] } tokio = { version = "1.27", features = ["macros", "fs", "io-util"] } url = "2.3" regex = "1.7" @@ -23,6 +23,7 @@ rust-embed="6.6" once_cell = "1.17" escaper = "0.1" futures = "0.3" +byte-unit = "4.0" [dev-dependencies] env_logger = "0.10" \ No newline at end of file diff --git a/article_scraper/src/images/mod.rs b/article_scraper/src/images/mod.rs index 2a422c9..dc2d1b5 100644 --- a/article_scraper/src/images/mod.rs +++ b/article_scraper/src/images/mod.rs @@ -2,12 +2,14 @@ pub use self::error::ImageDownloadError; use self::request::ImageRequest; use crate::util::Util; use base64::Engine; +use byte_unit::Byte; use image::ImageOutputFormat; use libxml::parser::Parser; use libxml::tree::{Document, Node, SaveOptions}; use libxml::xpath::Context; use reqwest::{Client, Url}; use std::io::Cursor; +use tokio::sync::mpsc::{self, Sender}; mod error; mod request; @@ -72,12 +74,40 @@ impl ImageDownloader { .filter_map(|r| r.ok()) .collect::>(); + let size = res + .iter() + .map(|(req, parent_req)| { + req.content_length() + parent_req.as_ref().map(|r| r.content_length()).unwrap_or(0) + }) + .sum::(); + + let (tx, mut rx) = mpsc::channel::(2); + let mut download_futures = Vec::new(); for (request, parent_request) in res { - download_futures.push(self.download_and_replace_image(request, parent_request)); + download_futures.push(self.download_and_replace_image( + request, + parent_request, + tx.clone(), + )); } + tokio::spawn(async move { + let mut received = 0_usize; + let size = Byte::from_bytes(size as u128); + let adjusted_size = size.get_appropriate_unit(true); + println!("downloading {adjusted_size}"); + + while let Some(i) = rx.recv().await { + received += i; + + let received_bytes = Byte::from_bytes(received as u128); + let received_adjusted = received_bytes.get_appropriate_unit(true); + println!("received {received_adjusted} / {adjusted_size}"); + } + }); + _ = futures::future::join_all(download_futures).await; Ok(()) @@ -118,13 +148,14 @@ impl ImageDownloader { &self, mut request: ImageRequest, mut parent_request: Option, + tx: Sender, ) -> Result<(), ImageDownloadError> { - let mut image = request.download().await?; + let mut image = request.download(&tx).await?; let mut parent_image: Option> = None; if let Some(parent_request) = parent_request.as_mut() { if parent_request.content_length() > request.content_length() { - parent_image = parent_request.download().await.ok(); + parent_image = parent_request.download(&tx).await.ok(); } } diff --git a/article_scraper/src/images/request.rs b/article_scraper/src/images/request.rs index ee7cbfd..b68c27f 100644 --- a/article_scraper/src/images/request.rs +++ b/article_scraper/src/images/request.rs @@ -1,12 +1,14 @@ +use futures::StreamExt; use libxml::tree::Node; use reqwest::{header::CONTENT_TYPE, Client, Response, Url}; +use tokio::sync::mpsc::Sender; use super::ImageDownloadError; pub struct ImageRequest { node: Node, http_response: Option, - content_length: u64, + content_length: usize, content_type: String, } @@ -33,14 +35,19 @@ impl ImageRequest { }) } - pub async fn download(&mut self) -> Result, ImageDownloadError> { + pub async fn download(&mut self, tx: &Sender) -> Result, ImageDownloadError> { if let Some(http_response) = self.http_response.take() { - let result = http_response - .bytes() - .await - .map_err(|_| ImageDownloadError::Http)? - .as_ref() - .to_vec(); + let mut stream = http_response.bytes_stream(); + + let mut result = Vec::with_capacity(self.content_length); + while let Some(item) = stream.next().await { + let chunk = item.map_err(|_| ImageDownloadError::Http)?; + _ = tx.send(chunk.len()).await; + for byte in chunk { + result.push(byte); + } + } + Ok(result) } else { log::warn!("imagerequest already consumed"); @@ -52,7 +59,7 @@ impl ImageRequest { &self.content_type } - pub fn content_length(&self) -> u64 { + pub fn content_length(&self) -> usize { self.content_length } @@ -60,7 +67,7 @@ impl ImageRequest { _ = self.node.set_property(prop_name, data); } - fn get_content_length(response: &Response) -> Result { + fn get_content_length(response: &Response) -> Result { let status_code = response.status(); if !status_code.is_success() { @@ -72,7 +79,7 @@ impl ImageRequest { .headers() .get(reqwest::header::CONTENT_LENGTH) .and_then(|content_length| content_length.to_str().ok()) - .and_then(|content_length| content_length.parse::().ok()) + .and_then(|content_length| content_length.parse::().ok()) .ok_or(ImageDownloadError::ContentLength) } diff --git a/article_scraper_cli/Cargo.toml b/article_scraper_cli/Cargo.toml index cccc64b..ff77545 100644 --- a/article_scraper_cli/Cargo.toml +++ b/article_scraper_cli/Cargo.toml @@ -13,5 +13,5 @@ clap = { version = "4.2", features = [ "derive" ] } simplelog = "0.12" log = "0.4" url = "2.3" -reqwest = { version = "0.11", features = ["json", "native-tls", "gzip", "brotli"] } +reqwest = { version = "0.11", features = ["json", "native-tls", "gzip", "brotli", "stream"] } tokio = { version = "1.27", features = ["macros", "fs", "io-util", "rt-multi-thread" ] } \ No newline at end of file