Skip to content

Commit

Permalink
feat(input): Support HLS streams
Browse files Browse the repository at this point in the history
Closes: #241
  • Loading branch information
Erk- committed Jul 6, 2024
1 parent 5bbe80f commit 7a7e695
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 18 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ version = "0.4.1"
async-trait = { optional = true, version = "0.1" }
audiopus = { optional = true, version = "0.3.0-rc.0" }
byteorder = { optional = true, version = "1" }
bytes = { optional = true, version = "1" }
bytes = { version = "1" }
crypto_secretbox = { optional = true, features = ["std"], version = "0.1" }
dashmap = { optional = true, version = "5" }
derivative = "2"
Expand All @@ -41,6 +41,7 @@ serenity-voice-model = { optional = true, version = "0.2" }
simd-json = { features = ["serde_impl"], optional = true, version = "0.13" }
socket2 = { optional = true, version = "0.5" }
streamcatcher = { optional = true, version = "1" }
stream_lib = { version = "0.4.1" }
symphonia = { default_features = false, optional = true, version = "0.5.2" }
symphonia-core = { optional = true, version = "0.5.2" }
tokio = { default-features = false, optional = true, version = "1.0" }
Expand Down Expand Up @@ -131,7 +132,7 @@ twilight = ["dep:twilight-gateway","dep:twilight-model"]

# Behaviour altering features.
builtin-queue = []
receive = ["dep:bytes", "discortp?/demux", "discortp?/rtcp"]
receive = ["discortp?/demux", "discortp?/rtcp"]

# Used for docgen/testing/benchmarking.
full-doc = ["default", "twilight", "builtin-queue", "receive"]
Expand Down
1 change: 1 addition & 0 deletions src/input/metadata/ytdl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Output {
pub uploader: Option<String>,
pub url: String,
pub webpage_url: Option<String>,
pub protocol: Option<String>,
}

impl Output {
Expand Down
134 changes: 134 additions & 0 deletions src/input/sources/hls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use bytes::Bytes;
use futures::StreamExt;
use pin_project::pin_project;
use reqwest::{header::HeaderMap, Client};
use serenity::async_trait;
use stream_lib::{DownloadStream, Event};
use symphonia_core::io::MediaSource;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio_util::io::StreamReader;

use crate::input::{
AsyncAdapterStream, AsyncMediaSource, AudioStream, AudioStreamError, Compose, Input,
};

/// Lazy HLS stream
#[derive(Debug)]
pub struct HlsRequest {
/// HLS downloader
pub hls: Option<DownloadStream>,
}

impl HlsRequest {
#[must_use]
/// Create a lazy HLS request.
pub fn new(client: Client, request: String) -> Self {
Self::new_with_headers(client, request, HeaderMap::default())
}

#[must_use]
/// Create a lazy HTTP request.
pub fn new_with_headers(client: Client, request: String, headers: HeaderMap) -> Self {
let request = client.get(&request).headers(headers).build().unwrap();
let hls = stream_lib::download_hls(client, request, None);

HlsRequest { hls: Some(hls) }
}

fn create_stream(&mut self) -> Result<HlsStream, AudioStreamError> {
let Some(hls) = self.hls.take() else {
return Err(AudioStreamError::Fail("hls can only be used once".into()));
};

let stream = Box::new(StreamReader::new(hls.map(|ev| match ev {
Event::Bytes { bytes } => Ok(bytes),
Event::End => Ok(Bytes::new()),
Event::Error { error } => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
error,
)),
})));

Ok(HlsStream { stream })
}
}

#[pin_project]
struct HlsStream {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}

impl AsyncRead for HlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}

impl AsyncSeek for HlsStream {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}

#[async_trait]
impl AsyncMediaSource for HlsStream {
fn is_seekable(&self) -> bool {
false
}

async fn byte_len(&self) -> Option<u64> {
None
}

async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
}

#[async_trait]
impl Compose for HlsRequest {
fn create(&mut self) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
self.create_stream().map(|input| {
let stream = AsyncAdapterStream::new(Box::new(input), 64 * 1024);

AudioStream {
input: Box::new(stream) as Box<dyn MediaSource>,
hint: None,
}
})
}

async fn create_async(
&mut self,
) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}

fn should_create_async(&self) -> bool {
false
}
}

impl From<HlsRequest> for Input {
fn from(val: HlsRequest) -> Self {
Input::Lazy(Box::new(val))
}
}
3 changes: 2 additions & 1 deletion src/input/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod file;
mod http;
mod ytdl;
mod hls;

pub use self::{file::*, http::*, ytdl::*};
pub use self::{file::*, http::*, ytdl::*, hls::*};
34 changes: 19 additions & 15 deletions src/input/sources/ytdl.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
use crate::input::{
metadata::ytdl::Output,
AudioStream,
AudioStreamError,
AuxMetadata,
Compose,
HttpRequest,
Input,
metadata::ytdl::Output, AudioStream, AudioStreamError, AuxMetadata, Compose, HttpRequest, Input,
};
use async_trait::async_trait;
use reqwest::{
Expand All @@ -16,6 +10,8 @@ use std::{error::Error, io::ErrorKind};
use symphonia_core::io::MediaSource;
use tokio::process::Command;

use super::HlsRequest;

const YOUTUBE_DL_COMMAND: &str = "yt-dlp";

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -194,14 +190,22 @@ impl Compose for YoutubeDl {
}));
}

let mut req = HttpRequest {
client: self.client.clone(),
request: result.url,
headers,
content_length: result.filesize,
};

req.create_async().await
match result.protocol.as_deref() {
Some("m3u8_native") => {
let mut req =
HlsRequest::new_with_headers(self.client.clone(), result.url, headers);
req.create()
},
_ => {
let mut req = HttpRequest {
client: self.client.clone(),
request: result.url,
headers,
content_length: result.filesize,
};
req.create_async().await
},
}
}

fn should_create_async(&self) -> bool {
Expand Down

0 comments on commit 7a7e695

Please sign in to comment.