Skip to content

Commit

Permalink
Merge pull request #81 from google/remove-has-length
Browse files Browse the repository at this point in the history
Remove HasLength trait.
  • Loading branch information
adetaylor authored Sep 28, 2024
2 parents 6318b14 + f362b7e commit fcce66b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 58 deletions.
56 changes: 16 additions & 40 deletions src/unzip/cloneable_seekable_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,13 @@
// except according to those terms.

use std::{
fs::File,
io::{BufReader, Read, Seek, SeekFrom},
io::{Read, Seek, SeekFrom},
sync::{Arc, Mutex},
};

/// A trait to represent some reader which has a total length known in
/// advance. This is roughly equivalent to the nightly
/// [`Seek::stream_len`] API.
pub(crate) trait HasLength {
/// Return the current total length of this stream.
fn len(&self) -> u64;
}
use super::determine_stream_len;

struct Inner<R: Read + Seek + HasLength> {
struct Inner<R: Read + Seek> {
/// The underlying Read implementation.
r: R,
/// The position of r.
Expand All @@ -29,7 +22,7 @@ struct Inner<R: Read + Seek + HasLength> {
len: Option<u64>,
}

impl<R: Read + Seek + HasLength> Inner<R> {
impl<R: Read + Seek> Inner<R> {
fn new(r: R) -> Self {
Self {
r,
Expand All @@ -39,14 +32,15 @@ impl<R: Read + Seek + HasLength> Inner<R> {
}

/// Get the length of the data stream. This is assumed to be constant.
fn len(&mut self) -> u64 {
fn len(&mut self) -> std::io::Result<u64> {
// Return cached size
if let Some(len) = self.len {
return len;
return Ok(len);
}

let len = self.r.len();
let len = determine_stream_len(&mut self.r)?;
self.len = Some(len);
len
Ok(len)
}

/// Read into the given buffer, starting at the given offset in the data stream.
Expand All @@ -67,14 +61,14 @@ impl<R: Read + Seek + HasLength> Inner<R> {
/// and thus can be cloned cheaply. It supports seeking; each cloned instance
/// maintains its own pointer into the file, and the underlying instance
/// is seeked prior to each read.
pub(crate) struct CloneableSeekableReader<R: Read + Seek + HasLength> {
pub(crate) struct CloneableSeekableReader<R: Read + Seek> {
/// The wrapper around the Read implementation, shared between threads.
inner: Arc<Mutex<Inner<R>>>,
/// The position of _this_ reader.
pos: u64,
}

impl<R: Read + Seek + HasLength> Clone for CloneableSeekableReader<R> {
impl<R: Read + Seek> Clone for CloneableSeekableReader<R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
Expand All @@ -83,7 +77,7 @@ impl<R: Read + Seek + HasLength> Clone for CloneableSeekableReader<R> {
}
}

impl<R: Read + Seek + HasLength> CloneableSeekableReader<R> {
impl<R: Read + Seek> CloneableSeekableReader<R> {
/// Constructor. Takes ownership of the underlying `Read`.
/// You should pass in only streams whose total length you expect
/// to be fixed and unchanging. Odd behavior may occur if the length
Expand All @@ -97,7 +91,7 @@ impl<R: Read + Seek + HasLength> CloneableSeekableReader<R> {
}
}

impl<R: Read + Seek + HasLength> Read for CloneableSeekableReader<R> {
impl<R: Read + Seek> Read for CloneableSeekableReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut inner = self.inner.lock().unwrap();
let read_result = inner.read_at(self.pos, buf);
Expand All @@ -114,12 +108,12 @@ impl<R: Read + Seek + HasLength> Read for CloneableSeekableReader<R> {
}
}

impl<R: Read + Seek + HasLength> Seek for CloneableSeekableReader<R> {
impl<R: Read + Seek> Seek for CloneableSeekableReader<R> {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let new_pos = match pos {
SeekFrom::Start(pos) => pos,
SeekFrom::End(offset_from_end) => {
let file_len = self.inner.lock().unwrap().len();
let file_len = self.inner.lock().unwrap().len()?;
if -offset_from_end as u64 > file_len {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand All @@ -146,30 +140,12 @@ impl<R: Read + Seek + HasLength> Seek for CloneableSeekableReader<R> {
}
}

impl<R: HasLength> HasLength for BufReader<R> {
fn len(&self) -> u64 {
self.get_ref().len()
}
}

impl HasLength for File {
fn len(&self) -> u64 {
self.metadata().unwrap().len()
}
}

#[cfg(test)]
mod test {
use super::{CloneableSeekableReader, HasLength};
use super::CloneableSeekableReader;
use std::io::{Cursor, Read, Seek, SeekFrom};
use test_log::test;

impl HasLength for Cursor<Vec<u8>> {
fn len(&self) -> u64 {
self.get_ref().len() as u64
}
}

#[test]
fn test_cloneable_seekable_reader() -> std::io::Result<()> {
let buf: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
Expand Down
22 changes: 14 additions & 8 deletions src/unzip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod seekable_http_reader;
use std::{
borrow::Cow,
fs::File,
io::{ErrorKind, Read, Seek},
io::{ErrorKind, Read, Seek, SeekFrom},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
Expand All @@ -27,10 +27,16 @@ use crate::unzip::{
cloneable_seekable_reader::CloneableSeekableReader, progress_updater::ProgressUpdater,
};

use self::{
cloneable_seekable_reader::HasLength,
seekable_http_reader::{AccessPattern, SeekableHttpReader, SeekableHttpReaderEngine},
};
use self::seekable_http_reader::{AccessPattern, SeekableHttpReader, SeekableHttpReaderEngine};

pub(crate) fn determine_stream_len<R: Seek>(stream: &mut R) -> std::io::Result<u64> {
let old_pos = stream.stream_position()?;
let len = stream.seek(SeekFrom::End(0))?;
if old_pos != len {
stream.seek(SeekFrom::Start(old_pos))?;
}
Ok(len)
}

/// Options for unzipping.
pub struct UnzipOptions<'a, 'b> {
Expand Down Expand Up @@ -159,11 +165,11 @@ impl<F: Fn()> UnzipEngineImpl for UnzipUriEngine<F> {

impl UnzipEngine {
/// Create an unzip engine which knows how to unzip a file.
pub fn for_file(zipfile: File) -> Result<Self> {
pub fn for_file(mut zipfile: File) -> Result<Self> {
// The following line doesn't actually seem to make any significant
// performance difference.
// let zipfile = BufReader::new(zipfile);
let compressed_length = zipfile.len();
let compressed_length = determine_stream_len(&mut zipfile)?;
let zipfile = CloneableSeekableReader::new(zipfile);
Ok(Self {
zipfile: Box::new(UnzipFileEngine(ZipArchive::new(zipfile)?)),
Expand Down Expand Up @@ -209,7 +215,7 @@ impl UnzipEngine {
let mut response = reqwest::blocking::get(uri)?;
let mut tempfile = tempfile::tempfile()?;
std::io::copy(&mut response, &mut tempfile)?;
let compressed_length = tempfile.len();
let compressed_length = determine_stream_len(&mut tempfile)?;
let zipfile = CloneableSeekableReader::new(tempfile);
(
compressed_length,
Expand Down
11 changes: 1 addition & 10 deletions src/unzip/seekable_http_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use ranges::Ranges;
use reqwest::blocking::Response;
use thiserror::Error;

use super::{
cloneable_seekable_reader::HasLength,
http_range_reader::{self, RangeFetcher},
};
use super::http_range_reader::{self, RangeFetcher};

/// This is how much we read from the underlying HTTP stream in a given thread,
/// before signalling other threads that they may wish to continue with their
Expand Down Expand Up @@ -636,12 +633,6 @@ impl Read for SeekableHttpReader {
}
}

impl HasLength for SeekableHttpReader {
fn len(&self) -> u64 {
self.engine.len()
}
}

#[cfg(test)]
mod tests {
use ripunzip_test_utils::{ExpectedRange, RangeAwareResponse, RangeAwareResponseType};
Expand Down

0 comments on commit fcce66b

Please sign in to comment.