Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: use Rust 1.80 features #157

Merged
merged 1 commit into from
Feb 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
merge_group:

env:
RUST_MIN: "1.75"
RUST_MIN: "1.80"

jobs:
test:
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
edition = "2021"
name = "stream-download"
version = "0.15.1"
rust-version = "1.75.0"
rust-version = "1.80.0"
authors = ["Austin Schey <[email protected]>"]
license = "MIT OR Apache-2.0"
readme = "README.md"
Expand All @@ -27,7 +27,6 @@ reqwest = { version = "0.12.2", features = [
"stream",
], default-features = false, optional = true }
reqwest-middleware = { version = ">=0.3,<0.5", optional = true }
tap = "1.0.1"
tempfile = { version = "3.1", optional = true }
thiserror = "2.0.1"
tokio = { version = "1.27", features = ["sync", "macros", "rt", "time"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,4 @@ for dynamically modifying each HTTP request.

## Supported Rust Versions

The MSRV is currently `1.75.0`.
The MSRV is currently `1.80.0`.
11 changes: 5 additions & 6 deletions src/http/reqwest_client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! Adapters for using [`reqwest`] with `stream-download`

use std::str::FromStr;
use std::sync::OnceLock;
use std::sync::LazyLock;

use bytes::Bytes;
use futures::Stream;
use reqwest::header::{self, AsHeaderName, HeaderMap};
use tap::TapFallible;
use tracing::warn;

use super::{DecodeError, RANGE_HEADER_KEY, format_range_header_bytes};
Expand All @@ -21,7 +20,7 @@ impl ResponseHeaders for HeaderMap {
fn get_header_str<K: AsHeaderName>(headers: &HeaderMap, key: K) -> Option<&str> {
headers.get(key).and_then(|val| {
val.to_str()
.tap_err(|e| warn!("error converting header value: {e:?}"))
.inspect_err(|e| warn!("error converting header value: {e:?}"))
.ok()
})
}
Expand Down Expand Up @@ -64,7 +63,7 @@ impl ClientResponse for reqwest::Response {
fn content_length(&self) -> Option<u64> {
get_header_str(self.headers(), header::CONTENT_LENGTH).and_then(|content_length| {
u64::from_str(content_length)
.tap_err(|e| warn!("invalid content length value: {e:?}"))
.inspect_err(|e| warn!("invalid content length value: {e:?}"))
.ok()
})
}
Expand Down Expand Up @@ -96,7 +95,7 @@ impl ClientResponse for reqwest::Response {
}

// per reqwest's docs, it's advisable to create a single client and reuse it
static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);

impl Client for reqwest::Client {
type Url = reqwest::Url;
Expand All @@ -105,7 +104,7 @@ impl Client for reqwest::Client {
type Headers = HeaderMap;

fn create() -> Self {
CLIENT.get_or_init(Self::new).clone()
CLIENT.clone()
}

async fn get(&self, url: &Self::Url) -> Result<Self::Response, Self::Error> {
Expand Down
17 changes: 5 additions & 12 deletions src/http/reqwest_middleware_client.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};

use parking_lot::Mutex;
use reqwest::header::HeaderMap;
use reqwest_middleware::Middleware;

use super::{Client, RANGE_HEADER_KEY, format_range_header_bytes};

static DEFAULT_MIDDLEWARE: OnceLock<Mutex<Vec<Arc<dyn reqwest_middleware::Middleware>>>> =
OnceLock::new();

fn get_middleware() -> &'static Mutex<Vec<Arc<dyn reqwest_middleware::Middleware>>> {
DEFAULT_MIDDLEWARE.get_or_init(|| Mutex::new([].into()))
}
static DEFAULT_MIDDLEWARE: LazyLock<Mutex<Vec<Arc<dyn reqwest_middleware::Middleware>>>> =
LazyLock::new(|| Mutex::new([].into()));

pub(crate) fn add_default_middleware<M>(middleware: M)
where
M: Middleware,
{
DEFAULT_MIDDLEWARE
.get_or_init(|| Mutex::new([].into()))
.lock()
.push(Arc::new(middleware));
DEFAULT_MIDDLEWARE.lock().push(Arc::new(middleware));
}

impl Client for reqwest_middleware::ClientWithMiddleware {
Expand All @@ -32,7 +25,7 @@ impl Client for reqwest_middleware::ClientWithMiddleware {
fn create() -> Self {
Self::new(
reqwest::Client::create(),
get_middleware().lock().clone().into_boxed_slice(),
DEFAULT_MIDDLEWARE.lock().clone().into_boxed_slice(),
)
}

Expand Down
10 changes: 4 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use settings::*;
use source::handle::SourceHandle;
use source::{DecodeError, Source, SourceStream};
use storage::StorageProvider;
use tap::Tap;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, instrument, trace};

Expand Down Expand Up @@ -449,8 +448,7 @@ impl<P: StorageProvider> StreamDownload<P> {
settings: Settings<S>,
) -> Result<Self, StreamInitializationError<S>>
where
S: SourceStream,
S::Error: Debug + Send,
S: SourceStream<Error: Debug + Send>,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<S, S::StreamCreationError>> + Send,
{
Expand Down Expand Up @@ -509,7 +507,7 @@ impl<P: StorageProvider> StreamDownload<P> {
}

fn handle_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let res = self.output_reader.read(buf).tap(|l| {
let res = self.output_reader.read(buf).inspect(|l| {
trace!(read_length = format!("{l:?}"), "returning read");
});
self.handle.notify_read();
Expand Down Expand Up @@ -633,7 +631,7 @@ impl<P: StorageProvider> Seek for StreamDownload<P> {
return self
.output_reader
.seek(SeekFrom::Start(absolute_seek_position))
.tap(|p| debug!(position = format!("{p:?}"), "returning seek position"));
.inspect_err(|p| debug!(position = format!("{p:?}"), "returning seek position"));
}

self.handle.request_position(absolute_seek_position);
Expand All @@ -648,7 +646,7 @@ impl<P: StorageProvider> Seek for StreamDownload<P> {

self.output_reader
.seek(SeekFrom::Start(absolute_seek_position))
.tap(|p| debug!(position = format!("{p:?}"), "returning seek position"))
.inspect_err(|p| debug!(position = format!("{p:?}"), "returning seek position"))
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use bytes::Bytes;
pub use command_builder::*;
pub use ffmpeg::*;
use futures::Stream;
use tap::TapFallible;
use tempfile::NamedTempFile;
use tracing::{debug, error, warn};
pub use yt_dlp::*;
Expand Down Expand Up @@ -209,25 +208,27 @@ impl ProcessStream {
for file in &mut self.stderr_files {
let _ = file
.flush()
.tap_err(|e| error!("error flushing file: {e:?}"));
.inspect_err(|e| error!("error flushing file: {e:?}"));
// Need to reopen the file to access the contents since it was written to from an
// external process
if let Ok(mut file_handle) = file
.reopen()
.tap_err(|e| error!("error opening file: {e:?}"))
.inspect_err(|e| error!("error opening file: {e:?}"))
{
let mut buf = String::new();
let _ = file_handle
.read_to_string(&mut buf)
.tap_err(|e| error!("error reading file: {e:?}"));
.inspect_err(|e| error!("error reading file: {e:?}"));
warn!("stderr from child process: {buf}");
}
}
}

fn close_stderr_files(&mut self) {
for file in mem::take(&mut self.stderr_files) {
let _ = file.close().tap_err(|e| warn!("error closing file: {e:?}"));
let _ = file
.close()
.inspect_err(|e| warn!("error closing file: {e:?}"));
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/source/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::Instant;

use parking_lot::{Condvar, Mutex, RwLock};
use rangemap::RangeSet;
use tap::TapFallible;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{Notify, mpsc};
use tracing::{debug, error};
Expand Down Expand Up @@ -47,7 +46,7 @@ impl SourceHandle {
pub(crate) fn seek(&self, position: u64) {
self.seek_tx
.try_send(position)
.tap_err(|e| {
.inspect_err(|e| {
if let TrySendError::Full(capacity) = e {
error!("Seek buffer full. Capacity: {capacity}");
}
Expand Down
8 changes: 4 additions & 4 deletions src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use futures::{Future, Stream, StreamExt, TryStream};
use handle::{
DownloadStatus, Downloaded, NotifyRead, PositionReached, RequestedPosition, SourceHandle,
};
use tap::TapFallible;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -133,9 +132,10 @@ pub(crate) struct Source<S: SourceStream, W: StorageWriter> {
cancellation_token: CancellationToken,
}

impl<S: SourceStream, W: StorageWriter> Source<S, W>
impl<S, W> Source<S, W>
where
S::Error: Debug,
S: SourceStream<Error: Debug>,
W: StorageWriter,
{
pub(crate) fn new(
writer: W,
Expand Down Expand Up @@ -255,7 +255,7 @@ where
// we'll cap the reconnect time to prevent additional delays between reconnect attempts.
let reconnect_pos = tokio::time::timeout(self.retry_timeout, stream.reconnect(pos)).await;
if reconnect_pos
.tap_err(|e| warn!("error attempting to reconnect: {e:?}"))
.inspect_err(|e| warn!("error attempting to reconnect: {e:?}"))
.is_ok()
{
if let Some(on_reconnect) = &mut self.on_reconnect {
Expand Down
Loading
Loading