Skip to content

Commit

Permalink
chore: release v0.15.1 (#155)
Browse files Browse the repository at this point in the history
* `stream-download`: 0.15.0 -> 0.15.1 (✓ API compatible changes)

<details><summary><i><b>Changelog</b></i></summary><p>

<blockquote>

[0.15.1](https://github.com/aschey/stream-download-rs/compare/0.15.0..0.15.1)
- 2025-02-15

- Format sorting for yt-dlp examples (#156) -
([806be91](806be91))

- *(deps)* Update ctor requirement from 0.2.7 to 0.3.3 (#154) -
([628b62d](628b62d))

<!-- generated by git-cliff -->
</blockquote>

</p></details>

---
This PR was generated with
[release-plz](https://github.com/release-plz/release-plz/).
  • Loading branch information
aschey committed Feb 23, 2025
1 parent 806be91 commit 5092957
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
merge_group:

env:
RUST_MIN: "1.75"
RUST_MIN: "1.80"

jobs:
test:
Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

All notable changes to this project will be documented in this file.

## [0.15.1](https://github.com/aschey/stream-download-rs/compare/0.15.0..0.15.1) - 2025-02-15

### Bug Fixes

- Format sorting for yt-dlp examples (#156) - ([806be91](https://github.com/aschey/stream-download-rs/commit/806be913ee1502a500ea57a66b48688482d9b8c9))

### Dependencies

- *(deps)* Update ctor requirement from 0.2.7 to 0.3.3 (#154) - ([628b62d](https://github.com/aschey/stream-download-rs/commit/628b62df4494db3dd86f4cc896ee6a279621f4aa))

<!-- generated by git-cliff -->
## [0.15.0](https://github.com/aschey/stream-download-rs/compare/0.14.1..0.15.0) - 2025-02-15

### Features
Expand Down
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
edition = "2021"
name = "stream-download"
version = "0.15.0"
rust-version = "1.75.0"
version = "0.15.1"
rust-version = "1.80.0"
authors = ["Austin Schey <[email protected]>"]
license = "MIT OR Apache-2.0"
readme = "README.md"
Expand All @@ -27,7 +27,6 @@ reqwest = { version = "0.12.2", features = [
"stream",
], default-features = false, optional = true }
reqwest-middleware = { version = ">=0.3,<0.5", optional = true }
tap = "1.0.1"
tempfile = { version = "3.1", optional = true }
thiserror = "2.0.1"
tokio = { version = "1.27", features = ["sync", "macros", "rt", "time"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,4 @@ for dynamically modifying each HTTP request.

## Supported Rust Versions

The MSRV is currently `1.75.0`.
The MSRV is currently `1.80.0`.
11 changes: 5 additions & 6 deletions src/http/reqwest_client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! Adapters for using [`reqwest`] with `stream-download`
use std::str::FromStr;
use std::sync::OnceLock;
use std::sync::LazyLock;

use bytes::Bytes;
use futures::Stream;
use reqwest::header::{self, AsHeaderName, HeaderMap};
use tap::TapFallible;
use tracing::warn;

use super::{DecodeError, RANGE_HEADER_KEY, format_range_header_bytes};
Expand All @@ -21,7 +20,7 @@ impl ResponseHeaders for HeaderMap {
fn get_header_str<K: AsHeaderName>(headers: &HeaderMap, key: K) -> Option<&str> {
headers.get(key).and_then(|val| {
val.to_str()
.tap_err(|e| warn!("error converting header value: {e:?}"))
.inspect_err(|e| warn!("error converting header value: {e:?}"))
.ok()
})
}
Expand Down Expand Up @@ -64,7 +63,7 @@ impl ClientResponse for reqwest::Response {
fn content_length(&self) -> Option<u64> {
get_header_str(self.headers(), header::CONTENT_LENGTH).and_then(|content_length| {
u64::from_str(content_length)
.tap_err(|e| warn!("invalid content length value: {e:?}"))
.inspect_err(|e| warn!("invalid content length value: {e:?}"))
.ok()
})
}
Expand Down Expand Up @@ -96,7 +95,7 @@ impl ClientResponse for reqwest::Response {
}

// per reqwest's docs, it's advisable to create a single client and reuse it
static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);

impl Client for reqwest::Client {
type Url = reqwest::Url;
Expand All @@ -105,7 +104,7 @@ impl Client for reqwest::Client {
type Headers = HeaderMap;

fn create() -> Self {
CLIENT.get_or_init(Self::new).clone()
CLIENT.clone()
}

async fn get(&self, url: &Self::Url) -> Result<Self::Response, Self::Error> {
Expand Down
17 changes: 5 additions & 12 deletions src/http/reqwest_middleware_client.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};

use parking_lot::Mutex;
use reqwest::header::HeaderMap;
use reqwest_middleware::Middleware;

use super::{Client, RANGE_HEADER_KEY, format_range_header_bytes};

static DEFAULT_MIDDLEWARE: OnceLock<Mutex<Vec<Arc<dyn reqwest_middleware::Middleware>>>> =
OnceLock::new();

fn get_middleware() -> &'static Mutex<Vec<Arc<dyn reqwest_middleware::Middleware>>> {
DEFAULT_MIDDLEWARE.get_or_init(|| Mutex::new([].into()))
}
static DEFAULT_MIDDLEWARE: LazyLock<Mutex<Vec<Arc<dyn reqwest_middleware::Middleware>>>> =
LazyLock::new(|| Mutex::new([].into()));

pub(crate) fn add_default_middleware<M>(middleware: M)
where
M: Middleware,
{
DEFAULT_MIDDLEWARE
.get_or_init(|| Mutex::new([].into()))
.lock()
.push(Arc::new(middleware));
DEFAULT_MIDDLEWARE.lock().push(Arc::new(middleware));
}

impl Client for reqwest_middleware::ClientWithMiddleware {
Expand All @@ -32,7 +25,7 @@ impl Client for reqwest_middleware::ClientWithMiddleware {
fn create() -> Self {
Self::new(
reqwest::Client::create(),
get_middleware().lock().clone().into_boxed_slice(),
DEFAULT_MIDDLEWARE.lock().clone().into_boxed_slice(),
)
}

Expand Down
10 changes: 4 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use settings::*;
use source::handle::SourceHandle;
use source::{DecodeError, Source, SourceStream};
use storage::StorageProvider;
use tap::Tap;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, instrument, trace};

Expand Down Expand Up @@ -449,8 +448,7 @@ impl<P: StorageProvider> StreamDownload<P> {
settings: Settings<S>,
) -> Result<Self, StreamInitializationError<S>>
where
S: SourceStream,
S::Error: Debug + Send,
S: SourceStream<Error: Debug + Send>,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<S, S::StreamCreationError>> + Send,
{
Expand Down Expand Up @@ -509,7 +507,7 @@ impl<P: StorageProvider> StreamDownload<P> {
}

fn handle_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let res = self.output_reader.read(buf).tap(|l| {
let res = self.output_reader.read(buf).inspect(|l| {
trace!(read_length = format!("{l:?}"), "returning read");
});
self.handle.notify_read();
Expand Down Expand Up @@ -633,7 +631,7 @@ impl<P: StorageProvider> Seek for StreamDownload<P> {
return self
.output_reader
.seek(SeekFrom::Start(absolute_seek_position))
.tap(|p| debug!(position = format!("{p:?}"), "returning seek position"));
.inspect_err(|p| debug!(position = format!("{p:?}"), "returning seek position"));
}

self.handle.request_position(absolute_seek_position);
Expand All @@ -648,7 +646,7 @@ impl<P: StorageProvider> Seek for StreamDownload<P> {

self.output_reader
.seek(SeekFrom::Start(absolute_seek_position))
.tap(|p| debug!(position = format!("{p:?}"), "returning seek position"))
.inspect_err(|p| debug!(position = format!("{p:?}"), "returning seek position"))
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use bytes::Bytes;
pub use command_builder::*;
pub use ffmpeg::*;
use futures::Stream;
use tap::TapFallible;
use tempfile::NamedTempFile;
use tracing::{debug, error, warn};
pub use yt_dlp::*;
Expand Down Expand Up @@ -209,25 +208,27 @@ impl ProcessStream {
for file in &mut self.stderr_files {
let _ = file
.flush()
.tap_err(|e| error!("error flushing file: {e:?}"));
.inspect_err(|e| error!("error flushing file: {e:?}"));
// Need to reopen the file to access the contents since it was written to from an
// external process
if let Ok(mut file_handle) = file
.reopen()
.tap_err(|e| error!("error opening file: {e:?}"))
.inspect_err(|e| error!("error opening file: {e:?}"))
{
let mut buf = String::new();
let _ = file_handle
.read_to_string(&mut buf)
.tap_err(|e| error!("error reading file: {e:?}"));
.inspect_err(|e| error!("error reading file: {e:?}"));
warn!("stderr from child process: {buf}");
}
}
}

fn close_stderr_files(&mut self) {
for file in mem::take(&mut self.stderr_files) {
let _ = file.close().tap_err(|e| warn!("error closing file: {e:?}"));
let _ = file
.close()
.inspect_err(|e| warn!("error closing file: {e:?}"));
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/source/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::Instant;

use parking_lot::{Condvar, Mutex, RwLock};
use rangemap::RangeSet;
use tap::TapFallible;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{Notify, mpsc};
use tracing::{debug, error};
Expand Down Expand Up @@ -47,7 +46,7 @@ impl SourceHandle {
pub(crate) fn seek(&self, position: u64) {
self.seek_tx
.try_send(position)
.tap_err(|e| {
.inspect_err(|e| {
if let TrySendError::Full(capacity) = e {
error!("Seek buffer full. Capacity: {capacity}");
}
Expand Down
8 changes: 4 additions & 4 deletions src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use futures::{Future, Stream, StreamExt, TryStream};
use handle::{
DownloadStatus, Downloaded, NotifyRead, PositionReached, RequestedPosition, SourceHandle,
};
use tap::TapFallible;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -133,9 +132,10 @@ pub(crate) struct Source<S: SourceStream, W: StorageWriter> {
cancellation_token: CancellationToken,
}

impl<S: SourceStream, W: StorageWriter> Source<S, W>
impl<S, W> Source<S, W>
where
S::Error: Debug,
S: SourceStream<Error: Debug>,
W: StorageWriter,
{
pub(crate) fn new(
writer: W,
Expand Down Expand Up @@ -255,7 +255,7 @@ where
// we'll cap the reconnect time to prevent additional delays between reconnect attempts.
let reconnect_pos = tokio::time::timeout(self.retry_timeout, stream.reconnect(pos)).await;
if reconnect_pos
.tap_err(|e| warn!("error attempting to reconnect: {e:?}"))
.inspect_err(|e| warn!("error attempting to reconnect: {e:?}"))
.is_ok()
{
if let Some(on_reconnect) = &mut self.on_reconnect {
Expand Down
Loading

0 comments on commit 5092957

Please sign in to comment.