Skip to content

Commit

Permalink
* Allow a callback for download polling
Browse files Browse the repository at this point in the history
* Add documentation
* Bump version
  • Loading branch information
Hainish committed Dec 21, 2022
1 parent 853af02 commit f6016a9
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-dl-stream-to-disk"
version = "0.3.0"
version = "1.0.0"
authors = ["William Budington <[email protected]>"]
edition = "2018"
license = "MIT"
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ A micro-library for downloading from a URL and streaming it directly to the disk

```rust
use std::path::Path;
use tokio_dl_stream_to_disk::AsyncDownload;

#[tokio::main]
async fn main() {
if tokio_dl_stream_to_disk::download("https://bit.ly/3yWXSOW", &Path::new("/tmp"), "5mb_test.bin").await.is_ok() {
if AsyncDownload::new("https://bit.ly/3yWXSOW", &Path::new("/tmp"), "5mb_test.bin").download(&None).await.is_ok() {
println!("File downloaded successfully!");
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub enum ErrorKind {
FileExists,
DirectoryMissing,
PermissionDenied,
InvalidResponse,
IO(IOError),
Other(Box<dyn StdError>),
}
Expand All @@ -32,6 +33,7 @@ impl Error {
ErrorKind::FileExists => None,
ErrorKind::DirectoryMissing => None,
ErrorKind::PermissionDenied => None,
ErrorKind::InvalidResponse => None,
ErrorKind::IO(err) => Some(err),
ErrorKind::Other(_) => None,
}
Expand All @@ -42,6 +44,7 @@ impl Error {
ErrorKind::FileExists => None,
ErrorKind::DirectoryMissing => None,
ErrorKind::PermissionDenied => None,
ErrorKind::InvalidResponse => None,
ErrorKind::IO(_) => None,
ErrorKind::Other(err) => Some(err),
}
Expand Down Expand Up @@ -76,6 +79,7 @@ impl fmt::Display for Error {
ErrorKind::FileExists => write!(f, "File already exists"),
ErrorKind::DirectoryMissing => write!(f, "Destination path provided is not a valid directory"),
ErrorKind::PermissionDenied => write!(f, "Cannot create file: permission denied"),
ErrorKind::InvalidResponse => write!(f, "Invalid response from the remote host"),
ErrorKind::IO(err) => err.fmt(f),
ErrorKind::Other(err) => err.fmt(f),
}
Expand Down
189 changes: 132 additions & 57 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@
//!
//! ```rust
//! use std::path::Path;
//! use tokio_dl_stream_to_disk::AsyncDownload;
//!
//! #[tokio::main]
//! async fn main() {
//! if tokio_dl_stream_to_disk::download("https://bit.ly/3yWXSOW", &Path::new("/tmp"), "5mb_test.bin").await.is_ok() {
//! if AsyncDownload::new("https://bit.ly/3yWXSOW", &Path::new("/tmp"), "5mb_test.bin").download(&None).await.is_ok() {
//! println!("File downloaded successfully!");
//! }
//! }
//! ```
pub mod error;

use std::convert::TryInto;
use std::error::Error;
use std::io::{Error as IOError, ErrorKind};
use std::path::Path;
use std::path::{Path, PathBuf};

use bytes::Bytes;
use futures_util::stream::Stream;
Expand All @@ -30,73 +32,146 @@ use tokio_util::io::StreamReader;
use crate::error::{Error as TDSTDError, ErrorKind as TDSTDErrorKind};

type S = dyn Stream<Item = Result<Bytes, IOError>> + Unpin;
async fn http_stream(url: &str) -> Result<Box<S>, Box<dyn Error>> {
Ok(Box::new(reqwest::get(url)
.await?
.error_for_status()?
.bytes_stream()
.map(|result| result.map_err(|e| IOError::new(ErrorKind::Other, e)))))

/// The AsyncDownload struct allows you to stream the contents of a download to the disk.
pub struct AsyncDownload {
url: String,
dst_path: PathBuf,
fname: String,
length: Option<u64>,
response_stream: Option<Box<S>>
}

pub async fn download<S: Into<String>>(url: S, dst_path: &Path, fname: S) -> Result<(), TDSTDError> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
impl AsyncDownload {
/// Returns an AsyncDownload struct with the url, destination on disk and filename specified.
///
/// # Arguments
///
/// * `url` - A string type containing the URL you want to download the contents of
/// * `dst_path` - A PathBuf type containing the destination path
/// * `fname` - A string type containing the filename of the download
pub fn new(url: &str, dst_path: &Path, fname: &str) -> Self {
Self {
url: String::from(url),
dst_path: PathBuf::from(dst_path),
fname: String::from(fname),
length: None,
response_stream: None
}
}

let fname = dst_path.join(fname.into());
if fname.is_file() {
return Err(TDSTDError::new(TDSTDErrorKind::FileExists));
/// Returns the length of the download in bytes. This should be called after calling [`get`]
/// or [`download`].
pub fn length(&self) -> Option<u64> {
self.length
}

if dst_path.is_dir() {
let mut http_async_reader = {
let http_stream = http_stream(&url.into()).await?;
StreamReader::new(http_stream)
};

let mut dest = tokio::fs::File::create(fname).await?;
let mut buf = [0; 8 * 1024];
loop {
let num_bytes = http_async_reader.read(&mut buf).await?;
if num_bytes > 0 {
dest.write(&mut buf[0..num_bytes]).await?;
} else {
break;
}
}
/// Get the download URL, but do not download it. If successful, returns an `AsyncDownload`
/// object with a response stream, which you can then call [`download`] on. After this, the
/// length of the download should also be known and you can call [`length`] on it.
pub async fn get(mut self) -> Result<AsyncDownload, Box<dyn Error>> {
self.get_non_consumable().await?;
Ok(self)
}

async fn get_non_consumable(&mut self) -> Result<(), Box<dyn Error>> {
let response = reqwest::get(self.url.clone())
.await?;
let content_length = response.headers().get("content-length").map_or(None,
|l| {
match l.to_str() {
Err(_) => None,
Ok(l_str) => {
l_str.parse::<u64>().ok()
}
}
});
self.response_stream = Some(Box::new(response
.error_for_status()?
.bytes_stream()
.map(|result| result.map_err(|e| IOError::new(ErrorKind::Other, e)))));
self.length = content_length;
Ok(())
} else {
Err(TDSTDError::new(TDSTDErrorKind::DirectoryMissing))
}
}

#[cfg(feature="sha256sum")]
pub async fn download_and_return_sha256sum<S: Into<String>>(url: S, dst_path: &Path, fname: S) -> Result<Vec<u8>, TDSTDError> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Initiate the download and return a result. Specify an optional callback.
///
/// Arguments:
/// * `cb` - An optional callback for reporting information about the download asynchronously.
/// The callback takes the position of the current download, in bytes.
pub async fn download(&mut self, cb: &Option<Box<dyn Fn(u64) -> ()>>) -> Result<(), TDSTDError> {
if self.response_stream.is_none() {
self.get_non_consumable().await.map_err(|_| TDSTDError::new(TDSTDErrorKind::InvalidResponse))?;
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};

let fname = dst_path.join(fname.into());
if fname.is_file() {
return Err(TDSTDError::new(TDSTDErrorKind::FileExists));
let fname = self.dst_path.join(self.fname.clone());
if fname.is_file() {
return Err(TDSTDError::new(TDSTDErrorKind::FileExists));
}

if self.dst_path.is_dir() {
let mut http_async_reader = StreamReader::new(self.response_stream.take().unwrap());

let mut dest = tokio::fs::File::create(fname).await?;
let mut buf = [0; 8 * 1024];
let mut num_bytes_total = 0;
loop {
let num_bytes = http_async_reader.read(&mut buf).await?;
if let Some(ref cb) = cb {
num_bytes_total += num_bytes;
cb(num_bytes_total.try_into().unwrap());
}
if num_bytes > 0 {
dest.write(&mut buf[0..num_bytes]).await?;
} else {
break;
}
}
Ok(())
} else {
Err(TDSTDError::new(TDSTDErrorKind::DirectoryMissing))
}
}

if dst_path.is_dir() {
let mut http_async_reader = {
let http_stream = http_stream(&url.into()).await?;
StreamReader::new(http_stream)
};

let mut dest = tokio::fs::File::create(fname).await?;
let mut buf = [0; 8 * 1024];
let mut hasher = Sha256::new();
loop {
let num_bytes = http_async_reader.read(&mut buf).await?;
if num_bytes > 0 {
dest.write(&mut buf[0..num_bytes]).await?;
hasher.update(&buf[0..num_bytes]);
} else {
break;
#[cfg(feature="sha256sum")]
/// Initiate the download and return a result with the sha256sum of the download contents.
/// Specify an optional callback.
///
/// Arguments:
/// * `cb` - An optional callback for reporting information about the download asynchronously.
/// The callback takes the position of the current download, in bytes.
pub async fn download_and_return_sha256sum(&mut self, cb: &Option<Box<dyn Fn(u64) -> ()>>) -> Result<Vec<u8>, TDSTDError> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};

let fname = self.dst_path.join(self.fname.clone());
if fname.is_file() {
return Err(TDSTDError::new(TDSTDErrorKind::FileExists));
}

if self.dst_path.is_dir() {
let mut http_async_reader = StreamReader::new(self.response_stream.take().unwrap());

let mut dest = tokio::fs::File::create(fname).await?;
let mut buf = [0; 8 * 1024];
let mut num_bytes_total = 0;
let mut hasher = Sha256::new();
loop {
let num_bytes = http_async_reader.read(&mut buf).await?;
if let Some(ref cb) = cb {
num_bytes_total += num_bytes;
cb(num_bytes_total.try_into().unwrap());
}
if num_bytes > 0 {
dest.write(&mut buf[0..num_bytes]).await?;
hasher.update(&buf[0..num_bytes]);
} else {
break;
}
}
Ok(hasher.finalize().to_vec())
} else {
Err(TDSTDError::new(TDSTDErrorKind::DirectoryMissing))
}
Ok(hasher.finalize().to_vec())
} else {
Err(TDSTDError::new(TDSTDErrorKind::DirectoryMissing))
}
}

0 comments on commit f6016a9

Please sign in to comment.