Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Changes for Rule engine and better run script #580

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
feat: Changes for Rule engine and better run script
satlead committed Jul 26, 2024
commit deb7348f0e1ff878eb66dce724ac20cd6588c96d
2 changes: 1 addition & 1 deletion core/main/src/broker/http_broker.rs
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ impl EndpointBroker for HttpBroker {
let (tx, mut tr) = mpsc::channel(10);
let broker = BrokerSender { sender: tx };
let is_json_rpc = endpoint.jsonrpc;
let uri: Uri = endpoint.url.parse().unwrap();
let uri: Uri = endpoint.get_url().parse().unwrap();
// let mut headers = HeaderMap::new();
// headers.insert("Content-Type", "application/json".parse().unwrap());
// if let Some(auth) = &endpoint.authentication {
37 changes: 32 additions & 5 deletions core/main/src/broker/rules_engine.rs
Original file line number Diff line number Diff line change
@@ -37,7 +37,15 @@ pub struct RuleSet {
impl RuleSet {
pub fn append(&mut self, rule_set: RuleSet) {
self.endpoints.extend(rule_set.endpoints);
self.rules.extend(rule_set.rules);
let rules: HashMap<String, Rule> = rule_set
.rules
.into_iter()
.map(|(k, v)| {
debug!("Loading JQ Rule for {}", k.to_lowercase());
(k.to_lowercase(), v)
})
.collect();
self.rules.extend(rules);
}
}

@@ -49,6 +57,17 @@ pub struct RuleEndpoint {
pub jsonrpc: bool,
}

impl RuleEndpoint {
pub fn get_url(&self) -> String {
if cfg!(feature = "local_dev") {
if let Ok(host_override) = std::env::var("DEVICE_HOST") {
return self.url.replace("127.0.0.1", &host_override);
}
}
self.url.clone()
}
}

fn default_autostart() -> bool {
true
}
@@ -131,8 +150,7 @@ impl RuleEngine {
if let Some(p) = Path::new(&path_for_rule).to_str() {
if let Ok(contents) = fs::read_to_string(p) {
debug!("Rule content {}", contents);
if let Ok((path, rule_set)) = Self::load_from_content(contents) {
debug!("Rules loaded from path={}", path);
if let Ok((_, rule_set)) = Self::load_from_content(contents) {
engine.rules.append(rule_set)
} else {
warn!("invalid rule found in path {}", path)
@@ -158,13 +176,22 @@ impl RuleEngine {
}

pub fn has_rule(&self, request: &RpcRequest) -> bool {
self.rules.rules.contains_key(&request.ctx.method)
self.rules
.rules
.contains_key(&request.ctx.method.to_lowercase())
}

pub fn get_rule(&self, rpc_request: &RpcRequest) -> Option<Rule> {
if let Some(mut rule) = self.rules.rules.get(&rpc_request.method).cloned() {
if let Some(mut rule) = self
.rules
.rules
.get(&rpc_request.method.to_lowercase())
.cloned()
{
rule.transform.apply_context(rpc_request);
return Some(rule);
} else {
info!("Rule not available for {}", rpc_request.method);
}
None
}
3 changes: 2 additions & 1 deletion core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
@@ -64,7 +64,8 @@ impl ThunderBroker {
let callback_for_sender = callback.clone();
let broker_for_reconnect = broker.clone();
tokio::spawn(async move {
let (mut ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.url, None).await;
let (mut ws_tx, mut ws_rx) =
BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await;

// send the first request to the broker. This is the controller statechange subscription request
let status_request = broker_c
5 changes: 3 additions & 2 deletions core/main/src/broker/websocket_broker.rs
Original file line number Diff line number Diff line change
@@ -47,7 +47,8 @@ impl WebsocketBroker {
let broker = BrokerSender { sender: tx };
tokio::spawn(async move {
if endpoint.jsonrpc {
let (mut ws_tx, mut ws_rx) = BrokerUtils::get_ws_broker(&endpoint.url, None).await;
let (mut ws_tx, mut ws_rx) =
BrokerUtils::get_ws_broker(&endpoint.get_url(), None).await;

tokio::pin! {
let read = ws_rx.next();
@@ -103,7 +104,7 @@ impl WebsocketBroker {
let cleaner = WSNotificationBroker::start(
v.clone(),
callback.clone(),
endpoint.url.clone(),
endpoint.get_url().clone(),
);
{
let mut map = map_clone.write().unwrap();
2 changes: 1 addition & 1 deletion device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ impl ThunderGetConfigStep {
GATEWAY_DEFAULT
);
}
if let Ok(host_override) = std::env::var("THUNDER_HOST") {
if let Ok(host_override) = std::env::var("DEVICE_HOST") {
gateway_url.set_host(Some(&host_override)).ok();
}
return Ok(ThunderBootstrapStateWithConfig {
12 changes: 9 additions & 3 deletions device/thunder_ripple_sdk/src/client/thunder_client.rs
Original file line number Diff line number Diff line change
@@ -677,9 +677,15 @@ pub struct ThunderRawBoolRequest {

impl ThunderRawBoolRequest {
async fn send_request(self: Box<Self>) -> Value {
let host = match env::var("THUNDER_HOST") {
Ok(h) => h,
Err(_) => String::from("127.0.0.1"),
let host = {
if cfg!(feature = "local_dev") {
match env::var("DEVICE_HOST") {
Ok(h) => h,
Err(_) => String::from("127.0.0.1"),
}
} else {
String::from("127.0.0.1")
}
};

if let Ok(t) = env::var("THUNDER_TOKEN") {
2 changes: 1 addition & 1 deletion ripple
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ case ${1} in
;;
"run")
cargo build --features local_dev
THUNDER_HOST=${2} cargo run --features local_dev core/main
DEVICE_HOST=${2} cargo run --features local_dev core/main
;;
"run-mock")
workspace_dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )