diff --git a/rc-zip-sync/src/entry_reader.rs b/rc-zip-sync/src/entry_reader.rs
index 89b25c5..82eca3a 100644
--- a/rc-zip-sync/src/entry_reader.rs
+++ b/rc-zip-sync/src/entry_reader.rs
@@ -30,38 +30,45 @@ where
     R: io::Read,
 {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let mut fsm = match self.fsm.take() {
-            Some(fsm) => fsm,
-            None => return Ok(0),
-        };
+        loop {
+            let mut fsm = match self.fsm.take() {
+                Some(fsm) => fsm,
+                None => return Ok(0),
+            };
 
-        if fsm.wants_read() {
-            trace!("fsm wants read");
-            let n = self.rd.read(fsm.space())?;
-            trace!("giving fsm {} bytes", n);
-            fsm.fill(n);
-        } else {
-            trace!("fsm does not want read");
-        }
+            #[allow(clippy::needless_late_init)] // don't tell me what to do
+            let filled_bytes;
+            if fsm.wants_read() {
+                tracing::trace!(space_avail = fsm.space().len(), "fsm wants read");
+                let n = self.rd.read(fsm.space())?;
+                fsm.fill(n);
+                filled_bytes = n;
+            } else {
+                trace!("fsm does not want read");
+                filled_bytes = 0;
+            }
 
-        match fsm.process(buf)? {
-            FsmResult::Continue((fsm, outcome)) => {
-                self.fsm = Some(fsm);
+            match fsm.process(buf)? {
+                FsmResult::Continue((fsm, outcome)) => {
+                    self.fsm = Some(fsm);
 
-                if outcome.bytes_written > 0 {
-                    Ok(outcome.bytes_written)
-                } else if outcome.bytes_read == 0 {
-                    // that's EOF, baby!
-                    Ok(0)
-                } else {
-                    // loop, it happens
-                    self.read(buf)
+                    if outcome.bytes_written > 0 {
+                        tracing::trace!("wrote {} bytes", outcome.bytes_written);
+                        return Ok(outcome.bytes_written);
+                    } else if filled_bytes > 0 || outcome.bytes_read > 0 {
+                        // progress was made, keep reading
+                        continue;
+                    } else {
+                        return Err(io::Error::new(
+                            io::ErrorKind::Other,
+                            "entry reader: no progress",
+                        ));
+                    }
+                }
+                FsmResult::Done(_) => {
+                    // neat!
+                    return Ok(0);
                 }
-            }
-            FsmResult::Done(_) => {
-                // neat!
-                trace!("fsm done");
-                Ok(0)
             }
         }
     }
diff --git a/rc-zip-tokio/src/entry_reader.rs b/rc-zip-tokio/src/entry_reader.rs
index 094da2a..5839d97 100644
--- a/rc-zip-tokio/src/entry_reader.rs
+++ b/rc-zip-tokio/src/entry_reader.rs
@@ -1,4 +1,4 @@
-use std::{pin::Pin, task};
+use std::{io, pin::Pin, task};
 
 use pin_project_lite::pin_project;
 use rc_zip::{
@@ -42,48 +42,57 @@ where
         cx: &mut task::Context<'_>,
         buf: &mut ReadBuf<'_>,
     ) -> task::Poll<io::Result<()>> {
-        let this = self.as_mut().project();
+        let mut this = self.as_mut().project();
 
-        let mut fsm = match this.fsm.take() {
-            Some(fsm) => fsm,
-            None => return Ok(()).into(),
-        };
+        loop {
+            let mut fsm = match this.fsm.take() {
+                Some(fsm) => fsm,
+                None => return Ok(()).into(),
+            };
 
-        if fsm.wants_read() {
-            tracing::trace!(space_avail = fsm.space().len(), "fsm wants read");
-            let mut buf = ReadBuf::new(fsm.space());
-            match this.rd.poll_read(cx, &mut buf) {
-                task::Poll::Ready(res) => res?,
-                task::Poll::Pending => {
-                    *this.fsm = Some(fsm);
-                    return task::Poll::Pending;
+            let filled_bytes;
+            if fsm.wants_read() {
+                tracing::trace!(space_avail = fsm.space().len(), "fsm wants read");
+                let mut buf = ReadBuf::new(fsm.space());
+                match this.rd.as_mut().poll_read(cx, &mut buf) {
+                    task::Poll::Ready(res) => res?,
+                    task::Poll::Pending => {
+                        *this.fsm = Some(fsm);
+                        return task::Poll::Pending;
+                    }
                 }
-            }
-            let n = buf.filled().len();
+                let n = buf.filled().len();
 
-            tracing::trace!("read {} bytes", n);
-            fsm.fill(n);
-        } else {
-            tracing::trace!("fsm does not want read");
-        }
+                tracing::trace!("read {} bytes", n);
+                fsm.fill(n);
+                filled_bytes = n;
+            } else {
+                tracing::trace!("fsm does not want read");
+                filled_bytes = 0;
+            }
 
-        match fsm.process(buf.initialize_unfilled())? {
-            FsmResult::Continue((fsm, outcome)) => {
-                *this.fsm = Some(fsm);
-                if outcome.bytes_written > 0 {
-                    tracing::trace!("wrote {} bytes", outcome.bytes_written);
-                    buf.advance(outcome.bytes_written);
-                } else if outcome.bytes_read == 0 {
-                    // that's EOF, baby!
-                } else {
-                    // loop, it happens
-                    return self.poll_read(cx, buf);
+            match fsm.process(buf.initialize_unfilled())? {
+                FsmResult::Continue((fsm, outcome)) => {
+                    *this.fsm = Some(fsm);
+                    if outcome.bytes_written > 0 {
+                        tracing::trace!("wrote {} bytes", outcome.bytes_written);
+                        buf.advance(outcome.bytes_written);
+                    } else if filled_bytes > 0 || outcome.bytes_read > 0 {
+                        // progress was made, keep reading
+                        continue;
+                    } else {
+                        return Err(io::Error::new(
+                            io::ErrorKind::Other,
+                            "entry reader: no progress",
+                        ))
+                        .into();
+                    }
+                }
+                FsmResult::Done(_) => {
+                    // neat!
                 }
             }
-            FsmResult::Done(_) => {
-                // neat!
-            }
+            return Ok(()).into();
         }
-        Ok(()).into()
     }
 }
diff --git a/rc-zip-tokio/src/read_zip.rs b/rc-zip-tokio/src/read_zip.rs
index 95864fc..7794e9f 100644
--- a/rc-zip-tokio/src/read_zip.rs
+++ b/rc-zip-tokio/src/read_zip.rs
@@ -61,13 +61,10 @@ where
     let mut fsm = ArchiveFsm::new(size);
     loop {
         if let Some(offset) = fsm.wants_read() {
-            trace!(%offset, "read_zip_with_size: wants_read, space len = {}", fsm.space().len());
-
             let mut cstate_next = match cstate.take() {
                 Some(cstate) => {
                     if cstate.offset == offset {
                         // all good, re-using
-                        trace!(%offset, "read_zip_with_size: re-using cursor");
                         cstate
                     } else {
                         trace!(%offset, %cstate.offset, "read_zip_with_size: making new cursor (had wrong offset)");
@@ -91,7 +88,7 @@ where
             cstate_next.offset += read_bytes as u64;
             cstate = Some(cstate_next);
 
-            trace!(%read_bytes, "read_zip_with_size: read");
+            trace!(%read_bytes, "filling fsm");
             if read_bytes == 0 {
                 return Err(Error::IO(io::ErrorKind::UnexpectedEof.into()));
             }
@@ -308,23 +305,18 @@ impl AsyncRead for AsyncRandomAccessFileCursor {
         match &mut self.state {
             ARAFCState::Idle(core) => {
                 if core.inner_buf_offset < core.inner_buf_len {
-                    trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, avail = %(core.inner_buf_len - core.inner_buf_offset), "poll_read: have data in inner buffer");
-
                     // we have data in the inner buffer, don't even need
                     // to spawn a blocking task
                     let read_len =
                         cmp::min(buf.remaining(), core.inner_buf_len - core.inner_buf_offset);
-                    trace!(%read_len, "poll_read: putting slice");
                     buf.put_slice(&core.inner_buf[core.inner_buf_offset..][..read_len]);
                     core.inner_buf_offset += read_len;
-                    trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "poll_read: after put_slice");
+                    trace!(inner_buf_offset = %core.inner_buf_offset, inner_buf_len = %core.inner_buf_len, "read from inner buffer");
                     return Poll::Ready(Ok(()));
                 }
 
-                trace!("will have to issue a read call");
-
                 // this is just used to shadow core
                 #[allow(unused_variables, clippy::let_unit_value)]
                 let core = ();
@@ -339,7 +331,7 @@ impl AsyncRead for AsyncRandomAccessFileCursor {
 
                 let fut = Box::pin(tokio::task::spawn_blocking(move || {
                     let read_bytes = file.read_at(file_offset, &mut inner_buf)?;
-                    trace!("read {} bytes", read_bytes);
+                    trace!(%read_bytes, "read from file");
                     Ok(ARAFCCore {
                         file_offset: file_offset + read_bytes as u64,
                         file,
diff --git a/rc-zip/src/fsm/entry/lzma_dec.rs b/rc-zip/src/fsm/entry/lzma_dec.rs
index 346e041..71db691 100644
--- a/rc-zip/src/fsm/entry/lzma_dec.rs
+++ b/rc-zip/src/fsm/entry/lzma_dec.rs
@@ -40,75 +40,92 @@ impl LzmaDec {
 impl Decompressor for LzmaDec {
     fn decompress(
         &mut self,
-        in_buf: &[u8],
+        mut in_buf: &[u8],
         out: &mut [u8],
         has_more_input: HasMoreInput,
     ) -> Result<DecompressOutcome, Error> {
-        tracing::trace!(
-            in_buf_len = in_buf.len(),
-            out_len = out.len(),
-            remain_in_internal_buffer = self.internal_buf_mut().len(),
-            "decompress",
-        );
-
         let mut outcome: DecompressOutcome = Default::default();
-        self.copy_to_out(out, &mut outcome);
-        if outcome.bytes_written > 0 {
-            trace!(
-                "still draining internal buffer, just copied {} bytes",
-                outcome.bytes_written
-            );
-            return Ok(outcome);
-        }
+
+        loop {
+            tracing::trace!(
+                in_buf_len = in_buf.len(),
+                out_len = out.len(),
+                remain_in_internal_buffer = self.internal_buf_mut().len(),
+                ?outcome,
+                "decompress",
+            );
+
+            self.copy_to_out(out, &mut outcome);
+            if outcome.bytes_written > 0 {
+                trace!(
+                    "still draining internal buffer, just copied {} bytes",
+                    outcome.bytes_written
+                );
+                return Ok(outcome);
+            }
 
-        match &mut self.state {
-            State::Writing(stream) => {
-                let n = stream.write(in_buf).map_err(dec_err)?;
-                trace!(
-                    "wrote {} bytes to decompressor (of {} available)",
-                    n,
-                    in_buf.len()
-                );
-                outcome.bytes_read = n;
-
-                // if we haven't written all the input, and we haven't gotten
-                // any output, then we need to keep going
-                if n != 0 && n < in_buf.len() && self.internal_buf_mut().is_empty() {
-                    // note: the n != 0 here is because apparently there can be a 10-byte
-                    // trailer after LZMA compressed data? and the decoder will _refuse_
-                    // to let us write them, so when we have just these 10 bytes left,
-                    // it's good to just let the decoder finish up.
-                    trace!("didn't write all output AND no output yet, so keep going");
-                    return self.decompress(&in_buf[n..], out, has_more_input);
-                }
+            match &mut self.state {
+                State::Writing(stream) => {
+                    let n = stream.write(in_buf).map_err(dec_err)?;
+                    trace!(
+                        "wrote {} bytes to decompressor (of {} available)",
+                        n,
+                        in_buf.len()
+                    );
+                    outcome.bytes_read += n;
+                    in_buf = &in_buf[n..];
+
+                    // if we wrote some of the input, and we haven't gotten any
+                    // output, then we need to loop
+                    if n > 0 && n < in_buf.len() && self.internal_buf_mut().is_empty() {
+                        trace!("fed _some_ to the decoder and no output yet, keep going");
+                        continue;
+                    }
 
-                match has_more_input {
-                    HasMoreInput::Yes => {
-                        // keep going
-                        trace!("more input to come");
-                    }
-                    HasMoreInput::No => {
-                        trace!("no more input to come");
-                        match std::mem::take(&mut self.state) {
-                            State::Writing(stream) => {
-                                trace!("finishing...");
-                                self.state = State::Draining(stream.finish().map_err(dec_err)?);
-                            }
-                            _ => unreachable!(),
-                        }
-                    }
-                }
-            }
-            State::Draining(_) => {
-                // keep going
-            }
-            State::Transition => unreachable!(),
-        }
+                    match has_more_input {
+                        HasMoreInput::Yes => {
+                            trace!("more input to come");
+                        }
+                        HasMoreInput::No => {
+                            trace!("no more input to come");
+
+                            // this happens when we hit the 10-byte trailer mentioned above
+                            // in this case, we just pretend we wrote everything
+                            match in_buf.len() {
+                                0 => {
+                                    // trailer is not present, that's okay
+                                }
+                                10 => {
+                                    trace!("eating LZMA trailer");
+                                    outcome.bytes_read += 10;
+                                }
+                                _ => {
+                                    return Err(Error::Decompression { method: Method::Lzma, msg: format!("expected LZMA trailer or no LZMA trailer, but not a {}-byte trailer", in_buf.len()) });
+                                }
+                            }
+
+                            match std::mem::take(&mut self.state) {
+                                State::Writing(stream) => {
+                                    trace!("finishing...");
+                                    self.state = State::Draining(stream.finish().map_err(dec_err)?);
+                                    continue;
+                                }
+                                _ => unreachable!(),
+                            }
+                        }
+                    }
+                }
+                State::Draining(_) => {
+                    // keep going
+                    trace!("draining");
+                }
+                State::Transition => unreachable!(),
+            }
 
-        self.copy_to_out(out, &mut outcome);
-        trace!("decompressor gave us {} bytes", outcome.bytes_written);
-        Ok(outcome)
+            self.copy_to_out(out, &mut outcome);
+            trace!("decompressor gave us {} bytes", outcome.bytes_written);
+            return Ok(outcome);
+        }
     }
 }
diff --git a/rc-zip/src/fsm/entry/mod.rs b/rc-zip/src/fsm/entry/mod.rs
index 104b8f1..9fc8a03 100644
--- a/rc-zip/src/fsm/entry/mod.rs
+++ b/rc-zip/src/fsm/entry/mod.rs
@@ -252,14 +252,15 @@ impl EntryFsm {
                 );
 
                 let outcome = decompressor.decompress(in_buf, out, has_more_input)?;
+                self.buffer.consume(outcome.bytes_read);
+                *compressed_bytes += outcome.bytes_read as u64;
                 trace!(
-                    ?outcome,
                     compressed_bytes = *compressed_bytes,
                     uncompressed_bytes = *uncompressed_bytes,
+                    entry_compressed_size = %entry.compressed_size,
+                    ?outcome,
                     "decompressed"
                 );
-                self.buffer.consume(outcome.bytes_read);
-                *compressed_bytes += outcome.bytes_read as u64;
 
                 if outcome.bytes_written == 0 && *compressed_bytes == entry.compressed_size {
                     trace!("eof and no bytes written, we're done");
@@ -280,6 +281,8 @@ impl EntryFsm {
                        }
                    });
                    return self.process(out);
+                } else if outcome.bytes_written == 0 && outcome.bytes_read == 0 {
+                    panic!("decompressor didn't read anything and didn't write anything?")
                }
 
                // write the decompressed data to the hasher
diff --git a/rc-zip/src/fsm/entry/zstd_dec.rs b/rc-zip/src/fsm/entry/zstd_dec.rs
index 276fefc..92215d3 100644
--- a/rc-zip/src/fsm/entry/zstd_dec.rs
+++ b/rc-zip/src/fsm/entry/zstd_dec.rs
@@ -31,76 +31,94 @@ impl ZstdDec {
 impl Decompressor for ZstdDec {
     fn decompress(
         &mut self,
-        in_buf: &[u8],
+        mut in_buf: &[u8],
         out: &mut [u8],
         has_more_input: HasMoreInput,
     ) -> Result<DecompressOutcome, Error> {
-        tracing::trace!(
-            in_buf_len = in_buf.len(),
-            out_len = out.len(),
-            remain_in_internal_buffer = self.internal_buf_mut().len(),
-            "decompress",
-        );
-
         let mut outcome: DecompressOutcome = Default::default();
-        self.copy_to_out(out, &mut outcome);
-        if outcome.bytes_written > 0 {
-            trace!(
-                "still draining internal buffer, just copied {} bytes",
-                outcome.bytes_written
-            );
-            return Ok(outcome);
-        }
+
+        loop {
+            tracing::trace!(
+                in_buf_len = in_buf.len(),
+                out_len = out.len(),
+                remain_in_internal_buffer = self.internal_buf_mut().len(),
+                ?outcome,
+                "decompress",
+            );
+
+            self.copy_to_out(out, &mut outcome);
+            if outcome.bytes_written > 0 {
+                trace!(
+                    "still draining internal buffer, just copied {} bytes",
+                    outcome.bytes_written
+                );
+                return Ok(outcome);
+            }
 
-        match &mut self.state {
-            State::Writing(stream) => {
-                let n = stream.write(in_buf).map_err(dec_err)?;
-                trace!(
-                    "wrote {} bytes to decompressor (of {} available)",
-                    n,
-                    in_buf.len()
-                );
-                outcome.bytes_read = n;
-
-                // if we haven't written all the input, and we haven't gotten
-                // any output, then we need to keep going
-                if n != 0 && n < in_buf.len() && self.internal_buf_mut().is_empty() {
-                    // note: the n != 0 here is because apparently there can be a 10-byte
-                    // trailer after LZMA compressed data? and the decoder will _refuse_
-                    // to let us write them, so when we have just these 10 bytes left,
-                    // it's good to just let the decoder finish up.
-                    trace!("didn't write all output AND no output yet, so keep going");
-                    return self.decompress(&in_buf[n..], out, has_more_input);
-                }
+            match &mut self.state {
+                State::Writing(stream) => {
+                    let n = stream.write(in_buf).map_err(dec_err)?;
+                    trace!(
+                        "wrote {} bytes to decompressor (of {} available)",
+                        n,
+                        in_buf.len()
+                    );
+                    outcome.bytes_read += n;
+                    in_buf = &in_buf[n..];
+
+                    // if we wrote some of the input, and we haven't gotten any
+                    // output, then we need to loop
+                    if n > 0 && n < in_buf.len() && self.internal_buf_mut().is_empty() {
+                        trace!("fed _some_ to the decoder and no output yet, keep going");
+                        continue;
+                    }
 
-                match has_more_input {
-                    HasMoreInput::Yes => {
-                        // keep going
-                        trace!("more input to come");
-                    }
-                    HasMoreInput::No => {
-                        trace!("no more input to come");
-                        match std::mem::take(&mut self.state) {
-                            State::Writing(mut stream) => {
-                                trace!("finishing...");
-                                stream.flush().map_err(dec_err)?;
-                                self.state = State::Draining(stream.into_inner());
-                            }
-                            _ => unreachable!(),
-                        }
-                    }
-                }
-            }
-            State::Draining(_) => {
-                // keep going
-            }
-            State::Transition => unreachable!(),
-        }
+                    match has_more_input {
+                        HasMoreInput::Yes => {
+                            trace!("more input to come");
+                        }
+                        HasMoreInput::No => {
+                            trace!("no more input to come");
+
+                            match in_buf.len() {
+                                0 => {
+                                    // no trailer, good
+                                }
+                                1 => {
+                                    // TODO: figure out a good explanation for this.
+                                    // in some test files the compressed size is 37 bytes but
+                                    // the zstd decompressor will only accept 36 bytes.
+                                    trace!("eating ZSTD trailer?");
+                                    outcome.bytes_read += 1;
+                                }
+                                _ => {
+                                    return Err(Error::Decompression { method: Method::Zstd, msg: format!("expected ZSTD trailer or no ZSTD trailer, but not a {}-byte trailer", in_buf.len()) });
+                                }
+                            }
+
+                            match std::mem::take(&mut self.state) {
+                                State::Writing(mut stream) => {
+                                    trace!("finishing...");
+                                    stream.flush().map_err(dec_err)?;
+                                    self.state = State::Draining(stream.into_inner());
+                                    continue;
+                                }
+                                _ => unreachable!(),
+                            }
+                        }
+                    }
+                }
+                State::Draining(_) => {
+                    // keep going
+                    trace!("draining");
+                }
+                State::Transition => unreachable!(),
+            }
 
-        self.copy_to_out(out, &mut outcome);
-        trace!("decompressor gave us {} bytes", outcome.bytes_written);
-        Ok(outcome)
+            self.copy_to_out(out, &mut outcome);
+            trace!("decompressor gave us {} bytes", outcome.bytes_written);
+            return Ok(outcome);
+        }
     }
 }
diff --git a/rc-zip/src/parse/extra_field.rs b/rc-zip/src/parse/extra_field.rs
index 9559c07..ce35d63 100644
--- a/rc-zip/src/parse/extra_field.rs
+++ b/rc-zip/src/parse/extra_field.rs
@@ -1,7 +1,6 @@
 use std::borrow::Cow;
 
 use ownable::{IntoOwned, ToOwned};
-use tracing::trace;
 use winnow::{
     binary::{le_u16, le_u32, le_u64, le_u8, length_take},
     combinator::{opt, preceded, repeat_till},
@@ -88,11 +87,6 @@ impl<'a> ExtraField<'a> {
         move |i| {
             use ExtraField as EF;
             let rec = ExtraFieldRecord::parser.parse_next(i)?;
-            trace!(
-                "parsing extra field record, tag {:04x}, len {}",
-                rec.tag,
-                rec.payload.len()
-            );
             let payload = &mut Partial::new(rec.payload);
 
             let variant = match rec.tag {
@@ -322,7 +316,6 @@ pub enum NtfsAttr {
 impl NtfsAttr {
     fn parser(i: &mut Partial<&'_ [u8]>) -> PResult<Self> {
         let tag = le_u16.parse_next(i)?;
-        trace!("parsing NTFS attribute, tag {:04x}", tag);
         let payload = length_take(le_u16).parse_next(i)?;
 
         match tag {
@@ -349,7 +342,6 @@ pub struct NtfsAttr1 {
 
 impl NtfsAttr1 {
     fn parser(i: &mut Partial<&'_ [u8]>) -> PResult<Self> {
-        trace!("parsing NTFS attr 1, input len is {}", i.len());
         seq! {Self {
             mtime: NtfsTimestamp::parser,
             atime: NtfsTimestamp::parser,