Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file-stream examples #3065

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/stream-to-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ edition = "2021"
publish = false

[dependencies]
async-stream = "0.3"
axum = { path = "../../axum", features = ["multipart"] }
axum-extra = { path = "../../axum-extra", features = ["file-stream"] }
futures = "0.3"
tokio = { version = "1.0", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
Expand Down
292 changes: 283 additions & 9 deletions examples/stream-to-file/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@
//! cargo run -p example-stream-to-file
//! ```

use async_stream::try_stream;
use axum::{
body::Bytes,
extract::{Multipart, Path, Request},
http::StatusCode,
response::{Html, Redirect},
http::{header, HeaderMap, StatusCode},
response::{Html, IntoResponse, Redirect, Response},
routing::{get, post},
BoxError, Router,
};
use axum_extra::response::file_stream::FileStream;
use futures::{Stream, TryStreamExt};
use std::io;
use tokio::{fs::File, io::BufWriter};
use tokio_util::io::StreamReader;
use std::{io, path::PathBuf};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter},
};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

const UPLOADS_DIRECTORY: &str = "uploads";

const DOWNLOAD_DIRECTORY: &str = "downloads";
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
Expand All @@ -35,9 +39,23 @@ async fn main() {
.await
.expect("failed to create `uploads` directory");

tokio::fs::create_dir(DOWNLOAD_DIRECTORY)
.await
.expect("failed to create `downloads` directory");

//create a file to download
create_test_file(std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"))
.await
.expect("failed to create test file");

let app = Router::new()
.route("/", get(show_form).post(accept_form))
.route("/file/{file_name}", post(save_request_body));
.route("/upload", get(show_form).post(accept_form))
.route("/", get(show_form2).post(accept_form))
.route("/file/{file_name}", post(save_request_body))
.route("/file_download", get(file_download_handler))
.route("/simpler_file_download", get(simpler_file_download_handler))
.route("/range_file", get(file_range_handler))
.route("/range_file_stream", get(try_file_range_handler));

let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
Expand All @@ -46,6 +64,19 @@ async fn main() {
axum::serve(listener, app).await.unwrap();
}

async fn create_test_file(path: PathBuf) -> io::Result<()> {
let mut file = File::create(path).await?;
for i in 1..=30 {
let line = format!(
"Hello, this is the simulated file content! This is line {}\n",
i
);
file.write_all(line.as_bytes()).await?;
}
file.flush().await?;
Ok(())
}

// Handler that streams the request body to a file.
//
// POST'ing to `/file/foo.txt` will create a file called `foo.txt`.
Expand Down Expand Up @@ -84,6 +115,249 @@ async fn show_form() -> Html<&'static str> {
)
}

// Handler that returns HTML for a multipart form.
async fn show_form2() -> Html<&'static str> {
Html(
r#"
<!doctype html>
<html>
<head>
<title>Upload and Download!</title>
</head>
<body>
<h1>Upload and Download Files</h1>

<!-- Form for uploading files -->
<form action="/" method="post" enctype="multipart/form-data">
<div>
<label>
Upload file:
<input type="file" name="file" multiple>
</label>
</div>

<div>
<input type="submit" value="Upload files">
</div>
</form>

<hr>
<!-- Buttons for downloading files -->
<form action="/file_download" method="get">
<div>
<input type="submit" value="Download file">
</div>
</form>

<!-- Button for partial file download (Range: 0-100) -->
<form action="/range_file_stream" method="get">
<div>
<input type="hidden" name="range" value="0-100">
<input type="submit" value="Download range (0-100)">
</div>
</form>

</body>
</html>
"#,
)
}

/// A simpler file download handler that uses the `FileStream` response.
/// Returns the entire file as a stream.
async fn simpler_file_download_handler() -> Response {
//If you want to simply return a file as a stream
// you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length.
FileStream::<ReaderStream<File>>::from_path(
&std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"),
)
.await
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response())
.into_response()
}

/// If you want to control the returned files in more detail you can implement a Stream
/// For example, use the try_stream! macro to construct a file stream and set which parts are needed.
async fn file_download_handler() -> Response {
let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt");
let file_stream = match try_stream(&file_path, 5, 25, 10).await {
Ok(file_stream) => file_stream,
Err(e) => {
println!("{e}");
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response();
}
};

// Use FileStream to return and set some information.
// Will set application/octet-stream in the header.
let file_stream_resp = FileStream::new(Box::pin(file_stream))
.file_name("test.txt")
.content_size(20_u64);

file_stream_resp.into_response()
}

/// More complex manipulation of files and conversion to a stream
async fn try_stream(
file_path: &str,
start: u64,
mut end: u64,
buffer_size: usize,
) -> Result<impl Stream<Item = Result<Vec<u8>, std::io::Error>>, String> {
let mut file = File::open(file_path)
.await
.map_err(|e| format!("open file:{file_path} err:{e}"))?;

file.seek(std::io::SeekFrom::Start(start))
.await
.map_err(|e| format!("file:{file_path} seek err:{e}"))?;

if end == 0 {
let metadata = file
.metadata()
.await
.map_err(|e| format!("file:{file_path} get metadata err:{e}"))?;
end = metadata.len();
}

let mut buffer = vec![0; buffer_size];

let stream = try_stream! {
let mut total_read = 0;

while total_read < end {
let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read);
let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
if n == 0 {
break; // EOF
}
total_read += n as u64;
yield buffer[..n].to_vec();

}
};
Ok(stream)
}

async fn try_stream2(
mut file: File,
start: u64,
mut end: u64,
buffer_size: usize,
) -> Result<impl Stream<Item = Result<Vec<u8>, std::io::Error>>, String> {
file.seek(std::io::SeekFrom::Start(start))
.await
.map_err(|e| format!("file seek err:{e}"))?;

if end == 0 {
let metadata = file
.metadata()
.await
.map_err(|e| format!("file get metadata err:{e}"))?;
end = metadata.len();
}

let mut buffer = vec![0; buffer_size];

let stream = try_stream! {
let mut total_read = 0;

while total_read < end {
let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read);
let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
if n == 0 {
break; // EOF
}
total_read += n as u64;
yield buffer[..n].to_vec();

}
};
Ok(stream)
}

/// A file download handler that accepts a range header and returns a partial file as a stream.
/// You can return directly from the path
/// But you can't download this stream directly from your browser, you need to use a tool like curl or Postman.
async fn try_file_range_handler(headers: HeaderMap) -> Response {
let range_header = headers
.get(header::RANGE)
.and_then(|value| value.to_str().ok());

let (start, end) = if let Some(range) = range_header {
if let Some(range) = parse_range_header(range) {
range
} else {
return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response();
}
} else {
(0, 0) // default range end = 0, if end = 0 end == file size - 1
};

let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt");
FileStream::<ReaderStream<File>>::try_range_response(
std::path::Path::new(&file_path),
start,
end,
)
.await
.unwrap()
}

/// If you want to control the stream yourself
async fn file_range_handler(headers: HeaderMap) -> Response {
// Parse the range header to get the start and end values.
let range_header = headers
.get(header::RANGE)
.and_then(|value| value.to_str().ok());

// If the range header is invalid, return a 416 Range Not Satisfiable response.
let (start, end) = if let Some(range) = range_header {
if let Some(range) = parse_range_header(range) {
range
} else {
return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response();
}
} else {
(0, 0) // default range end = 0, if end = 0 end == file size - 1
};

let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt");

let file = File::open(file_path).await.unwrap();

let file_size = file.metadata().await.unwrap().len();

let file_stream = match try_stream2(file, start, end, 256).await {
Ok(file_stream) => file_stream,
Err(e) => {
println!("{e}");
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response();
}
};

FileStream::new(Box::pin(file_stream)).into_range_response(start, end, file_size)
}

/// Parse the range header and return the start and end values.
fn parse_range_header(range: &str) -> Option<(u64, u64)> {
let range = range.strip_prefix("bytes=")?;
let mut parts = range.split('-');
let start = parts.next()?.parse::<u64>().ok()?;
let end = parts
.next()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
if start > end {
return None;
}
Some((start, end))
}

// Handler that accepts a multipart form upload and streams each field to a file.
async fn accept_form(mut multipart: Multipart) -> Result<Redirect, (StatusCode, String)> {
while let Ok(Some(field)) = multipart.next_field().await {
Expand Down
Loading