Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: thunderBroker migration changes with config switch #675

Open
wants to merge 61 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
927bcec
feat: thunderBroker migration changes with config switch
nnaveen979 Nov 7, 2024
5ac3fba
fix: compilation warnings fixed.
nnaveen979 Nov 8, 2024
6dc75b7
chore: added new field calledd "use_thunderbroker" to use as switch b…
nnaveen979 Nov 8, 2024
84fd20d
feat : thunderbroker migration changes added, no compilation errors.
nnaveen979 Nov 8, 2024
2669a4b
fix: clippy errors fixed
nnaveen979 Nov 11, 2024
b6d055b
fix: clippy errors fixed
nnaveen979 Nov 11, 2024
5558256
fix: clippy error fixed
nnaveen979 Nov 11, 2024
9bbc431
chore: deleted the unnecessary file "thunder_client2.rs"
nnaveen979 Nov 12, 2024
8e6ccd9
fix: modified the thunder_client::manage() method
nnaveen979 Nov 13, 2024
7a51c9a
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 14, 2024
6566a42
feat: updated the method of ThunderClientManager::manage()
nnaveen979 Nov 14, 2024
de029ae
chore: added await method for one shot channel communication method
nnaveen979 Nov 15, 2024
1fccdaa
fix: modified the get_device_resp_msg() method.
nnaveen979 Nov 15, 2024
88ff286
chore: passing valid url for websocket connection
nnaveen979 Nov 18, 2024
b52500f
feat: added the src code to get the platform params for thunderbroker…
nnaveen979 Nov 19, 2024
620e007
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 22, 2024
932037c
feat: cleanup fn get_protocol_endpoint() in RuleEngine
nnaveen979 Nov 25, 2024
fff0199
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 25, 2024
1e7c2c9
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 26, 2024
cdb8f31
fix: clippy warning fixed
nnaveen979 Nov 26, 2024
be29008
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 29, 2024
84ad404
fix: modified the thunderbroker flag check based on the config file
nnaveen979 Dec 3, 2024
5c31852
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Dec 3, 2024
6747a42
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Dec 6, 2024
21cb6f4
feat: modified the broker subscription handling
nnaveen979 Dec 23, 2024
ab0b10a
fix: fixed the clippy errors
nnaveen979 Dec 24, 2024
e077c77
Merge branch 'main' into Rppl_2681_new
nnaveen979 Dec 24, 2024
0ec2df8
fix: clippy warnings fixed, unit tests added thunder_async_client
nnaveen979 Jan 2, 2025
a403a55
chore:Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 2, 2025
3b48052
fix: fixed review comments
nnaveen979 Jan 7, 2025
c806670
Trigger Concourse
nnaveen979 Jan 7, 2025
ed7270c
fix: review comments fixed.
nnaveen979 Jan 8, 2025
b5e9c9e
fix: format check failure fix
nnaveen979 Jan 8, 2025
b5cbe77
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 8, 2025
5664535
chore: renaming, cleanup done for better readability.
nnaveen979 Jan 8, 2025
2609d4d
chore: clippy error fixed
nnaveen979 Jan 8, 2025
26588b6
fix: review comment fixed
nnaveen979 Jan 8, 2025
7242e40
fix: clippy error fixed
nnaveen979 Jan 8, 2025
1b7016d
fix: review comment fixed.
nnaveen979 Jan 9, 2025
f6e1acf
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 9, 2025
073ab20
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 13, 2025
c345b4c
chore: flag and one file name renamed for better readability.
nnaveen979 Jan 15, 2025
8d7771b
fix: format check failure fix
nnaveen979 Jan 15, 2025
58849cd
feat: added the new websocket for new thunder request in thunder_asyn…
nnaveen979 Jan 15, 2025
08a7c3b
fix: review comments fixed.
nnaveen979 Jan 16, 2025
d8ba964
fix: review comment fixed.
nnaveen979 Jan 17, 2025
a274ac7
fix: review comment fixed.
nnaveen979 Jan 17, 2025
543c5ac
Fixed status manager repsonse handling.
pahearn73 Jan 17, 2025
5869296
Removed debug
pahearn73 Jan 17, 2025
aa9129f
chore: renaming done for thunder async related variables structs for …
nnaveen979 Jan 21, 2025
bdf2afb
fix: device_operator file directory changed.moved into thunder_ripple…
nnaveen979 Jan 21, 2025
3a0b12d
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 23, 2025
3039eed
fix: added handling for notification receiving in thunder_async_clien…
nnaveen979 Jan 23, 2025
a559862
fix: review comment fixed related to websocket routing based on reque…
nnaveen979 Jan 24, 2025
28600fb
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 24, 2025
707c755
PCA: Changes to reduce boilerplate
pahearn73 Jan 24, 2025
a02a974
fix: added the websocket close call
nnaveen979 Jan 24, 2025
9439424
chore: cleanup of unused functions
nnaveen979 Jan 27, 2025
ec177fe
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 27, 2025
149ba9c
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 28, 2025
b66b9a9
fix: try to add websocket disconnection handling changes
nnaveen979 Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
// SPDX-License-Identifier: Apache-2.0
//

use crate::{state::platform_state::PlatformState, utils::rpc_utils::extract_tcp_port};
use crate::state::platform_state::PlatformState;
use futures::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use jsonrpsee::core::RpcResult;
use ripple_sdk::{
api::gateway::rpc_gateway_api::{JsonRpcApiError, RpcRequest},
log::{error, info},
tokio::{self, net::TcpStream},
utils::rpc_utils::extract_tcp_port,
};
use serde_json::Value;
use std::time::Duration;
Expand All @@ -46,12 +47,14 @@ impl BrokerUtils {
};
let url = url::Url::parse(&url_path).unwrap();
let port = extract_tcp_port(endpoint);
let tcp_port = port.unwrap();

info!("Url host str {}", url.host_str().unwrap());
let mut index = 0;

loop {
// Try connecting to the tcp port first
if let Ok(v) = TcpStream::connect(&port).await {
if let Ok(v) = TcpStream::connect(&tcp_port).await {
// Setup handshake for websocket with the tcp port
// Some WS servers lock on to the Port but not setup handshake till they are fully setup
if let Ok((stream, _)) = client_async(url_path.clone(), v).await {
Expand All @@ -61,7 +64,7 @@ impl BrokerUtils {
if (index % 10).eq(&0) {
error!(
"Broker with {} failed with retry for last {} secs in {}",
url_path, index, port
url_path, index, tcp_port
);
}
index += 1;
Expand Down
9 changes: 4 additions & 5 deletions core/main/src/broker/thunder/user_data_migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ use ripple_sdk::{
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

use crate::broker::endpoint_broker::{
self, BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState,
use crate::broker::{
endpoint_broker::{self, BrokerCallback, BrokerOutput, BrokerRequest, EndpointBrokerState},
rules_engine::{Rule, RuleTransformType},
thunder_broker::ThunderBroker,
};
use crate::broker::rules_engine::{Rule, RuleTransformType};

use futures::stream::SplitSink;
use futures_util::SinkExt;

use crate::broker::thunder_broker::ThunderBroker;
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};

// TBD get the storage dir from manifest or other Ripple config file
Expand Down
10 changes: 0 additions & 10 deletions core/main/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,3 @@ pub fn get_base_method(method: &str) -> String {
let method_vec: Vec<&str> = method.split('.').collect();
method_vec.first().unwrap().to_string().to_lowercase()
}

pub fn extract_tcp_port(url: &str) -> String {
let url_split: Vec<&str> = url.split("://").collect();
if let Some(domain) = url_split.get(1) {
let domain_split: Vec<&str> = domain.split('/').collect();
domain_split.first().unwrap().to_string()
} else {
url.to_owned()
}
}
1 change: 0 additions & 1 deletion core/sdk/src/api/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub mod device_apps;
pub mod device_browser;
pub mod device_events;
pub mod device_info_request;
pub mod device_operator;
pub mod device_peristence;
pub mod device_request;
pub mod device_user_grants_data;
Expand Down
10 changes: 10 additions & 0 deletions core/sdk/src/utils/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,13 @@ use jsonrpsee::core::Error;
pub fn rpc_err(msg: impl Into<String>) -> Error {
Error::Custom(msg.into())
}

pub fn extract_tcp_port(url: &str) -> Result<String, Error> {
let url_split: Vec<&str> = url.split("://").collect();
if let Some(domain) = url_split.get(1) {
let domain_split: Vec<&str> = domain.split('/').collect();
Ok(domain_split.first().unwrap().to_string())
} else {
Err(Error::Custom("Invalid URL format".to_string()))
}
}
8 changes: 6 additions & 2 deletions device/thunder_ripple_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ regex.workspace = true
jsonrpsee = { workspace = true, features = ["macros", "ws-client"] }
serde.workspace = true
url.workspace = true
serde_json.workspace = true
futures-channel.workspace = true
futures.workspace = true

strum = { version = "0.24", default-features = false }
strum_macros = "0.24"
Expand All @@ -56,8 +59,9 @@ csv = "1.1" # Allowing minor updates
home = { version = "=0.5.5", optional = true }
tree_magic_mini = { version = "=3.0.3", optional = true }
rstest = { version = "0.18.2", optional = true, default-features = false }
tokio-tungstenite = { workspace = true, features = ["handshake"] }
futures-util = { version = "0.3.28", features = ["sink", "std"], default-features = false}
pahearn73 marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
tokio-tungstenite = { workspace = true, features = ["native-tls"] }
ripple_sdk = { path = "../../core/sdk", features = ["tdk"] }

rstest = "0.18.0"
109 changes: 102 additions & 7 deletions device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,120 @@ use crate::{
};
use ripple_sdk::{
extn::client::extn_client::ExtnClient,
log::{error, info},
log::{debug, error, info, warn},
};

use super::{get_config_step::ThunderGetConfigStep, setup_thunder_pool_step::ThunderPoolStep};
use crate::client::thunder_client::ThunderClientBuilder;
use crate::thunder_state::ThunderBootstrapStateWithConfig;
use crate::thunder_state::ThunderState;
use ripple_sdk::api::config::Config;
use ripple_sdk::utils::error::RippleError;
use serde::Deserialize;

use ripple_sdk::extn::extn_client_message::{ExtnMessage, ExtnResponse};

const GATEWAY_DEFAULT: &str = "ws://127.0.0.1:9998/jsonrpc";

#[derive(Deserialize, Clone)]
pub struct ThunderPlatformParams {
#[serde(default = "gateway_default")]
gateway: String,
}

fn gateway_default() -> String {
String::from(GATEWAY_DEFAULT)
}

pub async fn boot_thunder(
state: ExtnClient,
ext_client: ExtnClient,
plugin_param: ThunderPluginBootParam,
) -> Option<ThunderBootstrapStateWithClient> {
pahearn73 marked this conversation as resolved.
Show resolved Hide resolved
info!("Booting thunder");
if let Ok(state) = ThunderGetConfigStep::setup(state, plugin_param).await {
info!("Booting thunder initiated");
//by default enabling the thunderBroker
let state = if ext_client.get_bool_config("use_with_thunder_async_client") {
info!("Using thunder_async_clinet");
let mut extn_client = ext_client.clone();
let mut gateway_url = url::Url::parse(GATEWAY_DEFAULT).unwrap();
nnaveen979 marked this conversation as resolved.
Show resolved Hide resolved
let extn_message_response: Result<ExtnMessage, RippleError> =
extn_client.request(Config::PlatformParameters).await;

if let Ok(message) = extn_message_response {
nnaveen979 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(_response) = message.payload.extract().map(|response| {
if let ExtnResponse::Value(v) = response {
serde_json::from_value::<ThunderPlatformParams>(v)
.map(|thunder_parameters| {
url::Url::parse(&thunder_parameters.gateway).map_or_else(
|_| {
warn!(
"Could not parse thunder gateway '{}', using default {}",
thunder_parameters.gateway, GATEWAY_DEFAULT
);
},
|gtway_url| {
debug!("Got url from device manifest");
gateway_url = gtway_url;
},
);
})
.unwrap_or_else(|_| {
warn!(
"Could not read thunder platform parameters, using default {}",
GATEWAY_DEFAULT
);
});
}
if let Ok(host_override) = std::env::var("DEVICE_HOST") {
gateway_url.set_host(Some(&host_override)).ok();
}
}) {}
}

if let Ok(thndr_client) = ThunderClientBuilder::start_thunder_client(
gateway_url.clone(),
None,
None,
None,
None,
true,
)
.await
{
let thunder_state = ThunderState::new(ext_client.clone(), thndr_client);

let thndr_boot_statecfg = ThunderBootstrapStateWithConfig {
extn_client: ext_client,
url: gateway_url,
pool_size: None,
plugin_param: None,
thunder_connection_state: None,
};

let thndr_boot_stateclient = ThunderBootstrapStateWithClient {
prev: thndr_boot_statecfg,
state: thunder_state,
};

thndr_boot_stateclient.clone().state.start_event_thread();

Some(thndr_boot_stateclient)
} else {
None
}
} else if let Ok(state) = ThunderGetConfigStep::setup(ext_client, plugin_param).await {
if let Ok(state) = ThunderPoolStep::setup(state).await {
SetupThunderProcessor::setup(state.clone()).await;
return Some(state);
Some(state)
} else {
error!("Unable to connect to Thunder, error in ThunderPoolStep");
None
}
} else {
error!("Unable to connect to Thunder, error in ThunderGetConfigStep");
None
nnaveen979 marked this conversation as resolved.
Show resolved Hide resolved
};

if let Some(s) = state.clone() {
SetupThunderProcessor::setup(s).await;
}
None
state
}
6 changes: 3 additions & 3 deletions device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ impl ThunderGetConfigStep {
return Ok(ThunderBootstrapStateWithConfig {
extn_client: state,
url: gateway_url,
pool_size,
plugin_param: expected_plugins,
thunder_connection_state: Arc::new(ThunderConnectionState::new()),
pool_size: Some(pool_size),
plugin_param: Some(expected_plugins),
thunder_connection_state: Some(Arc::new(ThunderConnectionState::new())),
});
}
}
Expand Down
23 changes: 16 additions & 7 deletions device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ impl ThunderPoolStep {
pub async fn setup(
state: ThunderBootstrapStateWithConfig,
) -> Result<ThunderBootstrapStateWithClient, RippleError> {
let pool_size = state.pool_size;
let url = state.url.clone();
let thunder_connection_state = state.thunder_connection_state.clone();
if pool_size < 2 {
warn!("Pool size of 1 is not recommended, there will be no dedicated connection for Controller events");
return Err(RippleError::BootstrapError);
}
let pool_size = match state.pool_size {
Some(s) => s,
None => {
warn!("Pool size of 1 is not recommended, there will be no dedicated connection for Controller events");
return Err(RippleError::BootstrapError);
}
};

let controller_pool = ripple_sdk::tokio::time::timeout(
Duration::from_secs(10),
ThunderClientPool::start(url.clone(), None, thunder_connection_state.clone(), 1),
Expand All @@ -68,7 +71,13 @@ impl ThunderPoolStep {
};

info!("Received Controller pool");
let expected_plugins = state.plugin_param.clone();
let expected_plugins = match state.plugin_param.clone() {
Some(plugins) => plugins,
None => {
error!("Expected plugins are not provided.");
return Err(RippleError::BootstrapError);
}
};
let tc = Box::new(controller_pool);
let (plugin_manager_tx, failed_plugins) =
PluginManager::start(tc, expected_plugins.clone()).await;
Expand Down Expand Up @@ -98,7 +107,7 @@ impl ThunderPoolStep {
}

let client = ThunderClientPool::start(
url,
url.clone(),
Some(plugin_manager_tx),
thunder_connection_state.clone(),
pool_size - 1,
Expand Down
Loading
Loading