diff --git a/backend/middlewares/event/src/event_config.rs b/backend/middlewares/event/src/event_config.rs index 98e88ab4..d8457ef8 100644 --- a/backend/middlewares/event/src/event_config.rs +++ b/backend/middlewares/event/src/event_config.rs @@ -3,6 +3,7 @@ use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, sync::Mutex}; use tardis::basic::{error::TardisError, result::TardisResult}; +use bios_sdk_invoke::clients::event_client::{EventCenterConfig}; #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(default)] pub struct EventConfig { @@ -10,6 +11,7 @@ pub struct EventConfig { pub enable: bool, pub event_url: String, pub event_bus_sk: String, + pub avatars: Vec, pub resend_threshold: u32, pub resend_interval_sec: Option, } @@ -19,6 +21,7 @@ impl Default for EventConfig { EventConfig { rbum: Default::default(), enable: false, + avatars: Vec::new(), event_url: "".to_string(), event_bus_sk: "".to_string(), resend_threshold: 3, diff --git a/backend/middlewares/event/src/event_initializer.rs b/backend/middlewares/event/src/event_initializer.rs index 64f19f35..e0b5630f 100644 --- a/backend/middlewares/event/src/event_initializer.rs +++ b/backend/middlewares/event/src/event_initializer.rs @@ -3,7 +3,7 @@ use bios_basic::rbum::{ rbum_enumeration::RbumScopeLevelKind, serv::{rbum_crud_serv::RbumCrudOperation, rbum_domain_serv::RbumDomainServ, rbum_item_serv::RbumItemCrudOperation, rbum_kind_serv::RbumKindServ}, }; -use bios_sdk_invoke::clients::event_client::{BiosEventCenter, EventCenter, TOPIC_EVENT_BUS}; +use bios_sdk_invoke::clients::event_client::{BiosEventCenter, EventCenter, EventCenterConfig, TOPIC_BIOS_PUB_SUB, TOPIC_BIOS_WORKER_QUEUE}; use tardis::{ basic::{dto::TardisContext, field::TrimString, result::TardisResult}, db::reldb_client::TardisActiveModel, @@ -93,11 +93,26 @@ async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst, .await?; EventInfoManager::set(EventInfo { kind_id, domain_id })?; let config = funs.conf::(); - // create event bus topic + // create bios worker queue topic serv::event_topic_serv::EventDefServ::add_item( &mut EventTopicAddOrModifyReq { - code: TOPIC_EVENT_BUS.into(), - name: TOPIC_EVENT_BUS.into(), + code: TOPIC_BIOS_WORKER_QUEUE.into(), + name: TOPIC_BIOS_WORKER_QUEUE.into(), + save_message: false, + need_mgr: false, + queue_size: 1024, + use_sk: Some(config.event_bus_sk.clone()), + mgr_sk: None, + }, + funs, + ctx, + ) + .await?; + // create bios pub sub topic + serv::event_topic_serv::EventDefServ::add_item( + &mut EventTopicAddOrModifyReq { + code: TOPIC_BIOS_PUB_SUB.into(), + name: TOPIC_BIOS_PUB_SUB.into(), save_message: false, need_mgr: false, queue_size: 1024, @@ -149,8 +164,27 @@ fn init_scan_and_resend_task() { } fn create_event_center() -> TardisResult<()> { - let bios_event_center = BiosEventCenter::from_domain(DOMAIN_CODE); - bios_event_center.init()?; - bios_event_center.set_event_bus(); + let config = TardisFuns::cs_config::(DOMAIN_CODE); + let pubsub_config = EventCenterConfig { + base_url: config.event_url.clone(), + topic_sk: config.event_bus_sk.clone(), + topic_code: TOPIC_BIOS_PUB_SUB.to_owned(), + subscribe: true, + avatars: config.avatars.clone(), + }; + let pubsub = BiosEventCenter::from_config(pubsub_config); + pubsub.init()?; + pubsub.set_as_worker_queue(); + + let wq_config = EventCenterConfig { + base_url: config.event_url.clone(), + topic_sk: config.event_bus_sk.clone(), + topic_code: TOPIC_BIOS_WORKER_QUEUE.to_owned(), + subscribe: false, + avatars: config.avatars.clone(), + }; + let wq = BiosEventCenter::from_config(wq_config); + wq.init()?; + wq.set_as_worker_queue(); Ok(()) } diff --git a/backend/middlewares/flow/src/serv/flow_event_serv.rs b/backend/middlewares/flow/src/serv/flow_event_serv.rs index 4147159a..9ebfa4e6 100644 --- a/backend/middlewares/flow/src/serv/flow_event_serv.rs +++ b/backend/middlewares/flow/src/serv/flow_event_serv.rs @@ -369,7 +369,7 @@ impl FlowEventServ { funs, ) .await?; - if let Some(event_center) = BiosEventCenter::event_bus() { + if let Some(event_center) = BiosEventCenter::worker_queue() { event_center .publish( FlowFrontChangeReq { diff --git a/backend/middlewares/schedule/src/schedule_constants.rs b/backend/middlewares/schedule/src/schedule_constants.rs index f57cfa08..abcc3c7f 100644 --- a/backend/middlewares/schedule/src/schedule_constants.rs +++ b/backend/middlewares/schedule/src/schedule_constants.rs @@ -1,2 +1,8 @@ pub const DOMAIN_CODE: &str = "schedule"; pub const KV_KEY_CODE: &str = "schedule:job:"; +pub const JOB_TAG: &str = "schedule_job"; +pub const TASK_TAG: &str = "schedule_task"; +pub const OP_ADD: &str = "add"; +pub const OP_DELETE: &str = "delete"; +pub const OP_EXECUTE_START: &str = "exec-start"; +pub const OP_EXECUTE_END: &str = "exec-end"; \ No newline at end of file diff --git a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event.rs b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event.rs index 061b53dc..5d54887d 100644 --- a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event.rs +++ b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event.rs @@ -1,5 +1,6 @@ use std::sync::Arc; mod spi_log; +mod schedule_event; pub use spi_log::*; use tardis::{basic::dto::TardisContext, futures::Stream, serde_json::Value, TardisFunsInst}; diff --git a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/schedule_event.rs b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/schedule_event.rs new file mode 100644 index 00000000..e7ecc198 --- /dev/null +++ b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/schedule_event.rs @@ -0,0 +1,165 @@ +use std::sync::Arc; + +use bios_sdk_invoke::clients::{ + event_client::{BiosEventCenter, Event, EventCenter, EventExt}, + spi_log_client::LogItemAddReq, +}; +use serde::{Deserialize, Serialize}; +use tardis::{ + basic::{dto::TardisContext, error::TardisError, result::TardisResult}, + chrono::Utc, + futures::Stream, + log::error, + tokio, TardisFunsInst, +}; + +use crate::schedule_constants::*; + +use super::{EventComponent, ScheduleEvent}; +#[derive(Clone)] +pub struct ScheduleEventCenter { + funs: Arc, + ctx: Arc, +} + +const SCHEDULE_AVATAR: &str = "schedule"; +#[derive(Serialize, Deserialize)] +pub struct AddTaskEvent { + pub code: String, +} +#[derive(Serialize, Deserialize)] +pub struct DeleteTaskEvent { + pub code: String, +} +impl Event for AddTaskEvent { + const CODE: &'static str = "schedule/add_task"; +} +impl Event for DeleteTaskEvent { + const CODE: &'static str = "schedule/delete_task"; +} + +impl EventComponent for ScheduleEventCenter { + fn from_context(funs: impl Into>, ctx: impl Into>) -> Self { + Self { + funs: funs.into(), + ctx: ctx.into(), + } + } + + fn notify_create(&self, code: &str) { + if let Some(ec) = BiosEventCenter::pub_sub() { + let code = code.to_owned(); + tokio::spawn(async move { + let _ = ec.publish(AddTaskEvent { code }.with_source(SCHEDULE_AVATAR)).await; + }); + } + } + + fn notify_delete(&self, code: &str) { + if let Some(ec) = BiosEventCenter::pub_sub() { + let code = code.to_owned(); + tokio::spawn(async move { + let _ = ec.publish(DeleteTaskEvent { code }.with_source(SCHEDULE_AVATAR)).await; + }); + } + } + + fn notify_execute_start(&self, code: &str) { + if let Some(ec) = BiosEventCenter::worker_queue() { + let funs = self.funs.clone(); + let ctx = self.ctx.clone(); + let code = code.to_string(); + let _handle = tokio::spawn(async move { + let result = ec + .publish( + LogItemAddReq { + tag: TASK_TAG.to_string(), + content: "start request".into(), + key: Some(code.to_string()), + op: Some(OP_EXECUTE_START.to_string()), + ts: Some(Utc::now()), + ..Default::default() + } + .inject_context(&funs, &ctx), + ) + .await; + if let Err(e) = result { + error!("notify_create error: {:?}", e); + } + }); + } + } + + fn notify_execute_end(&self, code: &str, message: String, ext: tardis::serde_json::Value) { + if let Some(ec) = BiosEventCenter::worker_queue() { + let funs = self.funs.clone(); + let ctx = self.ctx.clone(); + let code = code.to_string(); + let _handle = tokio::spawn(async move { + let result = ec + .publish( + LogItemAddReq { + tag: TASK_TAG.to_string(), + content: message, + ext: Some(ext), + key: Some(code.to_string()), + op: Some(OP_EXECUTE_END.to_string()), + ts: Some(Utc::now()), + ..Default::default() + } + .inject_context(&funs, &ctx), + ) + .await; + if let Err(e) = result { + error!("notify_create error: {:?}", e); + } + }); + } + } + + fn create_event_stream() -> impl tardis::futures::Stream + Send { + EventCenterBasedEventStream::new() + } +} + +pub struct EventCenterBasedEventStream { + event_rx: tokio::sync::mpsc::Receiver, +} + +impl EventCenterBasedEventStream { + pub fn new() -> Self { + let (event_tx, event_rx) = tokio::sync::mpsc::channel(100); + if let Some(ec) = BiosEventCenter::pub_sub() { + { + let event_tx = event_tx.clone(); + ec.subscribe(move |AddTaskEvent { code }: AddTaskEvent| { + let event_tx = event_tx.clone(); + async move { + event_tx.send(ScheduleEvent::JustCreate { code }).await.map_err(|_| TardisError::internal_error("fail to send out event", ""))?; + TardisResult::Ok(()) + } + }); + } + { + let event_tx = event_tx.clone(); + ec.subscribe(move |DeleteTaskEvent { code }: DeleteTaskEvent| { + let event_tx = event_tx.clone(); + async move { + event_tx.send(ScheduleEvent::JustCreate { code }).await.map_err(|_| TardisError::internal_error("fail to send out event", ""))?; + TardisResult::Ok(()) + } + }); + } + } + EventCenterBasedEventStream { event_rx } + } +} + +impl Stream for EventCenterBasedEventStream { + type Item = super::ScheduleEvent; + + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let mut this = std::pin::pin!(self); + this.event_rx.poll_recv(cx) + } +} diff --git a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/spi_log.rs b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/spi_log.rs index 17ae3a15..420d1e0b 100644 --- a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/spi_log.rs +++ b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2/event/spi_log.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, task::ready, time::Duration}; -use bios_sdk_invoke::clients::spi_log_client::{LogItemFindReq, SpiLogClient}; +use bios_sdk_invoke::clients::spi_log_client::{LogItemAddReq, LogItemFindReq, SpiLogClient}; use tardis::{ basic::dto::TardisContext, chrono::Utc, @@ -9,15 +9,10 @@ use tardis::{ tokio, TardisFuns, TardisFunsInst, }; -use crate::schedule_constants::DOMAIN_CODE; +use crate::schedule_constants::*; use super::EventComponent; -const JOB_TAG: &str = "schedule_job"; -const TASK_TAG: &str = "schedule_task"; -const OP_ADD: &str = "add"; -const OP_DELETE: &str = "delete"; -const OP_EXECUTE_START: &str = "exec-start"; -const OP_EXECUTE_END: &str = "exec-end"; + #[derive(Clone)] pub struct SpiLog { @@ -40,17 +35,15 @@ impl EventComponent for SpiLog { let ctx = self.ctx.clone(); let code = code.to_string(); let _handle = tokio::spawn(async move { - let result = SpiLogClient::add_with_many_params( - JOB_TAG, - "add job", - None, - None, - Some(code.to_string()), - Some(OP_ADD.to_string()), - None, - Some(Utc::now().to_rfc3339()), - None, - None, + let result = SpiLogClient::add( + &LogItemAddReq { + tag: JOB_TAG.to_string(), + content: "add job".into(), + key: Some(code.to_string()), + op: Some(OP_ADD.to_string()), + ts: Some(Utc::now()), + ..Default::default() + }, &funs, &ctx, ) @@ -67,17 +60,15 @@ impl EventComponent for SpiLog { let ctx = self.ctx.clone(); let code = code.to_string(); let _handle = tokio::spawn(async move { - let result = SpiLogClient::add_with_many_params( - JOB_TAG, - "delete job", - None, - None, - Some(code.to_string()), - Some(OP_DELETE.to_string()), - None, - Some(Utc::now().to_rfc3339()), - None, - None, + let result = SpiLogClient::add( + &LogItemAddReq { + tag: JOB_TAG.to_string(), + content: "delete job".into(), + key: Some(code.to_string()), + op: Some(OP_DELETE.to_string()), + ts: Some(Utc::now()), + ..Default::default() + }, &funs, &ctx, ) @@ -94,17 +85,15 @@ impl EventComponent for SpiLog { let ctx = self.ctx.clone(); let code = code.to_string(); let _handle = tokio::spawn(async move { - let result = SpiLogClient::add_with_many_params( - TASK_TAG, - "start request", - None, - None, - Some(code.to_string()), - Some(OP_EXECUTE_START.to_string()), - None, - Some(Utc::now().to_rfc3339()), - None, - None, + let result = SpiLogClient::add( + &LogItemAddReq { + tag: TASK_TAG.to_string(), + content: "start request".into(), + key: Some(code.to_string()), + op: Some(OP_EXECUTE_START.to_string()), + ts: Some(Utc::now()), + ..Default::default() + }, &funs, &ctx, ) @@ -121,21 +110,21 @@ impl EventComponent for SpiLog { let ctx = self.ctx.clone(); let code = code.to_string(); let _handle = tokio::spawn(async move { - let result = SpiLogClient::add_with_many_params( - TASK_TAG, - &message, - Some(ext), - None, - Some(code.to_string()), - Some(OP_EXECUTE_END.to_string()), - None, - Some(Utc::now().to_rfc3339()), - None, - None, + let result = SpiLogClient::add( + &LogItemAddReq { + tag: TASK_TAG.to_string(), + content: message, + ext: Some(ext), + key: Some(code.to_string()), + op: Some(OP_EXECUTE_END.to_string()), + ts: Some(Utc::now()), + ..Default::default() + }, &funs, &ctx, ) .await; + if let Err(e) = result { error!("notify_create error: {:?}", e); } diff --git a/backend/spi/spi-kv/src/event.rs b/backend/spi/spi-kv/src/event.rs index 655e2293..a76a8434 100644 --- a/backend/spi/spi-kv/src/event.rs +++ b/backend/spi/spi-kv/src/event.rs @@ -23,7 +23,7 @@ async fn handle_kv_delete_event(req: KvItemDeleteReq, ctx: TardisContext) -> Tar } pub fn register_kv_events() { - if let Some(bios_event_center) = BiosEventCenter::event_bus() { + if let Some(bios_event_center) = BiosEventCenter::worker_queue() { bios_event_center.subscribe(handle_kv_add_event); bios_event_center.subscribe(handle_kv_delete_event); } diff --git a/backend/spi/spi-log/src/event.rs b/backend/spi/spi-log/src/event.rs index 79095c64..433e7920 100644 --- a/backend/spi/spi-log/src/event.rs +++ b/backend/spi/spi-log/src/event.rs @@ -18,7 +18,7 @@ async fn handle_add_event(req: LogItemAddReq, ctx: TardisContext) -> TardisResul } pub fn register_log_event() { - if let Some(bios_event_center) = BiosEventCenter::event_bus() { + if let Some(bios_event_center) = BiosEventCenter::worker_queue() { bios_event_center.subscribe(handle_add_event); } } diff --git a/backend/spi/spi-search/src/event.rs b/backend/spi/spi-search/src/event.rs index a5f58c2e..c703fa89 100644 --- a/backend/spi/spi-search/src/event.rs +++ b/backend/spi/spi-search/src/event.rs @@ -26,7 +26,7 @@ async fn handle_delete_event(req: SearchEventItemDeleteReq, ctx: TardisContext) } pub(crate) fn register_search_events() { - if let Some(bios_event_center) = BiosEventCenter::event_bus() { + if let Some(bios_event_center) = BiosEventCenter::worker_queue() { bios_event_center.subscribe(handle_modify_event); bios_event_center.subscribe(handle_add_event); bios_event_center.subscribe(handle_delete_event); diff --git a/backend/supports/iam/src/basic/serv/clients/iam_log_client.rs b/backend/supports/iam/src/basic/serv/clients/iam_log_client.rs index a4e45954..06084390 100644 --- a/backend/supports/iam/src/basic/serv/clients/iam_log_client.rs +++ b/backend/supports/iam/src/basic/serv/clients/iam_log_client.rs @@ -150,7 +150,7 @@ impl IamLogClient { owner, own_paths, }; - if let Some(ws_client) = BiosEventCenter::event_bus() { + if let Some(ws_client) = BiosEventCenter::worker_queue() { ws_client.publish(add_req.with_source(IAM_AVATAR).inject_context(funs, ctx)).await?; } else { SpiLogClient::add(&add_req, funs, ctx).await?; diff --git a/backend/supports/iam/src/basic/serv/clients/iam_search_client.rs b/backend/supports/iam/src/basic/serv/clients/iam_search_client.rs index b05ca25f..b377a9ca 100644 --- a/backend/supports/iam/src/basic/serv/clients/iam_search_client.rs +++ b/backend/supports/iam/src/basic/serv/clients/iam_search_client.rs @@ -181,7 +181,7 @@ impl IamSearchClient { groups: Some(account_resp_dept_id), }), }; - if let Some(event_center) = BiosEventCenter::event_bus() { + if let Some(event_center) = BiosEventCenter::worker_queue() { event_center.modify_item_and_name(IAM_AVATAR, &tag, &key, &modify_req, funs, ctx).await?; } else { SpiSearchClient::modify_item_and_name(&tag, &key, &modify_req, funs, ctx).await?; @@ -226,7 +226,7 @@ impl IamSearchClient { groups: Some(account_resp_dept_id), }), }; - if let Some(event_center) = BiosEventCenter::event_bus() { + if let Some(event_center) = BiosEventCenter::worker_queue() { event_center.add_item_and_name(IAM_AVATAR, &add_req, Some(account_resp.name), funs, ctx).await?; } else { SpiSearchClient::add_item_and_name(&add_req, Some(account_resp.name), funs, ctx).await?; @@ -238,7 +238,7 @@ impl IamSearchClient { // account 全局搜索删除埋点方法 pub async fn delete_account_search(account_id: &str, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> { let tag = funs.conf::().spi.search_account_tag.clone(); - if let Some(event_center) = BiosEventCenter::event_bus() { + if let Some(event_center) = BiosEventCenter::worker_queue() { event_center.delete_item_and_name(IAM_AVATAR, &tag, account_id, funs, ctx).await?; } else { SpiSearchClient::delete_item_and_name(&tag, account_id, funs, ctx).await?; diff --git a/frontend/sdks/invoke/src/clients/event_client.rs b/frontend/sdks/invoke/src/clients/event_client.rs index 34d4a0b3..1d5826d7 100644 --- a/frontend/sdks/invoke/src/clients/event_client.rs +++ b/frontend/sdks/invoke/src/clients/event_client.rs @@ -91,7 +91,8 @@ pub struct EventListenerRegisterResp { ******************************************************************************************************************/ // GLOBAL EVENT BUS -pub const TOPIC_EVENT_BUS: &str = "event_bus"; +pub const TOPIC_BIOS_WORKER_QUEUE: &str = "bios/worker-queue"; +pub const TOPIC_BIOS_PUB_SUB: &str = "bios/pub-sub"; pub const TOPIC_PUBLIC: &str = "public"; #[derive(Serialize, Deserialize, Debug, Default, Clone)] @@ -119,13 +120,32 @@ impl From for EventListenerRegisterReq { } #[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(default)] pub struct EventCenterConfig { - base_url: String, - topic_sk: String, - topic_code: String, - avatars: Vec, + pub base_url: String, + pub topic_sk: String, + pub topic_code: String, + /// The phrase "subscribe" here is **extremely** bad. + /// + /// The difference between subscribe and not subscribe is actually + /// the difference between a pub/sub system and a worker queue. + /// + /// We may need to change this to a more meaningful name. + pub subscribe: bool, + pub avatars: Vec, } +impl Default for EventCenterConfig { + fn default() -> Self { + Self { + base_url: "http://localhost:8080".to_string(), + topic_sk: String::default(), + topic_code: TOPIC_BIOS_WORKER_QUEUE.to_string(), + subscribe: false, + avatars: vec![], + } + } +} type WsEventCenterHandler = dyn Fn(serde_json::Value) -> Pin> + Send>> + Send + Sync; type WsHandlersMap = HashMap<&'static str, Vec>>; @@ -183,11 +203,11 @@ impl EventCenter for WsEventCenter { let client = EventClient::new(url, &funs); let resp = client .register(&EventListenerRegisterReq { - topic_code: TOPIC_EVENT_BUS.to_string(), + topic_code: config.topic_code.to_string(), topic_sk: Some(config.topic_sk.clone()), events: Some(events), avatars: config.avatars.clone(), - subscribe_mode: false, + subscribe_mode: config.subscribe, }) .await .expect("fail to register event center"); @@ -319,12 +339,20 @@ impl BiosEventCenter { TardisFuns::store().get_singleton::() } #[inline(always)] - pub fn event_bus() -> Option { - TardisFuns::store().get(TOPIC_EVENT_BUS.as_bytes()) + pub fn worker_queue() -> Option { + TardisFuns::store().get(TOPIC_BIOS_WORKER_QUEUE.as_bytes()) + } + #[inline(always)] + pub fn set_as_worker_queue(&self) { + TardisFuns::store().insert(ComponentKey::named(TOPIC_BIOS_WORKER_QUEUE.as_bytes()), self.clone()); + } + #[inline(always)] + pub fn pub_sub() -> Option { + TardisFuns::store().get(TOPIC_BIOS_PUB_SUB.as_bytes()) } #[inline(always)] - pub fn set_event_bus(self) { - TardisFuns::store().insert(ComponentKey::named(TOPIC_EVENT_BUS.as_bytes()), self); + pub fn set_pub_sub(self) { + TardisFuns::store().insert(ComponentKey::named(TOPIC_BIOS_PUB_SUB.as_bytes()), self); } #[inline(always)] pub fn public() -> Option { diff --git a/frontend/sdks/invoke/src/clients/spi_log_client.rs b/frontend/sdks/invoke/src/clients/spi_log_client.rs index 71179386..6ab20813 100644 --- a/frontend/sdks/invoke/src/clients/spi_log_client.rs +++ b/frontend/sdks/invoke/src/clients/spi_log_client.rs @@ -61,7 +61,7 @@ pub struct LogDynamicContentReq { pub content: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] pub struct LogItemAddReq { pub tag: String, pub content: String,