Skip to content

Commit

Permalink
Merge branch 'newCode'
Browse files Browse the repository at this point in the history
# Conflicts:
#	p2p/src/network/event_handler.rs
  • Loading branch information
wujian0327 committed Jan 23, 2024
2 parents cd45ba9 + 7c3b50f commit aa261d7
Show file tree
Hide file tree
Showing 13 changed files with 1,220 additions and 1,042 deletions.
46 changes: 2 additions & 44 deletions p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
```
cargo run service p2p --host 0.0.0.0 --p2p-port 8200 --relay-server
or
cargo run service p2p --host 0.0.0.0 --p2p-port 8200 --relay-server
cargo run service p2p --host 0.0.0.0 --p2p-port 8200 --relay-server --secret-key 6b911fd37cdf5c81d4c0adb1ab7fa822ed253ab0ad9aa18d77257c88b29b7180
```

### start a client
Expand All @@ -17,47 +17,5 @@ cargo run service p2p --host 0.0.0.0 --p2p-port 8201 --bootstrap-node /ip4/{rela
### start another client

```
cargo run service p2p --host 0.0.0.0 --p2p-port 8202 --bootstrap-node /ip4/{relay-server-ip}/tcp/8200
```

### try to use DHT

#### put a key-value to p2p network in one terminal

```
kad put 123 abc
```

#### get a key-value from p2p network in another terminal

```
kad get 123
```

### try to clone a repository

```
mega clone p2p://12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X/mega_test.git
```

```
mega pull p2p://12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X/mega_test.git
```

### share a repository to DHT

```
mega provide mega_test.git
```

### clone git-object from p2p network

```
mega clone-object mega_test.git
```

### pull git-object from p2p network

```
mega pull-object mega_test.git
cargo run service p2p --host 0.0.0.0 --p2p-port 8202 --bootstrap-node /ip4/{relay-server-ip}/tcp/8200 --secret-key 6b911fd37cdf5c81d4c0adb1ab7fa822ed253ab0ad9aa18d77257c88b29b7182 --p2p-http-port 8002
```
197 changes: 112 additions & 85 deletions p2p/src/node/client_http.rs → p2p/src/http/client_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use axum::routing::put;
use axum::{
Expand All @@ -16,15 +17,42 @@ use axum::{
routing::get,
Json, Router,
};
use futures::channel::mpsc;
use storage::driver::database::storage::ObjectStorage;

use crate::network::Client;

use super::command_handler::HttpHandler;

#[derive(Clone)]
pub struct P2pNodeState {
pub sender: mpsc::Sender<String>,
network_client: Client,
storage: Arc<dyn ObjectStorage>,
local_peer_id: String,
relay_peer_id: String,
}

pub fn get_http_handler(state: State<P2pNodeState>) -> HttpHandler {
HttpHandler {
network_client: state.network_client.clone(),
storage: state.storage.clone(),
local_peer_id: state.local_peer_id.clone(),
relay_peer_id: state.relay_peer_id.clone(),
}
}

pub async fn server(sender: mpsc::Sender<String>) {
let state = P2pNodeState { sender };
pub async fn server(
network_client: Client,
storage: Arc<dyn ObjectStorage>,
local_peer_id: String,
relay_peer_id: String,
p2p_http_port: u16,
) {
let state = P2pNodeState {
network_client,
storage,
local_peer_id,
relay_peer_id,
};

let app = Router::new()
.nest(
Expand All @@ -37,7 +65,8 @@ pub async fn server(sender: mpsc::Sender<String>) {
// .layer(TraceLayer::new_for_http())
.with_state(state);

let addr = SocketAddr::from_str("0.0.0.0:8001").unwrap();
let p2p_http_address = format!("0.0.0.0:{}", p2p_http_port);
let addr = SocketAddr::from_str(p2p_http_address.as_str()).unwrap();
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app.into_make_service())
.await
Expand All @@ -63,59 +92,57 @@ async fn mega_provide(
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["mega", "provide", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.mega_provide(repo_name.clone())
.await
}

async fn mega_search(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["mega", "search", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state).mega_search(repo_name.clone()).await
}

async fn mega_clone(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let mega_address = query.get("mega_address").unwrap();
let line = ["mega", "clone", mega_address].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.mega_clone(mega_address.clone())
.await
}

async fn mega_clone_obj(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["mega", "clone-object", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.mega_clone_or_pull_obj(repo_name.clone())
.await
}

async fn mega_pull(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let mega_address = query.get("mega_address").unwrap();
let line = ["mega", "pull", mega_address].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.mega_pull(mega_address.clone())
.await
}

async fn mega_pull_obj(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["mega", "pull-object", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.mega_clone_or_pull_obj(repo_name.clone())
.await
}

pub fn nostr_routers() -> Router<P2pNodeState> {
Expand All @@ -131,91 +158,91 @@ async fn nostr_subscribe(
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["nostr", "subscribe", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.nostr_subscribe(repo_name.clone())
.await
}

async fn nostr_event_update(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["nostr", "event-update", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.nostr_event_update(repo_name.clone())
.await
}

async fn nostr_event_merge(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["nostr", "event-merge", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.nostr_event_merge(repo_name.clone())
.await
}

async fn nostr_event_issue(
Query(query): Query<HashMap<String, String>>,
state: State<P2pNodeState>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let repo_name = query.get("repo_name").unwrap();
let line = ["nostr", "event-issue", repo_name].join(" ");
state.0.sender.clone().try_send(line).unwrap();
Ok(Json("ok"))
get_http_handler(state)
.nostr_event_issue(repo_name.clone())
.await
}

#[cfg(test)]
mod test {
use std::collections::HashMap;

use async_std::stream::StreamExt;
use axum::{extract::Query, http::Uri};
use futures::channel::mpsc;

use crate::node::client_http::{
mega_clone, mega_clone_obj, mega_pull, mega_pull_obj, P2pNodeState,
};
use crate::node::client_http::{mega_provide, mega_search};

#[tokio::test]
async fn test_mega_routers() {
let query: Query<HashMap<String, String>> = Query::try_from_uri(
&"http://localhost:8001/api/v1/mega/provide?repo_name=reponame.git"
.parse::<Uri>()
.unwrap(),
)
.unwrap();

let addr_query: Query<HashMap<String, String>> = Query::try_from_uri(
&"http://localhost:8001/api/v1/mega/clone?mega_address=p2p://peer_id/reponame.git"
.parse::<Uri>()
.unwrap(),
)
.unwrap();

let (tx, mut rx) = mpsc::channel::<String>(64);
let s = P2pNodeState { sender: tx };
let state = axum::extract::State(s);
let _ = mega_provide(query.clone(), state.clone()).await;
let _ = mega_search(query.clone(), state.clone()).await;
let _ = mega_clone(addr_query.clone(), state.clone()).await;
let _ = mega_clone_obj(query.clone(), state.clone()).await;
let _ = mega_pull(addr_query.clone(), state.clone()).await;
let _ = mega_pull_obj(query.clone(), state.clone()).await;

assert_eq!(rx.next().await.unwrap(), "mega provide reponame.git");
assert_eq!(rx.next().await.unwrap(), "mega search reponame.git");
assert_eq!(
rx.next().await.unwrap(),
"mega clone p2p://peer_id/reponame.git"
);
assert_eq!(rx.next().await.unwrap(), "mega clone-object reponame.git");
assert_eq!(
rx.next().await.unwrap(),
"mega pull p2p://peer_id/reponame.git"
);
assert_eq!(rx.next().await.unwrap(), "mega pull-object reponame.git");
}
// use std::collections::HashMap;

// use async_std::stream::StreamExt;
// use axum::{extract::Query, http::Uri};
// use futures::channel::mpsc;

// use crate::node::client_http::{
// mega_clone, mega_clone_obj, mega_pull, mega_pull_obj, P2pNodeState,
// };
// use crate::node::client_http::{mega_provide, mega_search};

// #[tokio::test]
// async fn test_mega_routers() {
// let query: Query<HashMap<String, String>> = Query::try_from_uri(
// &"http://localhost:8001/api/v1/mega/provide?repo_name=reponame.git"
// .parse::<Uri>()
// .unwrap(),
// )
// .unwrap();

// let addr_query: Query<HashMap<String, String>> = Query::try_from_uri(
// &"http://localhost:8001/api/v1/mega/clone?mega_address=p2p://peer_id/reponame.git"
// .parse::<Uri>()
// .unwrap(),
// )
// .unwrap();

// let (tx, mut rx) = mpsc::channel::<String>(64);
// let s = P2pNodeState { sender: tx };
// let state = axum::extract::State(s);
// let _ = mega_provide(query.clone(), state.clone()).await;
// let _ = mega_search(query.clone(), state.clone()).await;
// let _ = mega_clone(addr_query.clone(), state.clone()).await;
// let _ = mega_clone_obj(query.clone(), state.clone()).await;
// let _ = mega_pull(addr_query.clone(), state.clone()).await;
// let _ = mega_pull_obj(query.clone(), state.clone()).await;

// assert_eq!(rx.next().await.unwrap(), "mega provide reponame.git");
// assert_eq!(rx.next().await.unwrap(), "mega search reponame.git");
// assert_eq!(
// rx.next().await.unwrap(),
// "mega clone p2p://peer_id/reponame.git"
// );
// assert_eq!(rx.next().await.unwrap(), "mega clone-object reponame.git");
// assert_eq!(
// rx.next().await.unwrap(),
// "mega pull p2p://peer_id/reponame.git"
// );
// assert_eq!(rx.next().await.unwrap(), "mega pull-object reponame.git");
// }
}
Loading

0 comments on commit aa261d7

Please sign in to comment.