Skip to content

Commit

Permalink
fix an issue that now ssh server will unpack until receiving all it's…
Browse files Browse the repository at this point in the history
… data
  • Loading branch information
benjamin-747 committed Jan 17, 2024
1 parent 6a45281 commit 6691589
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
64 changes: 36 additions & 28 deletions gateway/src/git_protocol/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@
//!
//!
//!
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Duration, Utc};
use git::lfs::lfs_structs::Link;
use russh::server::{self, Auth, Msg, Session};
use russh::server::{self, Auth, Msg, Response, Session};
use russh::{Channel, ChannelId};
use russh_keys::key;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncReadExt;

use storage::driver::database::storage::ObjectStorage;

use git::lfs::lfs_structs::Link;
use git::protocol::pack::{self};
use git::protocol::ServiceType;
use git::protocol::{PackProtocol, Protocol};
use storage::driver::database::storage::ObjectStorage;

type ClientMap = HashMap<(usize, ChannelId), Channel<Msg>>;

Expand All @@ -33,6 +32,7 @@ pub struct SshServer {
pub storage: Arc<dyn ObjectStorage>,
// TODO: consider is it a good choice to bind data here, find a better solution to bind data with ssh client
pub pack_protocol: Option<PackProtocol>,
pub data_combined: Vec<u8>,
}

impl server::Server for SshServer {
Expand Down Expand Up @@ -120,7 +120,7 @@ impl server::Handler for SshServer {
};
session.data(channel, serde_json::to_vec(&link).unwrap().into());
}
_ => println!("Not Supported command!"),
command => println!("Not Supported command! {}", command),
}
Ok((self, session))
}
Expand All @@ -134,6 +134,15 @@ impl server::Handler for SshServer {
Ok((self, Auth::Accept))
}

async fn auth_keyboard_interactive(
self,
_: &str,
_: &str,
_: Option<Response<'async_trait>>,
) -> Result<(Self, Auth), Self::Error> {
Ok((self, Auth::Accept))
}

async fn auth_password(self, user: &str, password: &str) -> Result<(Self, Auth), Self::Error> {
tracing::info!("auth_password: {} / {}", user, password);
// in this example implementation, any username/password combination is accepted
Expand All @@ -142,46 +151,46 @@ impl server::Handler for SshServer {

async fn data(
mut self,
channel: ChannelId,
_: ChannelId,
data: &[u8],
mut session: Session,
session: Session,
) -> Result<(Self, Session), Self::Error> {
let pack_protocol = self.pack_protocol.as_mut().unwrap();

match pack_protocol.service_type {
ServiceType::UploadPack => {
self.handle_upload_pack(channel, data, &mut session).await;
}
ServiceType::ReceivePack => {
self.handle_receive_pack(channel, data, &mut session).await;
}
};

self.data_combined.extend(data);
Ok((self, session))
}

async fn channel_eof(
self,
mut self,
channel: ChannelId,
mut session: Session,
) -> Result<(Self, Session), Self::Error> {
if let Some(pack_protocol) = self.pack_protocol.as_mut() {
match pack_protocol.service_type {
ServiceType::UploadPack => {
self.handle_upload_pack(channel, &mut session).await;
}
ServiceType::ReceivePack => {
self.handle_receive_pack(channel, &mut session).await;
}
};
}

{
let mut clients = self.clients.lock().unwrap();
clients.remove(&(self.id, channel));
}
session.exit_status_request(channel, 0000);
session.close(channel);

Ok((self, session))
}
}

impl SshServer {
async fn handle_upload_pack(&mut self, channel: ChannelId, data: &[u8], session: &mut Session) {
async fn handle_upload_pack(&mut self, channel: ChannelId, session: &mut Session) {
let pack_protocol = self.pack_protocol.as_mut().unwrap();

let (send_pack_data, buf) = pack_protocol
.git_upload_pack(&mut Bytes::copy_from_slice(data))
.git_upload_pack(&mut Bytes::copy_from_slice(&self.data_combined))
.await
.unwrap();

Expand All @@ -205,13 +214,12 @@ impl SshServer {
async fn handle_receive_pack(
&mut self,
channel: ChannelId,
data: &[u8],
session: &mut Session,
) {
let pack_protocol = self.pack_protocol.as_mut().unwrap();

let buf = pack_protocol
.git_receive_pack(Bytes::from(data.to_vec()))
.git_receive_pack(Bytes::from(self.data_combined.to_vec()))
.await
.unwrap();
tracing::info!("report status: {:?}", buf);
Expand Down
1 change: 1 addition & 0 deletions gateway/src/ssh_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub async fn start_server(command: &SshOptions) {
id: 0,
storage: database::init(data_source).await,
pack_protocol: None,
data_combined: Vec::new(),
};
let server_url = format!("{}:{}", host, ssh_port);
let addr = SocketAddr::from_str(&server_url).unwrap();
Expand Down

0 comments on commit 6691589

Please sign in to comment.