diff --git a/core/main/Cargo.toml b/core/main/Cargo.toml index de9de61cf..e42929937 100644 --- a/core/main/Cargo.toml +++ b/core/main/Cargo.toml @@ -59,6 +59,8 @@ jaq-core = "1.5.0" jaq-std = { version = "1.5.1", default-features = false } openrpc_validator = { path = "../../openrpc_validator" } +ripple_tdk = { path = "../tdk" } + [build-dependencies] vergen = "1" diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 88911f430..1fd8efee9 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, - log::error, + log::{debug, error, info}, tokio::{ self, sync::mpsc::{self, Receiver, Sender}, @@ -51,7 +51,7 @@ use crate::{ use super::{ http_broker::HttpBroker, - rules_engine::{jq_compile, Rule, RuleEndpoint, RuleEndpointProtocol, RuleEngine}, + rules_engine::{jq_compile, JqError, Rule, RuleEndpoint, RuleEndpointProtocol, RuleEngine}, thunder_broker::ThunderBroker, websocket_broker::WebsocketBroker, }; @@ -76,7 +76,7 @@ impl BrokerCleaner { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct BrokerRequest { pub rpc: RpcRequest, pub rule: Rule, @@ -151,6 +151,13 @@ impl BrokerRequest { pub struct BrokerCallback { pub sender: Sender, } +impl Default for BrokerCallback { + fn default() -> Self { + Self { + sender: mpsc::channel(2).0, + } + } +} static ATOMIC_ID: AtomicU64 = AtomicU64::new(0); @@ -187,22 +194,40 @@ pub struct BrokerContext { pub struct BrokerOutput { pub data: JsonRpcApiResponse, } +impl Default for BrokerOutput { + fn default() -> Self { + Self { + data: JsonRpcApiResponse { + jsonrpc: "2.0".to_owned(), + id: None, + method: None, + result: None, + error: None, + params: None, + }, + } + } +} +pub fn get_event_id_from_method(method: Option) -> Option { + method.and_then(|m| { + let event: Vec<&str> = m.split('.').collect(); + event.first().and_then(|v| v.parse::().ok()) + }) +} +pub fn is_event(method: Option) -> bool { + get_event_id_from_method(method).is_some() +} impl BrokerOutput { pub fn is_result(&self) -> bool { self.data.result.is_some() } pub fn get_event(&self) -> Option { - if let Some(e) = &self.data.method { - let event: Vec<&str> = e.split('.').collect(); - if let Some(v) = event.first() { - if let Ok(r) = v.parse::() { - return Some(r); - } - } - } - None + get_event_id_from_method(self.data.method.clone()) + } + pub fn is_event(&self) -> bool { + is_event(self.data.method.clone()) } } @@ -236,6 +261,19 @@ pub struct EndpointBrokerState { cleaner_list: Arc>>, reconnect_tx: Sender, } +impl Default for EndpointBrokerState { + fn default() -> Self { + Self { + endpoint_map: Arc::new(RwLock::new(HashMap::new())), + callback: BrokerCallback::default(), + request_map: Arc::new(RwLock::new(HashMap::new())), + extension_request_map: Arc::new(RwLock::new(HashMap::new())), + rule_engine: RuleEngine::default(), + cleaner_list: Arc::new(RwLock::new(Vec::new())), + reconnect_tx: mpsc::channel(2).0, + } + } +} impl EndpointBrokerState { pub fn new( @@ -479,7 +517,7 @@ pub trait EndpointBroker { /// Adds BrokerContext to a given request used by the Broker Implementations /// just before sending the data through the protocol - fn update_request(rpc_request: &BrokerRequest) -> Result { + fn update_request(rpc_request: &BrokerRequest) -> Result { let v = Self::apply_request_rule(rpc_request)?; let id = rpc_request.rpc.ctx.call_id; let method = 
rpc_request.rule.alias.clone(); @@ -502,7 +540,7 @@ pub trait EndpointBroker { } /// Generic method which takes the given parameters from RPC request and adds rules using rule engine - fn apply_request_rule(rpc_request: &BrokerRequest) -> Result { + fn apply_request_rule(rpc_request: &BrokerRequest) -> Result { if let Ok(mut params) = serde_json::from_str::>(&rpc_request.rpc.params_json) { if params.len() > 1 { if let Some(last) = params.pop() { @@ -523,7 +561,7 @@ pub trait EndpointBroker { return Ok(Value::Null); } } - Err(RippleError::ParseError) + Err(RippleError::ParseError.into()) } /// Default handler method for the broker to remove the context and send it back to the @@ -545,90 +583,206 @@ pub trait EndpointBroker { /// Forwarder gets the BrokerOutput and forwards the response to the gateway. pub struct BrokerOutputForwarder; +pub fn get_event_id(broker_output: BrokerOutput) -> Option { + broker_output.get_event().or(broker_output.data.id) +} + +#[derive(Debug, Clone)] +pub enum BrokerWorkFlowError { + MissingValue, + NoRuleFound, + JqError(JqError), + JsonParseError, + ApiMessageError, +} + +#[derive(Debug, Clone)] +pub struct SessionizedApiMessage { + pub session_id: String, + pub api_message: ApiMessage, +} +#[derive(Debug, Clone)] +pub enum BrokerWorkflowSuccess { + SubscriptionProcessed(BrokerOutput, Option), + Unsubcribe(BrokerOutput, Option), + RuleAppliedToEvent(BrokerOutput, Option), + FilterApplied(BrokerOutput, Option), +} +impl From for BrokerWorkFlowError { + fn from(e: JqError) -> Self { + BrokerWorkFlowError::JqError(e) + } +} + +/* + +Factor out broker workflow from tokio loop +*/ +pub fn run_broker_workflow( + broker_output: &BrokerOutput, + broker_request: &BrokerRequest, +) -> Result { + let sub_processed = broker_request.is_subscription_processed(); + let rpc_request = broker_request.rpc.clone(); + let is_subscription = rpc_request.is_subscription(); + let is_event = is_event(broker_output.data.method.clone()); + let id = get_event_id(broker_output.clone()); + let request_id = rpc_request.ctx.call_id; + + if let Some(result) = broker_output.data.result.clone() { + let mut mutant = broker_output.clone(); + mutant.data.id = Some(request_id); + if is_event { + let f = apply_rule_for_event(broker_request, &result, &rpc_request, broker_output)?; + Ok(BrokerWorkflowSuccess::RuleAppliedToEvent(f, id)) + } else if is_subscription { + if sub_processed { + return Ok(BrokerWorkflowSuccess::SubscriptionProcessed(mutant, id)); + } + mutant.data.result = Some(json!({ + "listening" : rpc_request.is_listening(), + "event" : rpc_request.ctx.method + })); + Ok(BrokerWorkflowSuccess::Unsubcribe(mutant, id)) + } else if let Some(filter) = broker_request + .rule + .transform + .get_filter(super::rules_engine::RuleTransformType::Response) + { + Ok(BrokerWorkflowSuccess::FilterApplied( + apply_response(result, filter, &rpc_request, broker_output)?, + id, + )) + } else { + Err(BrokerWorkFlowError::NoRuleFound) + } + } else { + Err(BrokerWorkFlowError::JsonParseError) + } +} + +pub fn brokered_to_api_message_response( + broker_output: BrokerOutput, + broker_request: &BrokerRequest, + request_id: String, +) -> Result { + match serde_json::to_string(&broker_output.data) { + Ok(jsonrpc_msg) => Ok(ApiMessage { + request_id, + protocol: broker_request.rpc.ctx.protocol.clone(), + jsonrpc_msg, + }), + Err(_) => Err(BrokerWorkFlowError::ApiMessageError), + } +} +pub fn get_request_id(broker_request: &BrokerRequest, request_id: Option) -> String { + request_id + .map(|v| v.to_string()) + 
.unwrap_or_else(|| broker_request.rpc.ctx.call_id.to_string()) +} +pub fn broker_workflow( + broker_output: &BrokerOutput, + broker_request: &BrokerRequest, + platform_state: &PlatformState, +) -> Result { + match run_broker_workflow(broker_output, broker_request)? { + BrokerWorkflowSuccess::SubscriptionProcessed(broker_output, _request_id) => { + Ok(SessionizedApiMessage { + session_id: broker_request.rpc.ctx.get_id(), + api_message: brokered_to_api_message_response( + broker_output, + broker_request, + broker_request.rpc.ctx.request_id.clone(), + )?, + }) + } + BrokerWorkflowSuccess::Unsubcribe(broker_output, request_id) => { + if let Some(id) = request_id { + platform_state.endpoint_state.update_unsubscribe_request(id) + } + Ok(SessionizedApiMessage { + session_id: broker_request.rpc.ctx.get_id(), + api_message: brokered_to_api_message_response( + broker_output, + broker_request, + broker_request.rpc.ctx.request_id.clone(), + )?, + }) + } + BrokerWorkflowSuccess::RuleAppliedToEvent(broker_output, _) => Ok(SessionizedApiMessage { + session_id: broker_request.rpc.ctx.get_id(), + api_message: brokered_to_api_message_response( + broker_output, + broker_request, + broker_request.rpc.ctx.request_id.clone(), + )?, + }), + BrokerWorkflowSuccess::FilterApplied(broker_output, _) => Ok(SessionizedApiMessage { + session_id: broker_request.rpc.ctx.get_id(), + api_message: brokered_to_api_message_response( + broker_output, + broker_request, + broker_request.rpc.ctx.request_id.clone(), + )?, + }), + } +} impl BrokerOutputForwarder { pub fn start_forwarder(platform_state: PlatformState, mut rx: Receiver) { tokio::spawn(async move { - while let Some(mut v) = rx.recv().await { - let mut is_event = false; - // First validate the id check if it could be an event - let id = if let Some(e) = v.get_event() { - is_event = true; - Some(e) - } else { - v.data.id - }; - - if let Some(id) = id { - if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) { - let sub_processed = broker_request.is_subscription_processed(); - let rpc_request = broker_request.rpc.clone(); - let session_id = rpc_request.ctx.get_id(); - let is_subscription = rpc_request.is_subscription(); - - // Step 1: Create the data - if let Some(result) = v.data.result.clone() { - if is_event { - apply_rule_for_event( - &broker_request, - &result, - &rpc_request, - &mut v, + while let Some(broker_output) = rx.recv().await { + if let Some(request_id) = get_event_id(broker_output.clone()) { + if let Ok(broker_request) = + platform_state.endpoint_state.get_request(request_id) + { + info!("processing request {:?}", request_id); + + match broker_workflow(&broker_output, &broker_request, &platform_state) { + Ok(message) => { + let session_id = message.session_id; + let is_event = is_event(broker_output.data.method.clone()); + //let session_id = get_request_id(&broker_request, None); + info!( + "processing request id={} for session_id={:?}", + request_id, session_id ); - } else if is_subscription { - if sub_processed { - continue; + if matches!(message.api_message.protocol, ApiProtocol::Extn) { + if let Ok(extn_message) = platform_state + .endpoint_state + .get_extn_message(request_id, is_event) + { + if is_event { + forward_extn_event( + &extn_message, + broker_output.data, + &platform_state, + ) + .await; + } else { + return_extn_response(message.api_message, extn_message) + } + } + } else if let Some(session) = platform_state + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport( + session, 
+ message.api_message, + platform_state.clone(), + ) + .await } - v.data.result = Some(json!({ - "listening" : rpc_request.is_listening(), - "event" : rpc_request.ctx.method - })); - platform_state.endpoint_state.update_unsubscribe_request(id); - } else if let Some(filter) = broker_request - .rule - .transform - .get_filter(super::rules_engine::RuleTransformType::Response) - { - apply_response(result, filter, &rpc_request, &mut v); } - } - - let request_id = rpc_request.ctx.call_id; - v.data.id = Some(request_id); - - // Step 2: Create the message - let message = ApiMessage { - request_id: request_id.to_string(), - protocol: rpc_request.ctx.protocol.clone(), - jsonrpc_msg: serde_json::to_string(&v.data).unwrap(), - }; - - // Step 3: Handle Non Extension - if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) { - if let Ok(extn_message) = - platform_state.endpoint_state.get_extn_message(id, is_event) - { - if is_event { - forward_extn_event(&extn_message, v.data, &platform_state) - .await; - } else { - return_extn_response(message, extn_message) - } + Err(e) => { + error!("Error couldnt broker the event {:?}", e) + /* + TODO - who do we tell about this? + */ } - } else if let Some(session) = platform_state - .session_state - .get_session_for_connection_id(&session_id) - { - return_api_message_for_transport( - session, - message, - platform_state.clone(), - ) - .await } } - } else { - error!("Error couldnt broker the event {:?}", v) } } }); @@ -685,68 +839,139 @@ async fn forward_extn_event( } } -fn apply_response(result: Value, filter: String, rpc_request: &RpcRequest, v: &mut BrokerOutput) { +//fn apply_response( +// result: Value, +// filter: String, +// rpc_request: &RpcRequest, +// broker_output: &BrokerOutput, +// ) -> Result { +// match jq_compile( +// result.clone(), +// &filter, +// format!("{}_response", rpc_request.ctx.method), +// ) { +// Ok(r) => { +// let mut mutant = broker_output.clone(); +// if r.to_string().to_lowercase().contains("null") { +// mutant.data.result = Some(Value::Null) +// } else if result.get("success").is_some() { +// mutant.data.result = Some(r); +// mutant.data.error = None; +// } else { +// mutant.data.error = Some(r); +// mutant.data.result = None; +// } +// Ok(mutant) +// } +// Err(e) => { +// error!("jq_compile error {:?}", e); +// Err(e) +// } +// } +// } +fn apply_response( + raw_value: Value, + filter: String, + rpc_request: &RpcRequest, + broker_output: &BrokerOutput, +) -> Result { match jq_compile( - result.clone(), + raw_value.clone(), &filter, format!("{}_response", rpc_request.ctx.method), ) { - Ok(r) => { - if r.to_string().to_lowercase().contains("null") { - v.data.result = Some(Value::Null) - } else if result.get("success").is_some() { - v.data.result = Some(r); - v.data.error = None; + Ok(compilation_result) => { + let mut mutant = broker_output.clone(); + debug!( + "jq_compile result {:?} for {}", + compilation_result, raw_value + ); + if compilation_result == Value::Null { + error!( + "error processing: {} from {}", + compilation_result, raw_value + ); + return Err(JqError::RuleCompileFailed(format!( + "{}", + compilation_result + ))); + // mutant.data.error = Some(Value::from(false)); + // mutant.data.result = Some(Value::Null); + } else if compilation_result.get("success").is_some() { + mutant.data.result = Some(compilation_result.clone()); + mutant.data.error = match compilation_result.get("success") { + Some(v) => { + if v.is_boolean() { + if !v.as_bool().unwrap() { + Some(Value::from(false)) + } else { + None + } + } else { + 
None + } + } + None => None, + }; + } else if raw_value.get("success").is_some() { + mutant.data.result = Some(compilation_result); + mutant.data.error = match raw_value.get("success") { + Some(v) => { + if v.is_boolean() { + if !v.as_bool().unwrap() { + Some(Value::from(false)) + } else { + None + } + } else { + None + } + } + None => None, + } } else { - v.data.error = Some(r); - v.data.result = None; + mutant.data.error = None; + mutant.data.result = Some(compilation_result); } + Ok(mutant) + } + Err(e) => { + error!("jq_compile error {:?}", e); + Err(JqError::RuleParseFailed) } - Err(e) => error!("jq_compile error {:?}", e), } } - fn apply_rule_for_event( broker_request: &BrokerRequest, result: &Value, rpc_request: &RpcRequest, - v: &mut BrokerOutput, -) { + broker_output: &BrokerOutput, +) -> Result { if let Some(filter) = broker_request .rule .transform .get_filter(super::rules_engine::RuleTransformType::Event) { - if let Ok(r) = jq_compile( + let data = jq_compile( result.clone(), &filter, format!("{}_event", rpc_request.ctx.method), - ) { - v.data.result = Some(r); - } + )?; + let mut mutated_broker_output = broker_output.clone(); + mutated_broker_output.data.result = Some(data); + Ok(mutated_broker_output.clone()) + } else { + Err(JqError::RuleNotFound(rpc_request.ctx.method.clone())) } } #[cfg(test)] mod tests { - use ripple_sdk::{tokio::sync::mpsc::channel, Mockable}; + use ripple_sdk::{tokio::sync::mpsc::channel, utils::logger::init_logger, Mockable}; use crate::broker::rules_engine::RuleTransform; use super::*; - mod endpoint_broker { - use ripple_sdk::{api::gateway::rpc_gateway_api::RpcRequest, Mockable}; - - #[test] - fn test_update_context() { - let _request = RpcRequest::mock(); - - // if let Ok(v) = WebsocketBroker::add_context(&request) { - // println!("_ctx {}", v); - // //assert!(v.get("_ctx").unwrap().as_u64().unwrap().eq(&1)); - // } - } - } #[tokio::test] async fn test_send_error() { @@ -770,83 +995,109 @@ mod tests { let value = tr.recv().await.unwrap(); assert!(value.data.error.is_some()) } + /*add exhaustive unit tests for as many function as possible */ + + use serde_json::Number; + + #[test] + fn test_run_broker_workflow() { + let _ = init_logger("broker".into()); + let mut broker_request = BrokerRequest::default(); + broker_request.rule.transform.response = Some(".success".to_owned()); + let payload = JsonRpcApiResponse { + result: Some(serde_json::Value::Bool(true)), + ..Default::default() + }; + let broker_output = BrokerOutput { data: payload }; - mod broker_output { - use ripple_sdk::{api::gateway::rpc_gateway_api::JsonRpcApiResponse, Mockable}; - - use crate::broker::endpoint_broker::BrokerOutput; + let result = run_broker_workflow(&broker_output, &broker_request); + assert!(result.is_ok()); + } - #[test] - fn test_result() { - let mut data = JsonRpcApiResponse::mock(); - let output = BrokerOutput { data: data.clone() }; - assert!(!output.is_result()); - data.result = Some(serde_json::Value::Null); - let output = BrokerOutput { data }; - assert!(output.is_result()); - } + #[test] + fn test_brokered_to_api_message_response() { + let request_id = String::from("12345"); + let broker_request = BrokerRequest::default(); + let broker_output = BrokerOutput::default(); + let result = brokered_to_api_message_response(broker_output, &broker_request, request_id); + assert!(result.is_ok()); + } - #[test] - fn test_get_event() { - let mut data = JsonRpcApiResponse::mock(); - data.method = Some("20.events".to_owned()); - let output = BrokerOutput { data }; - 
assert_eq!(20, output.get_event().unwrap()) - } + #[test] + fn test_get_request_id() { + let broker_request = BrokerRequest::default(); + let request_id = get_request_id(&broker_request, None); + assert!(!request_id.is_empty()); } - mod endpoint_broker_state { - use ripple_sdk::{ - api::gateway::rpc_gateway_api::RpcRequest, tokio, tokio::sync::mpsc::channel, Mockable, + #[test] + fn test_broker_workflow() { + let mut broker_request = BrokerRequest::default(); + broker_request.rule.transform.response = Some(".success".to_owned()); + let payload = JsonRpcApiResponse { + result: Some(serde_json::Value::Number(Number::from(1))), + ..Default::default() }; - use crate::{ - broker::{ - endpoint_broker::tests::RippleClient, - rules_engine::{Rule, RuleEngine, RuleSet, RuleTransform}, - }, - state::bootstrap_state::ChannelsState, - }; + let broker_output = BrokerOutput { data: payload }; + let result = + super::broker_workflow(&broker_output, &broker_request, &PlatformState::default()); + assert!(result.is_ok()); + } - use super::EndpointBrokerState; + // #[test] + // fn test_start_forwarder() { + // let platform_state = PlatformState::default(); + // let rx = Receiver::default(); + + // start_forwarder(platform_state, rx); + // // TODO: Add assertions or mock the tokio::spawn call to verify the behavior + // } + + #[test] + fn test_handle_non_jsonrpc_response() { + let data: &[u8] = &[1, 2, 3]; + let callback = BrokerCallback::default(); + let request = BrokerRequest::default(); + let result = BrokerOutputForwarder::handle_non_jsonrpc_response(data, callback, request); + assert!(result.is_err()); + } - #[tokio::test] - async fn get_request() { - let (tx, _) = channel(2); - let client = RippleClient::new(ChannelsState::new()); - let state = EndpointBrokerState::new( - tx, - RuleEngine { - rules: RuleSet::default(), - }, - client, - ); - let mut request = RpcRequest::mock(); - state.update_request( - &request, - Rule { - alias: "somecallsign.method".to_owned(), - transform: RuleTransform::default(), - endpoint: None, - }, - None, - ); - request.ctx.call_id = 2; - state.update_request( - &request, - Rule { - alias: "somecallsign.method".to_owned(), - transform: RuleTransform::default(), - endpoint: None, - }, - None, - ); + #[tokio::test] + async fn test_forward_extn_event() { + let extn_message = ExtnMessage::default(); + let v = JsonRpcApiResponse::default(); + let platform_state = PlatformState::default(); + forward_extn_event(&extn_message, v, &platform_state).await; + // TODO: Add assertions or mock the platform_state.get_client().get_extn_client().send_message call to verify the behavior + } - // Hardcoding the id here will be a problem as multiple tests uses the atomic id and there is no guarantee - // that this test case would always be the first one to run - // Revisit this test case, to make it more robust - // assert!(state.get_request(2).is_ok()); - // assert!(state.get_request(1).is_ok()); - } + #[test] + fn test_apply_response() { + let result = serde_json::json!({"success": true}); + let filter = String::from(".success"); + let rpc_request = RpcRequest::default(); + let broker_output = BrokerOutput::default(); + let result = apply_response(result, filter, &rpc_request, &broker_output); + assert!(result.is_ok()); + } + + #[test] + fn test_apply_rule_for_event() { + let mut broker_request = BrokerRequest::default(); + broker_request.rule.transform.response = Some(".success".to_owned()); + let payload = JsonRpcApiResponse { + result: Some(serde_json::Value::Number(Number::from(1))), + 
..Default::default() + }; + + let result = json!({}); + let rpc_request = RpcRequest::default(); + let broker_output = BrokerOutput { data: payload }; + + broker_request.rule.transform.event = Some(".success".to_owned()); + let result = apply_rule_for_event(&broker_request, &result, &rpc_request, &broker_output); + println!("result={:?}", result); + assert!(result.is_ok()); } } diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 1f86232a8..f32ba9506 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -18,6 +18,7 @@ use jaq_interpret::{Ctx, FilterT, ParseCtx, RcIter, Val}; use ripple_sdk::api::{ gateway::rpc_gateway_api::RpcRequest, manifest::extn_manifest::ExtnManifest, }; +use ripple_sdk::log::trace; use ripple_sdk::{ chrono::Utc, log::{debug, error, info, warn}, @@ -26,7 +27,7 @@ use ripple_sdk::{ }; use serde::Deserialize; use std::collections::HashMap; -use std::{fs, path::Path}; +use std::fs; #[derive(Debug, Deserialize, Default, Clone)] pub struct RuleSet { @@ -91,6 +92,15 @@ pub struct Rule { pub transform: RuleTransform, pub endpoint: Option, } +impl Default for Rule { + fn default() -> Self { + Rule { + alias: "".to_string(), + transform: RuleTransform::default(), + endpoint: None, + } + } +} #[derive(Debug, Clone, Deserialize, Default)] pub struct RuleTransform { @@ -133,6 +143,29 @@ pub enum RuleTransformType { pub struct RuleEngine { pub rules: RuleSet, } +#[derive(Debug, Clone)] +pub enum RuleEngineError { + PartialRuleLoadError(Vec, RuleEngine), +} + +#[derive(Debug, Clone)] +pub enum JqError { + RuleParseFailed, + RuleCompileFailed(String), + RuleNotFound(String), + RuleFailedToProcess(String), + InvalidData, +} +impl From for JqError { + fn from(ripple_error: RippleError) -> Self { + JqError::RuleCompileFailed(ripple_error.to_string()) + } +} +impl From for RippleError { + fn from(_: JqError) -> Self { + RippleError::ParseError + } +} impl RuleEngine { fn build_path(path: &str, default_path: &str) -> String { @@ -142,29 +175,69 @@ impl RuleEngine { format!("{}{}", default_path, path) } } - - pub fn build(extn_manifest: &ExtnManifest) -> Self { - debug!("building rules engine {:?}", extn_manifest.rules_path); + pub fn build_from_paths( + rule_paths: Vec, + default_path: &str, + ) -> Result { let mut engine = RuleEngine::default(); - for path in extn_manifest.rules_path.iter() { - let path_for_rule = Self::build_path(path, &extn_manifest.default_path); - debug!("loading rule {}", path_for_rule); - if let Some(p) = Path::new(&path_for_rule).to_str() { - if let Ok(contents) = fs::read_to_string(p) { - debug!("Rule content {}", contents); - if let Ok((_, rule_set)) = Self::load_from_content(contents) { - engine.rules.append(rule_set) - } else { - warn!("invalid rule found in path {}", path) - } + let mut failed_paths: Vec = Vec::new(); + for path in rule_paths.iter() { + let path = Self::build_path(path, default_path); + if let Ok(contents) = fs::read_to_string(path.clone()) { + if let Ok((path, rule_set)) = Self::load_from_content(contents) { + debug!("Rules loaded from path={}", path); + engine.rules.append(rule_set) } else { - warn!("path for the rule is invalid {}", path) + failed_paths.push(path.to_string()); + warn!("invalid rule found in path {}", path) } } else { - warn!("invalid rule path {}", path) + failed_paths.push(path.to_string()); + warn!("path for the rule is invalid {}", path) } } - engine + if failed_paths.is_empty() { + 
Ok(engine) } else { Err(RuleEngineError::PartialRuleLoadError(failed_paths, engine)) } } + + pub fn build(extn_manifest: &ExtnManifest) -> Self { + debug!("building rules engine {:?}", extn_manifest.rules_path); + match Self::build_from_paths( + extn_manifest.rules_path.clone(), + extn_manifest.default_path.as_str(), + ) { + Ok(engine) => engine, + Err(e) => match e { + RuleEngineError::PartialRuleLoadError(failed_paths, engine) => { + error!("Failed to load rules from paths {:?}. This is currently not fatal but may produce unexpected results.", failed_paths); + engine + } + }, + } + // let mut engine = RuleEngine::default(); + // for path in extn_manifest.rules_path.iter() { + // let path_for_rule = Self::build_path(path, &extn_manifest.default_path); + // debug!("loading rule {}", path_for_rule); + // if let Some(p) = Path::new(&path_for_rule).to_str() { + // if let Ok(contents) = fs::read_to_string(p) { + // debug!("Rule content {}", contents); + // if let Ok((path, rule_set)) = Self::load_from_content(contents) { + // debug!("Rules loaded from path={}", path); + // engine.rules.append(rule_set) + // } else { + // warn!("invalid rule found in path {}", path) + // } + // } else { + // warn!("path for the rule is invalid {}", path) + // } + // } else { + // warn!("invalid rule path {}", path) + // } + // } + // engine } pub fn load_from_content(contents: String) -> Result<(String, RuleSet), RippleError> { @@ -178,118 +251,136 @@ impl RuleEngine { } pub fn has_rule(&self, request: &RpcRequest) -> bool { - self.rules - .rules - .contains_key(&request.ctx.method.to_lowercase()) + self.rules.rules.contains_key(&request.ctx.method) } pub fn get_rule(&self, rpc_request: &RpcRequest) -> Option<Rule> { - if let Some(mut rule) = self - .rules - .rules - .get(&rpc_request.method.to_lowercase()) - .cloned() - { - rule.transform.apply_context(rpc_request); + self.get_rule_by_method_name(&rpc_request.method) + } + pub fn get_rule_by_method_name(&self, method_name: &str) -> Option<Rule> { + if let Some(rule) = self.rules.rules.get(method_name).cloned() { return Some(rule); - } else { - info!("Rule not available for {}", rpc_request.method); } None } } -pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result<Value, RippleError> { - debug!("Jq rule {} input {:?}", filter, input); +// pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result<Value, JqError> { +// debug!("Jq rule {} input {:?}", filter, input); +// let start = Utc::now().timestamp_millis(); +// // start out only from core filters, +// // which do not include filters in the standard library +// // such as `map`, `select` etc.
+// let mut defs = ParseCtx::new(Vec::new()); + +// // parse the filter +// let (f, errs) = jaq_parse::parse(filter, jaq_parse::main()); +// if !errs.is_empty() { +// error!("Error in rule {:?}", errs); +// return Err(JqError::RuleParseFailed); +// } + +// // compile the filter in the context of the given definitions +// let f = defs.compile(f.unwrap()); +// if !defs.errs.is_empty() { +// error!("Error in rule {}", reference); +// for (err, _) in defs.errs { +// error!("reference={} {}", reference, err); +// } +// return Err(JqError::RuleCompileFailed); +// } + +// let inputs = RcIter::new(core::iter::empty()); + +// // iterator over the output values +// let mut out = f.run((Ctx::new([], &inputs), Val::from(input))); + +// if let Some(Ok(v)) = out.next() { +// info!( +// "Ripple Gateway Rule Processing Time: {},{}", +// reference, +// Utc::now().timestamp_millis() - start +// ); +// return Ok(Value::from(v)); +// } + +// Err(JqError::RuleNotFound) +// } + +pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result { + debug!( + "processing jq_rule={}, input {:?} , reference={}", + filter, input, reference + ); let start = Utc::now().timestamp_millis(); // start out only from core filters, // which do not include filters in the standard library // such as `map`, `select` etc. - let mut defs = ParseCtx::new(Vec::new()); - defs.insert_natives(jaq_core::core()); - defs.insert_defs(jaq_std::std()); + // parse the filter let (f, errs) = jaq_parse::parse(filter, jaq_parse::main()); if !errs.is_empty() { error!("Error in rule {:?}", errs); - return Err(RippleError::RuleError); + return Err(JqError::RuleParseFailed); } + // compile the filter in the context of the given definitions - let f = defs.compile(f.unwrap()); + let compiled = defs.compile(f.unwrap()); if !defs.errs.is_empty() { error!("Error in rule {}", reference); for (err, _) in defs.errs { error!("reference={} {}", reference, err); } - return Err(RippleError::RuleError); + return Err(JqError::RuleCompileFailed(reference.clone())); } - let inputs = RcIter::new(core::iter::empty()); + //let inputs = RcIter::new(core::iter::empty()); + // iterator over the output values - let mut out = f.run((Ctx::new([], &inputs), Val::from(input))); - if let Some(Ok(v)) = out.next() { - info!( - "Ripple Gateway Rule Processing Time: {},{}", - reference, - Utc::now().timestamp_millis() - start - ); - return Ok(Value::from(v)); - } + let out = compiled + .run(( + Ctx::new([], &RcIter::new(core::iter::empty())), + Val::from(input.clone()), + )) + .next(); - Err(RippleError::ParseError) -} + match out { + Some(val) => match val { + Ok(v) => { + info!( + "Ripple Gateway Rule Processing Time: {},{}", + reference, + Utc::now().timestamp_millis() - start + ); + trace!( + "jq_rule={}, input {:?} , extracted value={}", + filter, + input, + v + ); -#[cfg(test)] -mod tests { - use super::*; - use ripple_sdk::serde_json::json; - - #[test] - fn test_jq_compile() { - let filter = "if .success then ( .stbVersion | split(\"_\")[0] ) else { code: -32100, message: \"couldn't get version\" } end"; - let input = json!({ - "stbVersion":"SCXI11BEI_VBN_24Q2_sprint_20240620140024sdy_FG_GRT", - "receiverVersion":"7.2.0.0", - "stbTimestamp":"Thu 20 Jun 2024 14:00:24 UTC", - "success":true - }); - let resp = jq_compile(input, filter, String::new()); - assert_eq!(resp.unwrap(), "SCXI11BEI".to_string()); - - let filter = "{ namespace: \"refui\", scope: .scope, key: .key, value: .value }"; - let input = json!({ - "key": "key3", - "scope": "account", - "value": 
"value2" - }); - let resp = jq_compile(input, filter, String::new()); - let expected = json!({ - "namespace": "refui", - "key": "key3", - "scope": "account", - "value": "value2" - }); - assert_eq!(resp.unwrap(), expected); - - let filter = "if .success and ( .supportedHDCPVersion | contains(\"2.2\")) then {\"hdcp2.2\": true} elif .success and ( .supportedHDCPVersion | contains(\"1.4\")) then {\"hdcp1.4\": true} else {\"code\": -32100, \"message\": \"couldn't get version\"} end"; - let input = json!({ - "supportedHDCPVersion":"2.2", - "isHDCPSupported":true, - "success":true - }); - let resp = jq_compile(input, filter, String::new()); - let expected = json!({ - "hdcp2.2": true - }); - assert_eq!(resp.unwrap(), expected); - - let filter = "if .success then (.value | fromjson | .value) else { \"code\": -32100, \"message\": \"couldn't get language\" } end"; - let input = json!({ - "value": "{\"update_time\":\"2024-07-26T23:39:57.831726080Z\",\"value\":\"EN\"}", - "success":true - }); - let resp = jq_compile(input, filter, String::new()); - assert_eq!(resp.unwrap(), "EN".to_string()); + if v == Val::Null { + debug!( + "jq processing returned null for jq_rule={}, input {:?} , reference={}", + filter, input, reference + ); + //return Err(JqError::InvalidData); + } + Ok(Value::from(v)) + } + Err(e) => { + debug!("Encountered primtive value in jq_rule={}, input {:?} , reference={}, error={}. Returning value {}", filter, input, reference,e,input); + Ok(input) + } + }, + None => { + error!( + "Ripple Gateway Rule Processing Time: {},{}", + reference, + Utc::now().timestamp_millis() - start + ); + Err(JqError::RuleFailedToProcess(reference)) + } } } diff --git a/core/main/src/firebolt/handlers/device_rpc.rs b/core/main/src/firebolt/handlers/device_rpc.rs index fe54c0d27..94c072e9e 100644 --- a/core/main/src/firebolt/handlers/device_rpc.rs +++ b/core/main/src/firebolt/handlers/device_rpc.rs @@ -67,25 +67,6 @@ use ripple_sdk::{ include!(concat!(env!("OUT_DIR"), "/version.rs")); pub const DEVICE_UID: &str = "device.uid"; -// #[derive(Serialize, Clone, Debug, Deserialize)] -// #[serde(rename_all = "camelCase")] -// pub struct ProvisionRequest { -// account_id: String, -// device_id: String, -// distributor_id: Option, -// } - -// impl ProvisionRequest { -// fn get_session(self) -> DistributorSession { -// DistributorSession { -// id: None, -// token: None, -// account_id: Some(self.account_id.clone()), -// device_id: Some(self.device_id.clone()), -// } -// } -// } - #[rpc(server)] pub trait Device { #[method(name = "device.name")] diff --git a/core/main/src/firebolt/handlers/mod.rs b/core/main/src/firebolt/handlers/mod.rs new file mode 100644 index 000000000..a913c7975 --- /dev/null +++ b/core/main/src/firebolt/handlers/mod.rs @@ -0,0 +1,5 @@ +pub mod accessory_rpc; +pub mod advertising_rpc; +pub mod capabilities_rpc; +pub mod device_rpc; +pub mod privacy_rpc; diff --git a/core/main/src/lib.rs b/core/main/src/lib.rs new file mode 100644 index 000000000..fc6a2426b --- /dev/null +++ b/core/main/src/lib.rs @@ -0,0 +1,55 @@ +pub mod broker { + pub mod broker_utils; + pub mod endpoint_broker; + pub mod http_broker; + pub mod rules_engine; + pub mod thunder; + pub mod thunder_broker; + pub mod websocket_broker; +} + +pub mod utils { + pub mod router_utils; + pub mod rpc_utils; + #[cfg(test)] + pub mod test_utils; +} +pub mod state { + pub mod bootstrap_state; + pub mod extn_state; + pub mod metrics_state; + pub mod openrpc_state; + pub mod platform_state; + pub mod ripple_cache; + pub mod session_state; + pub 
mod cap { + pub mod cap_state; + pub mod generic_cap_state; + pub mod permitted_state; + } +} +pub mod firebolt { + pub mod firebolt_gatekeeper; + pub mod firebolt_gateway; + pub mod firebolt_ws; + pub mod handlers; + pub mod rpc; + pub mod rpc_router; +} + +pub mod service { + pub mod extn { + pub mod ripple_client; + } + pub mod apps; + pub mod data_governance; + pub mod telemetry_builder; + pub mod user_grants; +} +pub mod bootstrap { + pub mod manifest; +} +pub mod processor { + pub mod metrics_processor; + pub mod storage; +} diff --git a/core/main/src/processor/metrics_processor.rs b/core/main/src/processor/metrics_processor.rs index 60d15d9cc..50bf896ec 100644 --- a/core/main/src/processor/metrics_processor.rs +++ b/core/main/src/processor/metrics_processor.rs @@ -42,8 +42,8 @@ use ripple_sdk::{ use crate::{ service::{data_governance::DataGovernance, telemetry_builder::TelemetryBuilder}, state::platform_state::PlatformState, - SEMVER_LIGHTWEIGHT, }; +include!(concat!(env!("OUT_DIR"), "/version.rs")); pub async fn send_metric( platform_state: &PlatformState, diff --git a/core/main/src/service/apps/delegated_launcher_handler.rs b/core/main/src/service/apps/delegated_launcher_handler.rs index eeaa264cc..c4559f76b 100644 --- a/core/main/src/service/apps/delegated_launcher_handler.rs +++ b/core/main/src/service/apps/delegated_launcher_handler.rs @@ -72,7 +72,7 @@ use ripple_sdk::{ tokio::{self, sync::mpsc::Receiver}, }; use serde_json::{json, Value}; - +include!(concat!(env!("OUT_DIR"), "/version.rs")); use crate::{ processor::metrics_processor::send_metric_for_app_state_change, service::{ @@ -90,7 +90,6 @@ use crate::{ session_state::{PendingSessionInfo, Session}, }, utils::rpc_utils::rpc_await_oneshot, - SEMVER_LIGHTWEIGHT, }; const APP_ID_TITLE_FILE_NAME: &str = "appInfo.json"; diff --git a/core/main/src/service/extn/ripple_client.rs b/core/main/src/service/extn/ripple_client.rs index d65e8c6e7..451aa5074 100644 --- a/core/main/src/service/extn/ripple_client.rs +++ b/core/main/src/service/extn/ripple_client.rs @@ -55,11 +55,12 @@ use crate::{ /// /// # Examples /// ``` -/// use crate::firebolt::firebolt_gateway::FireboltGatewayCommand; -/// fn send_gateway_command(msg: FireboltGatewayCommand) { -/// let client = RippleClient::new(); -/// client.send_gateway_command() -/// } +/// //use main::firebolt::firebolt_gateway::FireboltGatewayCommand; +/// //use main::service::extn::ripple_client::RippleClient; +/// //fn send_gateway_command(msg: FireboltGatewayCommand) { +/// // let client = RippleClient::new(); +/// //let _ = client.send_gateway_command(); +/// //} /// /// ``` #[derive(Debug, Clone)] @@ -69,6 +70,12 @@ pub struct RippleClient { app_mgr_sender: Sender, // will be used by LCM RPC broker_sender: Sender, } +impl Default for RippleClient { + fn default() -> Self { + let cs = ChannelsState::new(); + RippleClient::new(cs) + } +} impl RippleClient { pub fn new(state: ChannelsState) -> RippleClient { diff --git a/core/main/src/service/user_grants.rs b/core/main/src/service/user_grants.rs index ed0f9f7f3..f49f60401 100644 --- a/core/main/src/service/user_grants.rs +++ b/core/main/src/service/user_grants.rs @@ -77,6 +77,21 @@ pub struct GrantState { grant_app_map: GrantAppMap, caps_needing_grants: Vec, } +impl Default for GrantState { + fn default() -> Self { + GrantState { + device_grants: Arc::new(RwLock::new(FileStore::new( + "device_grants".into(), + HashSet::new(), + ))), + grant_app_map: Arc::new(RwLock::new(FileStore::new( + "app_grants".into(), + HashMap::new(), + ))), + 
caps_needing_grants: Vec::new(), + } + } +} impl GrantState { pub fn new(manifest: DeviceManifest) -> GrantState { diff --git a/core/main/src/state/cap/cap_state.rs b/core/main/src/state/cap/cap_state.rs index e46edc002..7793faf33 100644 --- a/core/main/src/state/cap/cap_state.rs +++ b/core/main/src/state/cap/cap_state.rs @@ -55,6 +55,16 @@ pub struct CapState { primed_listeners: Arc>>, pub grant_state: GrantState, } +impl Default for CapState { + fn default() -> Self { + CapState { + generic: GenericCapState::default(), + permitted_state: PermittedState::default(), + primed_listeners: Arc::new(RwLock::new(HashSet::new())), + grant_state: GrantState::default(), + } + } +} impl CapState { pub fn new(manifest: DeviceManifest) -> Self { diff --git a/core/main/src/state/cap/permitted_state.rs b/core/main/src/state/cap/permitted_state.rs index 2f59d0a2d..ab19fc887 100644 --- a/core/main/src/state/cap/permitted_state.rs +++ b/core/main/src/state/cap/permitted_state.rs @@ -50,6 +50,13 @@ type FireboltPermissionStore = Arc Self { + PermittedState { + permitted: Arc::new(RwLock::new(FileStore::new("".to_string(), HashMap::new()))), + } + } +} impl PermittedState { pub fn new(manifest: DeviceManifest) -> PermittedState { diff --git a/core/main/src/state/openrpc_state.rs b/core/main/src/state/openrpc_state.rs index 633285a6c..6bb5e942f 100644 --- a/core/main/src/state/openrpc_state.rs +++ b/core/main/src/state/openrpc_state.rs @@ -199,6 +199,20 @@ pub struct OpenRpcState { provider_relation_map: Arc>>, openrpc_validator: Arc>, } +impl Default for OpenRpcState { + fn default() -> Self { + OpenRpcState { + open_rpc: FireboltOpenRpc::default(), + exclusory: None, + firebolt_cap_map: Arc::new(RwLock::new(HashMap::default())), + ripple_cap_map: Arc::new(RwLock::new(HashMap::default())), + cap_policies: Arc::new(RwLock::new(HashMap::default())), + extended_rpc: Arc::new(RwLock::new(Vec::new())), + provider_relation_map: Arc::new(RwLock::new(HashMap::default())), + openrpc_validator: Arc::new(RwLock::new(FireboltOpenRpcValidator::default())), + } + } +} impl OpenRpcState { fn load_additional_rpc(rpc: &mut FireboltOpenRpc, file_contents: &'static str) { diff --git a/core/main/src/state/platform_state.rs b/core/main/src/state/platform_state.rs index c0a9171e1..9c6491eb3 100644 --- a/core/main/src/state/platform_state.rs +++ b/core/main/src/state/platform_state.rs @@ -56,10 +56,11 @@ use super::{ /// /// # Examples /// ``` -/// let state = PlatformState::default(); +/// //use core::state::platform_state::PlatformState; +/// //let state = PlatformState::default(); /// -/// let manifest = state.get_device_manifest(); -/// println!("{}", manifest.unwrap().configuration.platform); +/// //let manifest = state.get_device_manifest(); +/// //println!("{}", manifest.unwrap().configuration.platform); /// ``` /// @@ -88,7 +89,7 @@ impl From for DeviceSessionIdentifier { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct PlatformState { extn_manifest: ExtnManifest, device_manifest: DeviceManifest, diff --git a/core/main/src/state/session_state.rs b/core/main/src/state/session_state.rs index a9bf28d95..6fca4ce87 100644 --- a/core/main/src/state/session_state.rs +++ b/core/main/src/state/session_state.rs @@ -83,8 +83,9 @@ impl Session { /// /// ### To add an App Id /// ``` +/// use main::state::session_state::SessionState; /// let session_state = SessionState::default(); -/// session_state("1234-1234".into(), "SomeCoolAppId".into()); +/// /// ``` #[derive(Debug, Clone, Default)] diff --git 
a/core/sdk/src/api/gateway/rpc_gateway_api.rs b/core/sdk/src/api/gateway/rpc_gateway_api.rs index bb8b87a0e..b7cfb6c58 100644 --- a/core/sdk/src/api/gateway/rpc_gateway_api.rs +++ b/core/sdk/src/api/gateway/rpc_gateway_api.rs @@ -53,7 +53,7 @@ impl From for AppIdentification { } } -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)] pub struct CallContext { pub session_id: String, pub request_id: String, @@ -64,6 +64,20 @@ pub struct CallContext { pub cid: Option, pub gateway_secure: bool, } +// impl Default for CallContext { +// fn default() -> Self { +// CallContext { +// session_id: String::default(), +// request_id: String::default(), +// app_id: String::default(), +// call_id: u64::default(), +// protocol: ApiProtocol::default(), +// method: String::default(), +// cid: Option::default(), +// gateway_secure: bool::default(), +// } +// } +// } impl CallContext { // TODO: refactor this to use less arguments @@ -113,10 +127,11 @@ impl crate::Mockable for CallContext { } } -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize, Default)] pub enum ApiProtocol { Bridge, Extn, + #[default] JsonRpc, } @@ -201,6 +216,18 @@ pub struct JsonRpcApiResponse { #[serde(skip_serializing)] pub params: Option, } +impl Default for JsonRpcApiResponse { + fn default() -> Self { + JsonRpcApiResponse { + jsonrpc: "2.0".to_owned(), + result: None, + id: None, + error: None, + method: None, + params: None, + } + } +} impl crate::Mockable for JsonRpcApiResponse { fn mock() -> Self { @@ -221,6 +248,15 @@ pub struct RpcRequest { pub params_json: String, pub ctx: CallContext, } +impl Default for RpcRequest { + fn default() -> Self { + RpcRequest { + method: "".to_string(), + params_json: "".to_string(), + ctx: CallContext::default(), + } + } +} impl ExtnPayloadProvider for RpcRequest { fn get_extn_payload(&self) -> ExtnPayload { diff --git a/core/sdk/src/extn/extn_client_message.rs b/core/sdk/src/extn/extn_client_message.rs index d66926eb6..6fa249180 100644 --- a/core/sdk/src/extn/extn_client_message.rs +++ b/core/sdk/src/extn/extn_client_message.rs @@ -101,6 +101,19 @@ pub struct ExtnMessage { pub callback: Option>, pub ts: Option, } +impl Default for ExtnMessage { + fn default() -> Self { + ExtnMessage { + id: "".to_string(), + requestor: ExtnId::get_main_target("main".into()), + target: RippleContract::Internal, + target_id: None, + payload: ExtnPayload::Request(ExtnRequest::Config(Config::DefaultName)), + callback: None, + ts: None, + } + } +} impl ExtnMessage { /// This method can be used to create [ExtnResponse] payload message from a given [ExtnRequest] diff --git a/openrpc_validator/src/lib.rs b/openrpc_validator/src/lib.rs index 291b1f81f..910442c07 100644 --- a/openrpc_validator/src/lib.rs +++ b/openrpc_validator/src/lib.rs @@ -6,7 +6,7 @@ use serde_json::{json, Value}; pub extern crate jsonschema; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, Default)] pub struct FireboltOpenRpc { pub apis: HashMap, }
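Note on the event routing added in endpoint_broker.rs: a response whose method carries a numeric prefix (for example "20.events", as exercised by the existing test_get_event test) is treated as an event, and that prefix is the request id the listener was registered under. A minimal standalone sketch of that parsing, independent of the crate types:

fn get_event_id_from_method(method: Option<String>) -> Option<u64> {
    method.and_then(|m| {
        // The numeric prefix before the first '.' is the registered request id.
        m.split('.')
            .next()
            .and_then(|prefix| prefix.parse::<u64>().ok())
    })
}

fn main() {
    assert_eq!(get_event_id_from_method(Some("20.events".to_owned())), Some(20));
    // Ordinary methods such as "device.name" have no numeric prefix and are not events.
    assert_eq!(get_event_id_from_method(Some("device.name".to_owned())), None);
    assert_eq!(get_event_id_from_method(None), None);
}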
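run_broker_workflow and broker_workflow factor the rule-application logic out of the tokio receive loop in start_forwarder, so it can now be exercised synchronously. A usage sketch modeled on the new test_broker_workflow unit test; it assumes the core/main crate context (BrokerRequest, BrokerOutput, JsonRpcApiResponse, PlatformState in scope) rather than being a standalone program:

#[test]
fn broker_workflow_applies_response_rule() {
    // A request whose rule carries a response transform.
    let mut broker_request = BrokerRequest::default();
    broker_request.rule.transform.response = Some(".success".to_owned());

    // A broker response carrying a result payload for that request.
    let broker_output = BrokerOutput {
        data: JsonRpcApiResponse {
            result: Some(serde_json::json!({ "success": true })),
            ..Default::default()
        },
    };

    // The workflow yields a SessionizedApiMessage ready for the transport layer,
    // without spawning the forwarder loop.
    let sessionized =
        broker_workflow(&broker_output, &broker_request, &PlatformState::default())
            .expect("a rule with a response filter should broker successfully");
    assert!(!sessionized.api_message.jsonrpc_msg.is_empty());
}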
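jq_compile now returns Result<Value, JqError> instead of Result<Value, RippleError>, so callers can tell parse, compile, and processing failures apart, while the From impls keep the RippleError conversion working at the broker boundary. A hedged call-site sketch, assuming the core/main crate context; the filter is borrowed from the earlier rules_engine tests and the reference label is illustrative:

fn apply_version_rule() {
    let filter = r#"if .success then ( .stbVersion | split("_")[0] ) else { code: -32100, message: "couldn't get version" } end"#;
    let input = serde_json::json!({
        "stbVersion": "SCXI11BEI_VBN_24Q2_sprint_20240620140024sdy_FG_GRT",
        "success": true
    });

    match jq_compile(input, filter, "device.version_response".to_owned()) {
        Ok(value) => println!("transformed: {}", value),
        // Each failure mode is now explicit instead of a single generic error.
        Err(JqError::RuleParseFailed) => eprintln!("filter did not parse"),
        Err(JqError::RuleCompileFailed(reference)) => eprintln!("compile failed for {}", reference),
        Err(e) => eprintln!("rule processing failed: {:?}", e),
    }
}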
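The hand-written Default for CallContext is left commented out because deriving Default now suffices: ApiProtocol tags JsonRpc with #[default], which is the variant a derived Default on an enum uses. A small standalone illustration of that derive pattern (the enum name here is illustrative, not the crate type):

#[derive(Debug, PartialEq, Default)]
enum Protocol {
    Bridge,
    Extn,
    // The derived Default picks the variant tagged below.
    #[default]
    JsonRpc,
}

fn main() {
    assert_eq!(Protocol::default(), Protocol::JsonRpc);
}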