This repository has been archived by the owner on Dec 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 50
Tx file propagation #58
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
1a3a5c9
rebase from master and p2p_beacon
musitdev c38363c
show pubkey on creation
musitdev d77ed04
correct cli key print
musitdev d80810b
rebase from master and p2p_beacon
musitdev c2780b7
show pubkey on creation
musitdev 65aa73d
correct cli key print
musitdev 3894e80
add some logs
musitdev 55883dd
rebase from master
70a86e9
add logs for file download
musitdev 9b51b43
add asset management for propagated tx
musitdev f73e625
start download manager
musitdev dce3859
correct downlaod manager start
musitdev 7e0bb1b
add some comment on p2p connection
musitdev aee1b58
do some tests and remove some logs
musitdev 6cf6fc3
correct clippy warning
musitdev 05e2dd9
corrrect PR comments
musitdev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,93 @@ | ||
use std::sync::Arc; | ||
use crate::cli::Config; | ||
use eyre::Result; | ||
use futures_util::TryStreamExt; | ||
use http_body_util::combinators::BoxBody; | ||
use http_body_util::{BodyExt, Full, StreamBody}; | ||
use hyper::body::{self, Bytes, Frame}; | ||
use hyper::server::conn::http1; | ||
use hyper::service::service_fn; | ||
use hyper::{Request, Response, StatusCode}; | ||
use hyper_util::rt::TokioIo; | ||
use std::path::Path; | ||
use tokio::fs::File; | ||
use tokio::net::TcpListener; | ||
use tokio::task::JoinHandle; | ||
use tokio_util::io::ReaderStream; | ||
|
||
use crate::{cli::Config, storage}; | ||
//start the local server and serve the specified file path. | ||
//Return the server task join handle. | ||
pub async fn serve_files(config: &Config) -> Result<JoinHandle<()>> { | ||
let mut bind_addr = config.p2p_listen_addr; | ||
bind_addr.set_port(config.http_download_port); | ||
let listener = TcpListener::bind(bind_addr).await?; | ||
|
||
pub struct DownloadManager { | ||
database: Arc<storage::Database>, | ||
file_storage: Arc<storage::File>, | ||
let jh = tokio::spawn({ | ||
let data_directory = config.data_directory.clone(); | ||
async move { | ||
loop { | ||
match listener.accept().await { | ||
Ok((stream, _from)) => { | ||
let io = TokioIo::new(stream); | ||
tokio::task::spawn({ | ||
let data_directory = data_directory.clone(); | ||
async move { | ||
if let Err(err) = http1::Builder::new() | ||
.serve_connection( | ||
io, | ||
service_fn(|req| server_process_file(req, &data_directory)), | ||
) | ||
.await | ||
{ | ||
tracing::error!("Error serving node connection: {err}. Wait for a new node connection."); | ||
} | ||
} | ||
}); | ||
} | ||
Err(err) => { | ||
tracing::error!("Error during node connection to file http server:{err}"); | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
|
||
Ok(jh) | ||
} | ||
|
||
impl DownloadManager { | ||
pub fn new( | ||
_config: Arc<Config>, | ||
database: Arc<storage::Database>, | ||
file_storage: Arc<storage::File>, | ||
) -> Self { | ||
DownloadManager { | ||
database, | ||
file_storage, | ||
async fn server_process_file( | ||
req: Request<body::Incoming>, | ||
data_directory: &Path, | ||
) -> std::result::Result<Response<BoxBody<Bytes, std::io::Error>>, hyper::Error> { | ||
let file_digest = &req.uri().path()[1..]; | ||
|
||
let mut file_path = data_directory.join("images").join(file_digest); | ||
|
||
let file = match File::open(&file_path).await { | ||
Ok(file) => file, | ||
Err(_) => { | ||
//try to see if the file is currently being updated. | ||
file_path.set_extension(".tmp"); | ||
let (status_code, message) = if file_path.as_path().exists() { | ||
( | ||
StatusCode::PARTIAL_CONTENT, | ||
"Update in progess, retry later", | ||
) | ||
} else { | ||
(StatusCode::NOT_FOUND, "File not found") | ||
}; | ||
return Ok(Response::builder() | ||
.status(status_code) | ||
.body(Full::new(message.into()).map_err(|e| match e {}).boxed()) | ||
.unwrap()); | ||
} | ||
} | ||
}; | ||
|
||
let file_hash = file_digest.to_string(); | ||
let reader = ReaderStream::new(file); | ||
let stream_body = StreamBody::new(reader.map_ok(Frame::data)); | ||
|
||
Ok(Response::builder() | ||
.status(StatusCode::OK) | ||
.body(BodyExt::boxed(stream_body)) | ||
.unwrap()) | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why drop the default value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To manage the case the node start without connecting, sort of primary bootstraper. In this case, you specify nothing and the node didn't try to connect.