Skip to content

Commit

Permalink
schedule using event
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 committed Jul 18, 2024
1 parent 28f3fe5 commit d5f30a6
Show file tree
Hide file tree
Showing 14 changed files with 305 additions and 79 deletions.
3 changes: 3 additions & 0 deletions backend/middlewares/event/src/event_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Mutex};
use tardis::basic::{error::TardisError, result::TardisResult};
use bios_sdk_invoke::clients::event_client::{EventCenterConfig};
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct EventConfig {
pub rbum: RbumConfig,
pub enable: bool,
pub event_url: String,
pub event_bus_sk: String,
pub avatars: Vec<String>,
pub resend_threshold: u32,
pub resend_interval_sec: Option<u32>,
}
Expand All @@ -19,6 +21,7 @@ impl Default for EventConfig {
EventConfig {
rbum: Default::default(),
enable: false,
avatars: Vec::new(),
event_url: "".to_string(),
event_bus_sk: "".to_string(),
resend_threshold: 3,
Expand Down
48 changes: 41 additions & 7 deletions backend/middlewares/event/src/event_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use bios_basic::rbum::{
rbum_enumeration::RbumScopeLevelKind,
serv::{rbum_crud_serv::RbumCrudOperation, rbum_domain_serv::RbumDomainServ, rbum_item_serv::RbumItemCrudOperation, rbum_kind_serv::RbumKindServ},
};
use bios_sdk_invoke::clients::event_client::{BiosEventCenter, EventCenter, TOPIC_EVENT_BUS};
use bios_sdk_invoke::clients::event_client::{BiosEventCenter, EventCenter, EventCenterConfig, TOPIC_BIOS_PUB_SUB, TOPIC_BIOS_WORKER_QUEUE};
use tardis::{
basic::{dto::TardisContext, field::TrimString, result::TardisResult},
db::reldb_client::TardisActiveModel,
Expand Down Expand Up @@ -93,11 +93,26 @@ async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst,
.await?;
EventInfoManager::set(EventInfo { kind_id, domain_id })?;
let config = funs.conf::<EventConfig>();
// create event bus topic
// create bios worker queue topic
serv::event_topic_serv::EventDefServ::add_item(
&mut EventTopicAddOrModifyReq {
code: TOPIC_EVENT_BUS.into(),
name: TOPIC_EVENT_BUS.into(),
code: TOPIC_BIOS_WORKER_QUEUE.into(),
name: TOPIC_BIOS_WORKER_QUEUE.into(),
save_message: false,
need_mgr: false,
queue_size: 1024,
use_sk: Some(config.event_bus_sk.clone()),
mgr_sk: None,
},
funs,
ctx,
)
.await?;
// create bios pub sub topic
serv::event_topic_serv::EventDefServ::add_item(
&mut EventTopicAddOrModifyReq {
code: TOPIC_BIOS_PUB_SUB.into(),
name: TOPIC_BIOS_PUB_SUB.into(),
save_message: false,
need_mgr: false,
queue_size: 1024,
Expand Down Expand Up @@ -149,8 +164,27 @@ fn init_scan_and_resend_task() {
}

fn create_event_center() -> TardisResult<()> {
let bios_event_center = BiosEventCenter::from_domain(DOMAIN_CODE);
bios_event_center.init()?;
bios_event_center.set_event_bus();
let config = TardisFuns::cs_config::<EventConfig>(DOMAIN_CODE);
let pubsub_config = EventCenterConfig {
base_url: config.event_url.clone(),
topic_sk: config.event_bus_sk.clone(),
topic_code: TOPIC_BIOS_PUB_SUB.to_owned(),
subscribe: true,
avatars: config.avatars.clone(),
};
let pubsub = BiosEventCenter::from_config(pubsub_config);
pubsub.init()?;
pubsub.set_as_worker_queue();

let wq_config = EventCenterConfig {
base_url: config.event_url.clone(),
topic_sk: config.event_bus_sk.clone(),
topic_code: TOPIC_BIOS_WORKER_QUEUE.to_owned(),
subscribe: false,
avatars: config.avatars.clone(),
};
let wq = BiosEventCenter::from_config(wq_config);
wq.init()?;
wq.set_as_worker_queue();
Ok(())
}
2 changes: 1 addition & 1 deletion backend/middlewares/flow/src/serv/flow_event_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl FlowEventServ {
funs,
)
.await?;
if let Some(event_center) = BiosEventCenter::event_bus() {
if let Some(event_center) = BiosEventCenter::worker_queue() {
event_center
.publish(
FlowFrontChangeReq {
Expand Down
6 changes: 6 additions & 0 deletions backend/middlewares/schedule/src/schedule_constants.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
pub const DOMAIN_CODE: &str = "schedule";
pub const KV_KEY_CODE: &str = "schedule:job:";
pub const JOB_TAG: &str = "schedule_job";
pub const TASK_TAG: &str = "schedule_task";
pub const OP_ADD: &str = "add";
pub const OP_DELETE: &str = "delete";
pub const OP_EXECUTE_START: &str = "exec-start";
pub const OP_EXECUTE_END: &str = "exec-end";
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;
mod spi_log;
mod schedule_event;
pub use spi_log::*;
use tardis::{basic::dto::TardisContext, futures::Stream, serde_json::Value, TardisFunsInst};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::sync::Arc;

use bios_sdk_invoke::clients::{
event_client::{BiosEventCenter, Event, EventCenter, EventExt},
spi_log_client::LogItemAddReq,
};
use serde::{Deserialize, Serialize};
use tardis::{
basic::{dto::TardisContext, error::TardisError, result::TardisResult},
chrono::Utc,
futures::Stream,
log::error,
tokio, TardisFunsInst,
};

use crate::schedule_constants::*;

use super::{EventComponent, ScheduleEvent};
#[derive(Clone)]
pub struct ScheduleEventCenter {
funs: Arc<TardisFunsInst>,
ctx: Arc<TardisContext>,
}

const SCHEDULE_AVATAR: &str = "schedule";
#[derive(Serialize, Deserialize)]
pub struct AddTaskEvent {
pub code: String,
}
#[derive(Serialize, Deserialize)]
pub struct DeleteTaskEvent {
pub code: String,
}
impl Event for AddTaskEvent {
const CODE: &'static str = "schedule/add_task";
}
impl Event for DeleteTaskEvent {
const CODE: &'static str = "schedule/delete_task";
}

impl EventComponent for ScheduleEventCenter {
fn from_context(funs: impl Into<std::sync::Arc<tardis::TardisFunsInst>>, ctx: impl Into<std::sync::Arc<tardis::basic::dto::TardisContext>>) -> Self {
Self {
funs: funs.into(),
ctx: ctx.into(),
}
}

fn notify_create(&self, code: &str) {
if let Some(ec) = BiosEventCenter::pub_sub() {
let code = code.to_owned();
tokio::spawn(async move {
let _ = ec.publish(AddTaskEvent { code }.with_source(SCHEDULE_AVATAR)).await;
});
}
}

fn notify_delete(&self, code: &str) {
if let Some(ec) = BiosEventCenter::pub_sub() {
let code = code.to_owned();
tokio::spawn(async move {
let _ = ec.publish(DeleteTaskEvent { code }.with_source(SCHEDULE_AVATAR)).await;
});
}
}

fn notify_execute_start(&self, code: &str) {
if let Some(ec) = BiosEventCenter::worker_queue() {
let funs = self.funs.clone();
let ctx = self.ctx.clone();
let code = code.to_string();
let _handle = tokio::spawn(async move {
let result = ec
.publish(
LogItemAddReq {
tag: TASK_TAG.to_string(),
content: "start request".into(),
key: Some(code.to_string()),
op: Some(OP_EXECUTE_START.to_string()),
ts: Some(Utc::now()),
..Default::default()
}
.inject_context(&funs, &ctx),
)
.await;
if let Err(e) = result {
error!("notify_create error: {:?}", e);
}
});
}
}

fn notify_execute_end(&self, code: &str, message: String, ext: tardis::serde_json::Value) {
if let Some(ec) = BiosEventCenter::worker_queue() {
let funs = self.funs.clone();
let ctx = self.ctx.clone();
let code = code.to_string();
let _handle = tokio::spawn(async move {
let result = ec
.publish(
LogItemAddReq {
tag: TASK_TAG.to_string(),
content: message,
ext: Some(ext),
key: Some(code.to_string()),
op: Some(OP_EXECUTE_END.to_string()),
ts: Some(Utc::now()),
..Default::default()
}
.inject_context(&funs, &ctx),
)
.await;
if let Err(e) = result {
error!("notify_create error: {:?}", e);
}
});
}
}

fn create_event_stream() -> impl tardis::futures::Stream<Item = super::ScheduleEvent> + Send {
EventCenterBasedEventStream::new()
}
}

pub struct EventCenterBasedEventStream {
event_rx: tokio::sync::mpsc::Receiver<super::ScheduleEvent>,
}

impl EventCenterBasedEventStream {
pub fn new() -> Self {
let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
if let Some(ec) = BiosEventCenter::pub_sub() {
{
let event_tx = event_tx.clone();
ec.subscribe(move |AddTaskEvent { code }: AddTaskEvent| {
let event_tx = event_tx.clone();
async move {
event_tx.send(ScheduleEvent::JustCreate { code }).await.map_err(|_| TardisError::internal_error("fail to send out event", ""))?;
TardisResult::Ok(())
}
});
}
{
let event_tx = event_tx.clone();
ec.subscribe(move |DeleteTaskEvent { code }: DeleteTaskEvent| {
let event_tx = event_tx.clone();
async move {
event_tx.send(ScheduleEvent::JustCreate { code }).await.map_err(|_| TardisError::internal_error("fail to send out event", ""))?;
TardisResult::Ok(())
}
});
}
}
EventCenterBasedEventStream { event_rx }
}
}

impl Stream for EventCenterBasedEventStream {
type Item = super::ScheduleEvent;

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
let mut this = std::pin::pin!(self);
this.event_rx.poll_recv(cx)
}
}
Loading

0 comments on commit d5f30a6

Please sign in to comment.