Skip to content

Commit

Permalink
Prototype StreamingEntryReader
Browse files Browse the repository at this point in the history
  • Loading branch information
fasterthanlime committed Feb 2, 2024
1 parent 183f980 commit f0d34fe
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rc-zip-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ positioned-io = { version = "0.3.3", optional = true }
rc-zip = { version = "3.0.0", path = "../rc-zip" }
oval = "2.0.0"
tracing = "0.1.40"
winnow = "0.5.36"

[features]
default = ["file", "deflate"]
Expand Down
1 change: 1 addition & 0 deletions rc-zip-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

mod entry_reader;
mod read_zip;
mod streaming_entry_reader;

// re-exports
pub use rc_zip;
Expand Down
49 changes: 48 additions & 1 deletion rc-zip-sync/src/read_zip.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use rc_zip::{
error::Error,
fsm::{ArchiveFsm, FsmResult},
parse::{Archive, StoredEntry},
parse::{Archive, LocalFileHeaderRecord, StoredEntry},
};
use winnow::{
error::ErrMode,
stream::{AsBytes, Offset},
Parser, Partial,
};

use crate::entry_reader::EntryReader;
use crate::streaming_entry_reader::StreamingEntryReader;
use std::{io::Read, ops::Deref};

/// A trait for reading something as a zip archive
Expand Down Expand Up @@ -215,3 +221,44 @@ impl ReadZip for std::fs::File {
self.read_zip_with_size(size)
}
}

/// A trait for reading the entries of a zip archive one after the other from
/// a plain [`Read`] source, starting with the first local file header.
pub trait ReadZipEntriesStreaming<R: Read> {
    /// Consumes `self` and returns a [`StreamingEntryReader`] for the first
    /// entry, or an [`Error`] if that reader could not be set up.
    fn first_entry(self) -> Result<StreamingEntryReader<R>, Error>;
}

impl<R> ReadZipEntriesStreaming<R> for R
where
R: Read,
{
fn first_entry(mut self) -> Result<StreamingEntryReader<Self>, Error> {
// first, get enough data to read the first local file header
let mut buf = oval::Buffer::with_capacity(16 * 1024);

Check warning on line 238 in rc-zip-sync/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/read_zip.rs#L236-L238

Added lines #L236 - L238 were not covered by tests

let header = loop {
let n = self.read(buf.space())?;
buf.fill(n);

let mut input = Partial::new(buf.data());
match LocalFileHeaderRecord::parser.parse_next(&mut input) {
Ok(header) => {
let consumed = input.as_bytes().offset_from(&buf.data());
buf.consume(consumed);
tracing::trace!(?header, %consumed, "Got local file header record!");
break header;

Check warning on line 250 in rc-zip-sync/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/read_zip.rs#L240-L250

Added lines #L240 - L250 were not covered by tests
}
// TODO: keep reading if we don't have enough data
Err(ErrMode::Incomplete(_)) => {
// read more
}
Err(e) => {
panic!("{e}")

Check warning on line 257 in rc-zip-sync/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/read_zip.rs#L253-L257

Added lines #L253 - L257 were not covered by tests
}
}
};

Ok(StreamingEntryReader::new(buf, header, self))
}

Check warning on line 263 in rc-zip-sync/src/read_zip.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/read_zip.rs#L262-L263

Added lines #L262 - L263 were not covered by tests
}
132 changes: 132 additions & 0 deletions rc-zip-sync/src/streaming_entry_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use oval::Buffer;
use rc_zip::{
fsm::{EntryFsm, FsmResult},
parse::LocalFileHeaderRecord,
};
use std::{
io::{self, Write},
str::Utf8Error,
};

/// Reads a single zip entry from an underlying reader `R`, driven by the
/// local file header that was parsed for it.
pub(crate) struct StreamingEntryReader<R> {
/// Local file header record parsed for this entry.
header: LocalFileHeaderRecord,
/// Underlying reader that further bytes are pulled from.
rd: R,
/// Where this reader currently is in the entry's lifecycle.
state: State,
}

#[derive(Default)]

Check warning on line 17 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L17

Added line #L17 was not covered by tests
enum State {
Reading {
remain: Buffer,
fsm: EntryFsm,
},
Finished {
/// remaining buffer for next entry
remain: Buffer,
},
#[default]
Transition,
}

impl<R> StreamingEntryReader<R>
where
    R: io::Read,
{
    /// Creates a reader for the entry described by `header`.
    ///
    /// `remain` holds bytes that were already buffered past the header; `rd`
    /// is where any further bytes come from. The reader starts out in the
    /// `Reading` state with a fresh `EntryFsm`.
    pub(crate) fn new(remain: Buffer, header: LocalFileHeaderRecord, rd: R) -> Self {
        let state = State::Reading {
            fsm: EntryFsm::new(None),
            remain,
        };

        Self { header, rd, state }
    }
}

impl<R> io::Read for StreamingEntryReader<R>
where
R: io::Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match std::mem::take(&mut self.state) {

Check warning on line 52 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
State::Reading {
mut remain,
mut fsm,
} => {
if fsm.wants_read() {
tracing::trace!("fsm wants read");
if remain.available_data() > 0 {
let n = remain.read(buf)?;
tracing::trace!("giving fsm {} bytes from remain", n);
fsm.fill(n);

Check warning on line 62 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L54-L62

Added lines #L54 - L62 were not covered by tests
} else {
let n = self.rd.read(fsm.space())?;
tracing::trace!("giving fsm {} bytes from rd", n);
fsm.fill(n);

Check warning on line 66 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L64-L66

Added lines #L64 - L66 were not covered by tests
}
} else {
tracing::trace!("fsm does not want read");

Check warning on line 69 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L69

Added line #L69 was not covered by tests
}

match fsm.process(buf)? {
FsmResult::Continue((fsm, outcome)) => {
self.state = State::Reading { remain, fsm };

if outcome.bytes_written > 0 {
Ok(outcome.bytes_written)

Check warning on line 77 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L72-L77

Added lines #L72 - L77 were not covered by tests
} else {
// loop, it happens
self.read(buf)

Check warning on line 80 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L80

Added line #L80 was not covered by tests
}
}
FsmResult::Done(mut fsm_remain) => {
// if our remain still has remaining data, it goes after
// what the fsm just gave back
if remain.available_data() > 0 {
fsm_remain.grow(fsm_remain.capacity() + remain.available_data());
fsm_remain.write_all(remain.data());
drop(remain)
}

Check warning on line 90 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L83-L90

Added lines #L83 - L90 were not covered by tests

// FIXME: read the next local file header here

self.state = State::Finished { remain: fsm_remain };

// neat!
Ok(0)

Check warning on line 97 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L94-L97

Added lines #L94 - L97 were not covered by tests
}
}
}
State::Finished { remain } => {
// wait for them to call finished
self.state = State::Finished { remain };
Ok(0)

Check warning on line 104 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L101-L104

Added lines #L101 - L104 were not covered by tests
}
State::Transition => unreachable!(),

Check warning on line 106 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L106

Added line #L106 was not covered by tests
}
}

Check warning on line 108 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L108

Added line #L108 was not covered by tests
}

impl<R> StreamingEntryReader<R> {
/// Return the name of this entry, decoded as UTF-8.
///
/// There is no support for CP-437 in the streaming interface
pub fn name(&self) -> Result<&str, Utf8Error> {
std::str::from_utf8(&self.header.name.0)
}

Check warning on line 117 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L115-L117

Added lines #L115 - L117 were not covered by tests

/// Finish reading this entry, returning the next streaming entry reader, if
/// any. This panics if the entry is not fully read.
pub fn finish(self) -> Option<StreamingEntryReader<R>> {
match self.state {

Check warning on line 122 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L121-L122

Added lines #L121 - L122 were not covered by tests
State::Reading { .. } => {
panic!("finish called before entry is fully read")

Check warning on line 124 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L124

Added line #L124 was not covered by tests
}
State::Finished { .. } => {
todo!("read local file header for next entry")

Check warning on line 127 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L127

Added line #L127 was not covered by tests
}
State::Transition => unreachable!(),

Check warning on line 129 in rc-zip-sync/src/streaming_entry_reader.rs

View check run for this annotation

Codecov / codecov/patch

rc-zip-sync/src/streaming_entry_reader.rs#L129

Added line #L129 was not covered by tests
}
}
}

0 comments on commit f0d34fe

Please sign in to comment.