Skip to content

Commit

Permalink
ver-0.2.3
Browse files Browse the repository at this point in the history
支持kafka作为任务源
  • Loading branch information
ipconfiger committed Jan 2, 2024
1 parent 10b27de commit 608f999
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ timer = "0.2.0"
lettre = { version = "0.10.4", default-features = false, features = ["builder", "smtp-transport", "rustls-tls"] }
dirs = "3.0.2"
reqwest = { version = "0.11.23", default-features = false, features = ["blocking", "json", "rustls-tls"] }
rdkafka = { version = "0.33.2", default-features = false, features = ["cmake-build"] }

[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ Light weight easy to use task manager written by Rust
--smtp_name alex #SMTP账号
--smtp_pwd ***** #SMTP密码
--starttls #是否启用starttls
--kafka_servers #Kafka的Broker列表,支持接入群集
--kafka_topic #接收任务的Topic
--kafka_resp_topic #返回任务执行结果的Topic


实时触发和延迟触发均通过http接口
实时触发和延迟触发均支持 HTTP 和 Kafka 多通道接入,默认只开启HTTP,在设置好正确的kafka前缀的参数后,即可从Kafka接收任务

http 接收任务的例子:
curl -X POST http://127.0.0.1:8000/task_in_queue
-H 'Content-Type: application/json'
-d '{...}'

JSON Body的格式如下
HTTP请求的JSON Body和Kafka的消息题的格式是一致的,都是JSON格式,定义如下

{
"id": "唯一编码, 任务的唯一编号",
Expand Down
144 changes: 135 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::net::IpAddr;
use std::net::Ipv4Addr;
use axum::{extract::{Json, path::Path as PathParam, State}, Router, routing::{post, get}, response::IntoResponse};
use std::{env, thread};

use std::path::{Path, PathBuf};
use std::str::FromStr;
use tokio::fs::File;
Expand All @@ -20,13 +21,17 @@ use tokio::fs;
use runner::{Task, AppConfig};
use dirs;
use redis::aio::Connection;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, BaseConsumer};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::Message;


#[derive(Clone)]
pub struct AppState {
pub config_path: String,
pub redis_client: redis::Client,
pub queue: QueueGroup,
queue: QueueGroup,
pub config: AppConfig
}

Expand Down Expand Up @@ -62,7 +67,8 @@ impl WebTask {
retry: 0,
cc,
error: "".to_string(),
waiting_resp: if let Some(w) = self.wait { w }else { 0usize }
waiting_resp: if let Some(w) = self.wait { w }else { 0usize },
src_chn: "".to_string()
}
}
}
Expand Down Expand Up @@ -157,6 +163,49 @@ impl QueueGroup {
}
}

struct KafkaProducer {
topic: String,
producer: Option<BaseProducer>
}

impl KafkaProducer {
fn from_bootstrap(servers: &str, topic: String) -> KafkaProducer {
let p: Option<BaseProducer> = if servers == "" {
None
} else{
let producer: Option<BaseProducer> = match ClientConfig::new()
.set("bootstrap.servers", servers)
.create() {
Ok(p)=>Some(p),
Err(err)=>{
println!("kafka producer error:{:?}", err);
None
}
};
producer
};
KafkaProducer{ producer: p, topic}
}

fn sent(&mut self, msg: serde_json::Value) {
if let Some(ref producer) = &self.producer {
match producer.send(BaseRecord::<String, String>::to(self.topic.as_str()).payload(&msg.to_string())) {
Ok(())=>{
println!("sent ok;");
},
Err((err, _))=>{
println!("sent error:{:?}", err);
}
}
for _i in 1..10 {
producer.poll(std::time::Duration::from_millis(100));
}
producer.flush(std::time::Duration::from_secs(1)).unwrap();
}
}
}


const TASK_WRONG: &'static str = "task||wrong";

const TASK_WORKING: &'static str = "task||working";
Expand Down Expand Up @@ -231,6 +280,18 @@ fn matches_config(matches: App) -> ArgMatches{
.long("starttls")
.help("使用SSL")
.action(clap::ArgAction::SetTrue))
.arg(Arg::with_name("kafka_servers")
.long("kafka_servers")
.help("kafka群集broker列表")
.default_value(""))
.arg(Arg::with_name("kafka_topic")
.long("kafka_topic")
.help("订阅kafka主题")
.default_value(""))
.arg(Arg::with_name("kafka_resp_topic")
.long("kafka_resp_topic")
.help("kafka接收执行结果的Topic")
.default_value("resp"))
.arg(Arg::with_name("dead")
.short('d')
.long("dead")
Expand All @@ -248,6 +309,9 @@ fn config_from_matches(matches: &ArgMatches) -> AppConfig {
let smtp_server = matches.get_one::<String>("smtp_server").unwrap();
let smtp_port = matches.get_one::<String>("smtp_port").unwrap().parse::<i32>().unwrap();
let starttls = matches.get_flag("starttls");
let kafka_servers = matches.get_one::<String>("kafka_servers").unwrap();
let kafka_topic = matches.get_one::<String>("kafka_topic").unwrap();
let resp_topic = matches.get_one::<String>("kafka_resp_topic").unwrap();

AppConfig{
smtp_name: smtp_name.to_string(),
Expand All @@ -256,7 +320,10 @@ fn config_from_matches(matches: &ArgMatches) -> AppConfig {
max_retry,
smtp_server: smtp_server.to_string(),
smtp_port,
starttls
starttls,
kafka_servers: kafka_servers.to_string(),
kafka_topic: kafka_topic.to_string(),
kafka_resp_topic: resp_topic.to_string()
}
}

Expand Down Expand Up @@ -285,7 +352,7 @@ async fn main() {
let mut redis_connection = client.get_connection().unwrap();
let appconfig = appconfig.clone();
thread::spawn(move || {
println!("worker:{} started", thread_id);
let mut pd = KafkaProducer::from_bootstrap(appconfig.kafka_servers.as_str(), appconfig.kafka_resp_topic.clone());
loop {
let task_id = group.wait_for(thread_id);
println!("worker:{} 捕获任务", task_id.clone());
Expand All @@ -299,14 +366,28 @@ async fn main() {
if !if_err.is_empty() {
task.error = if_err.to_string();
println!("执行错误:{}", task.error);

redis_connection.rpush::<String, String, ()>(format!("resp:{}", task.id.clone()), if_err).expect("返回执行结果错误");
if task.waiting_resp > 0usize {
if task.src_chn == "web" {
redis_connection.rpush::<String, String, ()>(format!("resp:{}", task.id.clone()), if_err.clone()).expect("返回执行结果错误");
}
if task.src_chn == "kafka" {
pd.sent(serde_json::json!({"result": "Fail", "reason": if_err.clone(), "request_id": task.id.clone()}));
}
}
if let Ok(save_payload) = serde_json::to_string(&task) {
redis_connection.set::<String, String, ()>(task.id.clone(), save_payload).expect("回存变更错误");
}
redis_connection.sadd::<&str, String, ()>(TASK_WRONG, task.id.clone()).expect("添加到错误队列错误")
}else{
redis_connection.del::<&str, ()>(task.id.as_str()).expect("redis del error");
if task.waiting_resp > 0usize {
if task.src_chn == "web" {
redis_connection.rpush::<String, String, ()>(format!("resp:{}", task.id.clone()), "OK".to_string()).expect("返回执行结果错误");
}
if task.src_chn == "kafka" {
pd.sent(serde_json::json!({"result": "OK", "request_id": task.id.clone()}));
}
}
}
redis_connection.srem::<&str, String, ()>(TASK_WORKING, task.id.clone()).expect("redis error");
}
Expand Down Expand Up @@ -403,6 +484,49 @@ async fn main() {
}
});

if ! appconfig.kafka_servers.is_empty() {
let kfk_cfg = appconfig.clone();
let mut cg = queue_group.clone();
let mut redis_conn = client.get_connection().unwrap();
println!("启动Kafka消费线程");
thread::spawn( move ||{
let consumer:BaseConsumer = ClientConfig::new()
.set("bootstrap.servers", &kfk_cfg.kafka_servers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.set("group.id", "pouple-consumer")
.create()
.expect("Consumer creation failed");
let topic = kfk_cfg.kafka_topic.as_str();
if let Err(err) = consumer.subscribe(&[topic]){
println!("Kafka Error:{}", err);
std::process::exit(1);
}
loop {
if let Some(Ok(msg)) = consumer.poll(std::time::Duration::from_secs(60)){
if let Some(Ok(msg_str)) = msg.payload_view::<str>(){
if let Ok(mut web_task) = serde_json::from_str::<WebTask>(msg_str) {
let now: DateTime<Utc> = Utc::now();
let now_ts = now.timestamp();
let mut task = web_task.gen_task(cg.size);
task.src_chn = "kafka".to_string();
let redis_payload = serde_json::to_string(&task).unwrap();
redis_conn.set::<String, String, ()>(task.id.clone(), redis_payload).expect("save error");
if web_task.delay == 0 {
redis_conn.sadd::<String, String, ()>(TASK_WORKING.to_string(), task.id.clone()).expect("save working error");
cg.dispatch_task(&task);
}else{
let delay_key = format!("{} {}", task.id, now_ts + web_task.delay as i64);
redis_conn.sadd::<String, String, ()>(TASK_DELAY.to_string(), delay_key.clone()).expect("save delay error");
}
}
}
}
}
});
}

if let Ok(mut main_conn) = client.get_async_connection().await {
if let Ok(working_ids) = main_conn.smembers::<&str, Vec<String>>(TASK_WORKING).await {
for tk_id in working_ids {
Expand Down Expand Up @@ -454,7 +578,8 @@ pub async fn handler(State(mut state): State<AppState>,
if state.config.smtp_name.is_empty() && web_task.method.to_lowercase() == "mail_to" {
return Json(serde_json::json!({"result":"Fail", "reason": "未配置SMTP服务器"}));
}
let task = web_task.gen_task(state.queue.size);
let mut task = web_task.gen_task(state.queue.size);
task.src_chn = "web".to_string();
let redis_payload = serde_json::to_string(&task).unwrap();
conn.set::<String, String, ()>(task.id.clone(), redis_payload).await.expect("set error");
if web_task.delay == 0 {
Expand All @@ -480,7 +605,7 @@ pub async fn handler(State(mut state): State<AppState>,
}
}

async fn waiting(State(mut state): State<AppState>, PathParam(key): PathParam<String>) -> impl IntoResponse {
async fn waiting(State(state): State<AppState>, PathParam(key): PathParam<String>) -> impl IntoResponse {
let mut conn = state.redis_client.get_async_connection().await.unwrap();
let resp = waiting_for_result(&mut conn, key, 60).await;
Json(resp)
Expand Down Expand Up @@ -609,7 +734,8 @@ fn get_tasks_avaliable(tasks: &Vec<String>, avaliable_tasks: &mut Vec<Task>) {
retry:0,
cc: vec![],
error: "".to_string(),
waiting_resp: 0usize
waiting_resp: 0usize,
src_chn: "cron".to_string()
};
avaliable_tasks.insert(0, task);
}
Expand Down
8 changes: 6 additions & 2 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ pub struct AppConfig {
pub smtp_pwd: String,
pub retry_interval: i32,
pub max_retry: i32,
pub starttls: bool
pub starttls: bool,
pub kafka_servers: String,
pub kafka_topic: String,
pub kafka_resp_topic: String
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -36,7 +39,8 @@ pub struct Task {
pub retry: i32,
pub cc: Vec<i32>,
pub error: String,
pub waiting_resp: usize
pub waiting_resp: usize,
pub src_chn: String
}

impl Task {
Expand Down

0 comments on commit 608f999

Please sign in to comment.