Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/ideal-world/bios
Browse files Browse the repository at this point in the history
  • Loading branch information
ljl committed Oct 21, 2024
2 parents 65d9399 + 8c9f34f commit 577f440
Show file tree
Hide file tree
Showing 98 changed files with 2,579 additions and 4,652 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ strum = { version = "0.26", features = ["derive"] }
# tardis = { version = "0.1.0-rc.16" }
# tardis = { path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "03ef942" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
# asteroid-mq = { path = "../asteroid/asteroid-mq" }
# asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "83a6643" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "83a6643" }
#spacegate

# spacegate-shell = { path = "../spacegate/crates/shell", features = [
Expand Down
6 changes: 3 additions & 3 deletions backend/basic/src/process/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,13 @@ struct TaskExecuteEventReq {
}

impl EventAttribute for TaskSetStatusEventReq {
const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_STATUS_FLAG.as_bytes());
const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_STATUS_FLAG);
}

impl EventAttribute for TaskSetProcessDataEventReq {
const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_PROCESS_DATA_FLAG.as_bytes());
const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_PROCESS_DATA_FLAG);
}

impl EventAttribute for TaskExecuteEventReq {
const SUBJECT: Subject = Subject::const_new(EVENT_EXECUTE_TASK_FLAG.as_bytes());
const SUBJECT: Subject = Subject::const_new(EVENT_EXECUTE_TASK_FLAG);
}
31 changes: 28 additions & 3 deletions backend/basic/src/spi/spi_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ pub mod common_pg {
common::get_isolation_flag_from_ext(ext)
}

/// Get the table full name from the extension
/// 根据入参生成对应表全限定名
pub fn get_table_full_name(ext: &HashMap<String, String>, table_flag: String, tag: String) -> String {
let schema_name = get_schema_name_from_ext(ext).expect("ignore");
return format!("{schema_name}.{GLOBAL_STORAGE_FLAG}_{table_flag}_{tag}");
}

/// Check if the schema exists
/// 检查schema是否存在
pub async fn check_schema_exit(client: &TardisRelDBClient, ctx: &TardisContext) -> TardisResult<bool> {
Expand Down Expand Up @@ -279,6 +286,7 @@ pub mod common_pg {
table_flag: &str,
// Create table DDL
table_create_content: &str,
table_inherits: Option<String>,
// Table index
// Format: field name -> index type
indexes: Vec<(&str, &str)>,
Expand All @@ -295,7 +303,18 @@ pub mod common_pg {
} else if !mgr {
return Err(TardisError::bad_request("The requested tag does not exist", ""));
}
do_init_table(&schema_name, &conn, &tag, table_flag, table_create_content, indexes, primary_keys, update_time_field).await?;
do_init_table(
&schema_name,
&conn,
&tag,
table_flag,
table_create_content,
table_inherits,
indexes,
primary_keys,
update_time_field,
)
.await?;
Ok((conn, format!("{schema_name}.{GLOBAL_STORAGE_FLAG}_{table_flag}{tag}")))
}

Expand All @@ -322,7 +341,7 @@ pub mod common_pg {
) -> TardisResult<()> {
let tag = tag.map(|t| format!("_{t}")).unwrap_or_default();
let schema_name = get_schema_name_from_context(ctx);
do_init_table(&schema_name, conn, &tag, table_flag, table_create_content, indexes, primary_keys, update_time_field).await
do_init_table(&schema_name, conn, &tag, table_flag, table_create_content, None, indexes, primary_keys, update_time_field).await
}

async fn do_init_table(
Expand All @@ -331,6 +350,7 @@ pub mod common_pg {
tag: &str,
table_flag: &str,
table_create_content: &str,
table_inherits: Option<String>,
// field_name_or_fun -> index type
indexes: Vec<(&str, &str)>,
primary_keys: Option<Vec<&str>>,
Expand All @@ -341,7 +361,12 @@ pub mod common_pg {
r#"CREATE TABLE {schema_name}.{GLOBAL_STORAGE_FLAG}_{table_flag}{tag}
(
{table_create_content}
)"#
){}"#,
if let Some(inherits) = table_inherits {
format!(" INHERITS ({inherits})")
} else {
"".to_string()
}
),
vec![],
)
Expand Down
3 changes: 2 additions & 1 deletion backend/gateways/spacegate-plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ spacegate-shell = { workspace = true, features = [
"k8s",
"ext-redis",
"ext-axum",
"plugin-east-west-traffic-white-list",
] }

bios-sdk-invoke = { path = "../../../frontend/sdks/invoke", features = [
"spi_log",
] }
], default-features = false }


jsonpath-rust = "0.3.1"
Expand Down
9 changes: 6 additions & 3 deletions backend/gateways/spacegate-plugins/src/plugin/audit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,21 @@ impl AuditLogPlugin {
if !self.log_url.is_empty() && !self.spi_app_id.is_empty() {
tokio::task::spawn(async move {
match spi_log_client::SpiLogClient::add(
&LogItemAddReq {
LogItemAddReq {
tag,
content: TardisFuns::json.obj_to_string(&content).unwrap_or_default(),
content: TardisFuns::json.obj_to_json(&content).unwrap_or_default(),
kind: None,
ext: Some(content.to_value()),
key: None,
op: Some(content.op),
rel_key: None,
id: None,
idempotent_id: None,
ts: Some(tardis::chrono::Utc::now()),
owner: content.user_id,
own_paths: None,
msg: None,
owner_name: None,
push: false,
},
&funs,
&spi_ctx,
Expand Down
112 changes: 72 additions & 40 deletions backend/gateways/spacegate-plugins/src/plugin/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,33 @@ use bios_auth::{
serv::{auth_crypto_serv, auth_kernel_serv, auth_res_serv},
};

use http::header::HOST;
use serde::{Deserialize, Serialize};
use spacegate_shell::{
hyper::{
self, header,
http::{HeaderMap, HeaderName, HeaderValue, StatusCode},
Method, Request, Response,
},
kernel::{extension::Reflect, helper_layers::function::Inner, SgRequest},
kernel::{
extension::{IsEastWestTraffic, Reflect},
helper_layers::function::Inner,
SgRequest,
},
plugin::{Plugin, PluginConfig, PluginError},
BoxError, SgBody, SgRequestExt,
};
use std::{
collections::HashMap,
ops::Deref,
str::FromStr,
sync::{Arc, Once, OnceLock},
};
use tardis::{
basic::{error::TardisError, result::TardisResult},
cache::AsyncCommands as _,
config::config_dto::CacheModuleConfig,
log::{self, warn},
tracing::{self as tracing, instrument, warn},
serde_json::{self, json},
tokio::{sync::RwLock, task::JoinHandle},
url::Url,
Expand Down Expand Up @@ -69,7 +75,7 @@ impl SgPluginAuthConfig {
let mut instance = INSTANCE.get_or_init(Default::default).write().await;
if let Some((md5, handle)) = instance.as_ref() {
if config_md5.eq(md5) {
log::trace!("[SG.Filter.Auth] have not found config change");
tracing::trace!("[SG.Filter.Auth] have not found config change");
return Ok(());
} else {
handle.abort();
Expand Down Expand Up @@ -98,7 +104,7 @@ impl SgPluginAuthConfig {
tardis::TardisFuns::hot_reload(tardis_config).await?;
let handle = auth_initializer::init_without_webserver().await?;
*instance = Some((config_md5, handle));
log::info!("[SG.Filter.Auth] init done");
tracing::info!("[SG.Filter.Auth] init done");
Ok(())
}
}
Expand Down Expand Up @@ -136,10 +142,10 @@ pub struct AuthPlugin {
///
/// e.g
///
/// |request mix url|replace_url| result |
/// |---------------|-----------|---------------------|
/// | `/apis` | `apis` | `/{true_url}` |
/// |`/prefix/apis` | `apis` |`/prefix/{true_url}` |
/// |request mix url|mix_replace_url| result |
/// |---------------|---------------|---------------------|
/// | `/apis` | `apis` | `/{true_url}` |
/// |`/prefix/apis` | `apis` |`/prefix/{true_url}` |
mix_replace_url: String,
/// Remove prefix of AuthReq path.
/// use for [ctx_to_auth_req]
Expand Down Expand Up @@ -240,9 +246,9 @@ impl AuthPlugin {
return Ok(req);
}

log::trace!("[SG.Filter.Auth] request filter info: request path is {}", req.uri().path());
tracing::trace!("[SG.Filter.Auth] request filter info: request url is {}", req.uri());
if method == http::Method::GET && req.uri().path().trim_matches('/') == self.fetch_server_config_path.as_str().trim_matches('/') {
log::debug!("[SG.Filter.Auth] request path hit fetch server config path: {}", self.fetch_server_config_path);
tracing::debug!("[SG.Filter.Auth] request path hit fetch server config path: {}", self.fetch_server_config_path);
let mock_resp = Response::builder()
.header(http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"))
.status(http::StatusCode::OK)
Expand All @@ -260,41 +266,42 @@ impl AuthPlugin {

let is_true_mix_req = self.is_mix_req(req.headers());

if self.auth_config.strict_security_mode && !is_true_mix_req {
log::debug!("[SG.Filter.Auth] handle mix request");
return Ok(handle_mix_req(&self.auth_config, &self.mix_replace_url, req).await.map_err(PluginError::internal_error::<AuthPlugin>)?);
let is_east_west_traffic = req.extensions().get::<IsEastWestTraffic>().map(Deref::deref).unwrap_or(&false);
if self.auth_config.strict_security_mode && !is_true_mix_req && !is_east_west_traffic {
tracing::debug!("[SG.Filter.Auth] handle mix request");
return Ok(handle_mix_req(&self, req).await.map_err(PluginError::internal_error::<AuthPlugin>)?);
}
req.headers_mut().append(&self.header_is_mix_req, HeaderValue::from_static("false"));

let (mut auth_req, mut req) = req_to_auth_req(&self.auth_path_ignore_prefix, req).await.map_err(PluginError::internal_error::<AuthPlugin>)?;

match auth_kernel_serv::auth(&mut auth_req, is_true_mix_req).await {
Ok(auth_result) => {
if log::level_enabled!(log::Level::TRACE) {
log::trace!("[SG.Filter.Auth] auth return ok {:?}", auth_result);
} else if log::level_enabled!(log::Level::DEBUG) {
if tracing::level_enabled!(tracing::Level::TRACE) {
tracing::trace!("[SG.Filter.Auth] auth return ok {:?}", auth_result);
} else if tracing::level_enabled!(tracing::Level::DEBUG) {
if let Some(ctx) = &auth_result.ctx {
log::debug!("[SG.Filter.Auth] auth return ok ctx:{ctx}",);
tracing::debug!("[SG.Filter.Auth] auth return ok ctx:{ctx}",);
} else {
log::debug!("[SG.Filter.Auth] auth return ok ctx:None",);
tracing::debug!("[SG.Filter.Auth] auth return ok ctx:None",);
};
}

if auth_result.e.is_none() {
req = success_auth_result_to_req(auth_result, &self.auth_config, req).map_err(PluginError::internal_error::<AuthPlugin>)?;
} else if let Some(e) = auth_result.e {
log::info!("[SG.Filter.Auth] auth failed:{e}");
tracing::info!("[SG.Filter.Auth] auth failed:{e}");
let err_resp = Response::builder()
.header(http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"))
.status(StatusCode::from_str(&e.code).unwrap_or(StatusCode::BAD_GATEWAY))
.body(SgBody::full(json!({"code":format!("{}-gateways-cert-error",e.code),"message":e.message}).to_string()))
.body(SgBody::full(json!({"code":format!("{}-gateway-cert-error",e.code),"message":e.message}).to_string()))
.map_err(PluginError::internal_error::<AuthPlugin>)?;
return Err(err_resp);
};
Ok(req)
}
Err(e) => {
log::info!("[SG.Filter.Auth] auth return error {:?}", e);
tracing::info!("[SG.Filter.Auth] auth return error {:?}", e);
let err_resp = Response::builder()
.header(http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"))
.status(StatusCode::from_str(&e.code).unwrap_or(StatusCode::BAD_GATEWAY))
Expand Down Expand Up @@ -341,35 +348,57 @@ impl AuthPlugin {
}
}

async fn handle_mix_req(auth_config: &AuthConfig, mix_replace_url: &str, req: SgRequest) -> Result<SgRequest, BoxError> {
#[instrument(name="[SG.Filter.Auth.MixReq]",level = "trace", skip_all, fields(req_uri=req.uri().to_string()))]
async fn handle_mix_req(plugin_config: &AuthPlugin, req: SgRequest) -> Result<SgRequest, BoxError> {
let auth_config = &plugin_config.auth_config;
let (mut parts, mut body) = req.into_parts();
if !body.is_dumped() {
body = body.dump().await?;
}
let string_body = String::from_utf8_lossy(body.get_dumped().expect("not expect code")).trim_matches('"').to_string();
if string_body.is_empty() {
return Err("[SG.Filter.Auth.MixReq] body can't be empty".into());
return Err(" body can't be empty".into());
}
let mut req_headers = parts.headers.iter().map(|(k, v)| (k.as_str().to_string(), v.to_str().expect("error parse header value to str").to_string())).collect();
let (body, crypto_headers) = auth_crypto_serv::decrypt_req(&req_headers, &Some(string_body), true, true, auth_config).await?;
req_headers.remove(&auth_config.head_key_crypto);
req_headers.remove(&auth_config.head_key_crypto.to_ascii_lowercase());

let body = body.ok_or_else(|| TardisError::custom("500", "[SG.Filter.Auth.MixReq] decrypt body can't be empty", "500-parse_mix_req-parse-error"))?;
let body = body.ok_or_else(|| TardisError::custom("500", " decrypt body can't be empty", "500-parse_mix_req-parse-error"))?;

let mix_body = TardisFuns::json.str_to_obj::<MixRequestBody>(&body)?;
// ctx.set_action(SgRouteFilterRequestAction::Redirect);
let mut true_uri = Url::from_str(&parts.uri.to_string().replace(mix_replace_url, &mix_body.uri))
.map_err(|e| TardisError::custom("500", &format!("[SG.Filter.Auth.MixReq] url parse err {e}"), "500-parse_mix_req-url-error"))?;
true_uri.set_path(&true_uri.path().replace("//", "/"));
true_uri.set_query(Some(&if let Some(old_query) = true_uri.query() {
format!("{}&_t={}", old_query, mix_body.ts)
} else {
format!("_t={}", mix_body.ts)
}));
parts.uri = true_uri.as_str().parse().map_err(|e| TardisError::custom("500", &format!("[SG.Filter.Auth.MixReq] uri parse error: {}", e), ""))?;
let true_uri = parts.uri.to_string().replace(&plugin_config.mix_replace_url, &mix_body.uri).replace("//", "/");
tracing::trace!(?true_uri," string true uri:");
let mut true_uri_parts =
true_uri.parse::<http::Uri>().map_err(|e| TardisError::custom("500", &format!(" url parse err {e}"), "500-parse_mix_req-url-error"))?.into_parts();

let host = parts.uri.host().map(String::from).or(parts.headers.get(HOST).and_then(|x| x.to_str().map(String::from).ok()));
if let Some(host) = host {
true_uri_parts.authority = Some(http::uri::Authority::from_str(&host).map_err(|e| {
TardisError::custom(
"500",
&format!(" error parse str {host} to authority :{e}"),
"500-parse_mix_req-authority-error",
)
})?);
}
let old_scheme = parts.uri.scheme().cloned().unwrap_or_else(|| {
if let Some(port) = true_uri_parts.authority.clone().and_then(|a| a.port_u16()) {
if port == 443 {
http::uri::Scheme::HTTPS
} else {
http::uri::Scheme::HTTP
}
} else {
http::uri::Scheme::HTTP
}
});
true_uri_parts.scheme = Some(old_scheme);
let true_uri = http::Uri::from_parts(true_uri_parts)?;
tracing::trace!(" raw url:[{}],true url:[{}]", parts.uri.to_string(), true_uri);
parts.uri = true_uri;
parts.method = Method::from_str(&mix_body.method.to_ascii_uppercase())
.map_err(|e| TardisError::custom("500", &format!("[SG.Filter.Auth.MixReq] method parse err {e}"), "500-parse_mix_req-method-error"))?;
.map_err(|e| TardisError::custom("500", &format!(" method parse err {e}"), "500-parse_mix_req-method-error"))?;

let mut headers = req_headers;
headers.extend(mix_body.headers);
Expand All @@ -380,17 +409,20 @@ async fn handle_mix_req(auth_config: &AuthConfig, mix_replace_url: &str, req: Sg
.into_iter()
.map(|(k, v)| {
Ok::<_, TardisError>((
HeaderName::from_str(&k).map_err(|e| TardisError::format_error(&format!("[SG.Filter.Auth] error parse str {k} to header name :{e}"), ""))?,
HeaderValue::from_str(&v).map_err(|e| TardisError::format_error(&format!("[SG.Filter.Auth] error parse str {v} to header value :{e}"), ""))?,
HeaderName::from_str(&k).map_err(|e| TardisError::format_error(&format!(" error parse str {k} to header name :{e}"), ""))?,
HeaderValue::from_str(&v).map_err(|e| TardisError::format_error(&format!(" error parse str {v} to header value :{e}"), ""))?,
))
})
.collect::<TardisResult<HeaderMap<HeaderValue>>>()?,
);
parts.headers.remove(plugin_config.header_is_same_req.clone());
parts.headers.append(plugin_config.header_is_mix_req.clone(), HeaderValue::from_static("true"));

let new_body = SgBody::full(mix_body.body);

// ctx.request.set_header_str(&self.header_is_mix_req, "true")?;
Ok(Request::from_parts(parts, new_body))
let mut new_req = Request::from_parts(parts, new_body);
spacegate_shell::kernel::utils::req_length_or_chunked(&mut new_req);
Ok(new_req)
}

/// # Convert Request to AuthReq
Expand Down Expand Up @@ -496,7 +528,7 @@ async fn resp_to_auth_encrypt_req(resp: Response<SgBody>) -> TardisResult<(AuthE
parts.extensions.insert(BeforeEncryptBody::new(resp_body.clone()));
}
let string_body = String::from_utf8_lossy(resp_body);
log::trace!("[SG.Filter.Auth] Before Encrypt Body {}", string_body);
tracing::trace!("[SG.Filter.Auth] Before Encrypt Body {}", string_body);
Ok((
AuthEncryptReq {
headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn test() {
// let req_body = before_filter_ctx.response.take_body_into_bytes().await.unwrap();
// let req_body = String::from_utf8_lossy(&req_body).to_string();
// assert!(!req_body.is_empty());
// assert_eq!(req_body, "{\"code\":\"401-gateways-cert-error\",\"message\":\"[Auth] Token [aaa] is not legal\"}");
// assert_eq!(req_body, "{\"code\":\"401-gateway-cert-error\",\"message\":\"[Auth] Token [aaa] is not legal\"}");

// cache_client.set(&format!("{}tokenxxx", filter_auth.auth_config.cache_key_token_info), "default,accountxxx").await.unwrap();
// cache_client
Expand Down
Loading

0 comments on commit 577f440

Please sign in to comment.