From 7a7e6951b1a4e20e2e25e2dd9db2308769b320cf Mon Sep 17 00:00:00 2001 From: Valdemar Erk Date: Sat, 6 Jul 2024 13:19:28 +0200 Subject: [PATCH] feat(input): Support HLS streams Closes: #241 --- Cargo.toml | 5 +- src/input/metadata/ytdl.rs | 1 + src/input/sources/hls.rs | 134 +++++++++++++++++++++++++++++++++++++ src/input/sources/mod.rs | 3 +- src/input/sources/ytdl.rs | 34 +++++----- 5 files changed, 159 insertions(+), 18 deletions(-) create mode 100644 src/input/sources/hls.rs diff --git a/Cargo.toml b/Cargo.toml index 4fa33d224..b4481dbbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ version = "0.4.1" async-trait = { optional = true, version = "0.1" } audiopus = { optional = true, version = "0.3.0-rc.0" } byteorder = { optional = true, version = "1" } -bytes = { optional = true, version = "1" } +bytes = { version = "1" } crypto_secretbox = { optional = true, features = ["std"], version = "0.1" } dashmap = { optional = true, version = "5" } derivative = "2" @@ -41,6 +41,7 @@ serenity-voice-model = { optional = true, version = "0.2" } simd-json = { features = ["serde_impl"], optional = true, version = "0.13" } socket2 = { optional = true, version = "0.5" } streamcatcher = { optional = true, version = "1" } +stream_lib = { version = "0.4.1" } symphonia = { default_features = false, optional = true, version = "0.5.2" } symphonia-core = { optional = true, version = "0.5.2" } tokio = { default-features = false, optional = true, version = "1.0" } @@ -131,7 +132,7 @@ twilight = ["dep:twilight-gateway","dep:twilight-model"] # Behaviour altering features. builtin-queue = [] -receive = ["dep:bytes", "discortp?/demux", "discortp?/rtcp"] +receive = ["discortp?/demux", "discortp?/rtcp"] # Used for docgen/testing/benchmarking. full-doc = ["default", "twilight", "builtin-queue", "receive"] diff --git a/src/input/metadata/ytdl.rs b/src/input/metadata/ytdl.rs index 4a9ab6c2e..38c702595 100644 --- a/src/input/metadata/ytdl.rs +++ b/src/input/metadata/ytdl.rs @@ -19,6 +19,7 @@ pub struct Output { pub uploader: Option, pub url: String, pub webpage_url: Option, + pub protocol: Option, } impl Output { diff --git a/src/input/sources/hls.rs b/src/input/sources/hls.rs new file mode 100644 index 000000000..f6819364e --- /dev/null +++ b/src/input/sources/hls.rs @@ -0,0 +1,134 @@ +use std::{ + io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom}, + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use futures::StreamExt; +use pin_project::pin_project; +use reqwest::{header::HeaderMap, Client}; +use serenity::async_trait; +use stream_lib::{DownloadStream, Event}; +use symphonia_core::io::MediaSource; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; +use tokio_util::io::StreamReader; + +use crate::input::{ + AsyncAdapterStream, AsyncMediaSource, AudioStream, AudioStreamError, Compose, Input, +}; + +/// Lazy HLS stream +#[derive(Debug)] +pub struct HlsRequest { + /// HLS downloader + pub hls: Option, +} + +impl HlsRequest { + #[must_use] + /// Create a lazy HLS request. + pub fn new(client: Client, request: String) -> Self { + Self::new_with_headers(client, request, HeaderMap::default()) + } + + #[must_use] + /// Create a lazy HTTP request. + pub fn new_with_headers(client: Client, request: String, headers: HeaderMap) -> Self { + let request = client.get(&request).headers(headers).build().unwrap(); + let hls = stream_lib::download_hls(client, request, None); + + HlsRequest { hls: Some(hls) } + } + + fn create_stream(&mut self) -> Result { + let Some(hls) = self.hls.take() else { + return Err(AudioStreamError::Fail("hls can only be used once".into())); + }; + + let stream = Box::new(StreamReader::new(hls.map(|ev| match ev { + Event::Bytes { bytes } => Ok(bytes), + Event::End => Ok(Bytes::new()), + Event::Error { error } => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + error, + )), + }))); + + Ok(HlsStream { stream }) + } +} + +#[pin_project] +struct HlsStream { + #[pin] + stream: Box, +} + +impl AsyncRead for HlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + AsyncRead::poll_read(self.project().stream, cx, buf) + } +} + +impl AsyncSeek for HlsStream { + fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> { + Err(IoErrorKind::Unsupported.into()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unreachable!() + } +} + +#[async_trait] +impl AsyncMediaSource for HlsStream { + fn is_seekable(&self) -> bool { + false + } + + async fn byte_len(&self) -> Option { + None + } + + async fn try_resume( + &mut self, + _offset: u64, + ) -> Result, AudioStreamError> { + Err(AudioStreamError::Unsupported) + } +} + +#[async_trait] +impl Compose for HlsRequest { + fn create(&mut self) -> Result>, AudioStreamError> { + self.create_stream().map(|input| { + let stream = AsyncAdapterStream::new(Box::new(input), 64 * 1024); + + AudioStream { + input: Box::new(stream) as Box, + hint: None, + } + }) + } + + async fn create_async( + &mut self, + ) -> Result>, AudioStreamError> { + Err(AudioStreamError::Unsupported) + } + + fn should_create_async(&self) -> bool { + false + } +} + +impl From for Input { + fn from(val: HlsRequest) -> Self { + Input::Lazy(Box::new(val)) + } +} diff --git a/src/input/sources/mod.rs b/src/input/sources/mod.rs index 47ce9cc15..f7d31eb78 100644 --- a/src/input/sources/mod.rs +++ b/src/input/sources/mod.rs @@ -1,5 +1,6 @@ mod file; mod http; mod ytdl; +mod hls; -pub use self::{file::*, http::*, ytdl::*}; +pub use self::{file::*, http::*, ytdl::*, hls::*}; diff --git a/src/input/sources/ytdl.rs b/src/input/sources/ytdl.rs index abad20b8a..a71488984 100644 --- a/src/input/sources/ytdl.rs +++ b/src/input/sources/ytdl.rs @@ -1,11 +1,5 @@ use crate::input::{ - metadata::ytdl::Output, - AudioStream, - AudioStreamError, - AuxMetadata, - Compose, - HttpRequest, - Input, + metadata::ytdl::Output, AudioStream, AudioStreamError, AuxMetadata, Compose, HttpRequest, Input, }; use async_trait::async_trait; use reqwest::{ @@ -16,6 +10,8 @@ use std::{error::Error, io::ErrorKind}; use symphonia_core::io::MediaSource; use tokio::process::Command; +use super::HlsRequest; + const YOUTUBE_DL_COMMAND: &str = "yt-dlp"; #[derive(Clone, Debug)] @@ -194,14 +190,22 @@ impl Compose for YoutubeDl { })); } - let mut req = HttpRequest { - client: self.client.clone(), - request: result.url, - headers, - content_length: result.filesize, - }; - - req.create_async().await + match result.protocol.as_deref() { + Some("m3u8_native") => { + let mut req = + HlsRequest::new_with_headers(self.client.clone(), result.url, headers); + req.create() + }, + _ => { + let mut req = HttpRequest { + client: self.client.clone(), + request: result.url, + headers, + content_length: result.filesize, + }; + req.create_async().await + }, + } } fn should_create_async(&self) -> bool {