Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: thunderBroker migration changes with config switch #675

Open
wants to merge 74 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
927bcec
feat: thunderBroker migration changes with config switch
nnaveen979 Nov 7, 2024
5ac3fba
fix: compilation warnings fixed.
nnaveen979 Nov 8, 2024
6dc75b7
chore: added new field calledd "use_thunderbroker" to use as switch b…
nnaveen979 Nov 8, 2024
84fd20d
feat : thunderbroker migration changes added, no compilation errors.
nnaveen979 Nov 8, 2024
2669a4b
fix: clippy errors fixed
nnaveen979 Nov 11, 2024
b6d055b
fix: clippy errors fixed
nnaveen979 Nov 11, 2024
5558256
fix: clippy error fixed
nnaveen979 Nov 11, 2024
9bbc431
chore: deleted the unnecessary file "thunder_client2.rs"
nnaveen979 Nov 12, 2024
8e6ccd9
fix: modified the thunder_client::manage() method
nnaveen979 Nov 13, 2024
7a51c9a
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 14, 2024
6566a42
feat: updated the method of ThunderClientManager::manage()
nnaveen979 Nov 14, 2024
de029ae
chore: added await method for one shot channel communication method
nnaveen979 Nov 15, 2024
1fccdaa
fix: modified the get_device_resp_msg() method.
nnaveen979 Nov 15, 2024
88ff286
chore: passing valid url for websocket connection
nnaveen979 Nov 18, 2024
b52500f
feat: added the src code to get the platform params for thunderbroker…
nnaveen979 Nov 19, 2024
620e007
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 22, 2024
932037c
feat: cleanup fn get_protocol_endpoint() in RuleEngine
nnaveen979 Nov 25, 2024
fff0199
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 25, 2024
1e7c2c9
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 26, 2024
cdb8f31
fix: clippy warning fixed
nnaveen979 Nov 26, 2024
be29008
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Nov 29, 2024
84ad404
fix: modified the thunderbroker flag check based on the config file
nnaveen979 Dec 3, 2024
5c31852
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Dec 3, 2024
6747a42
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Dec 6, 2024
21cb6f4
feat: modified the broker subscription handling
nnaveen979 Dec 23, 2024
ab0b10a
fix: fixed the clippy errors
nnaveen979 Dec 24, 2024
e077c77
Merge branch 'main' into Rppl_2681_new
nnaveen979 Dec 24, 2024
0ec2df8
fix: clippy warnings fixed, unit tests added thunder_async_client
nnaveen979 Jan 2, 2025
a403a55
chore:Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 2, 2025
3b48052
fix: fixed review comments
nnaveen979 Jan 7, 2025
c806670
Trigger Concourse
nnaveen979 Jan 7, 2025
ed7270c
fix: review comments fixed.
nnaveen979 Jan 8, 2025
b5e9c9e
fix: format check failure fix
nnaveen979 Jan 8, 2025
b5cbe77
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 8, 2025
5664535
chore: renaming, cleanup done for better readability.
nnaveen979 Jan 8, 2025
2609d4d
chore: clippy error fixed
nnaveen979 Jan 8, 2025
26588b6
fix: review comment fixed
nnaveen979 Jan 8, 2025
7242e40
fix: clippy error fixed
nnaveen979 Jan 8, 2025
1b7016d
fix: review comment fixed.
nnaveen979 Jan 9, 2025
f6e1acf
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 9, 2025
073ab20
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 13, 2025
c345b4c
chore: flag and one file name renamed for better readability.
nnaveen979 Jan 15, 2025
8d7771b
fix: format check failure fix
nnaveen979 Jan 15, 2025
58849cd
feat: added the new websocket for new thunder request in thunder_asyn…
nnaveen979 Jan 15, 2025
08a7c3b
fix: review comments fixed.
nnaveen979 Jan 16, 2025
d8ba964
fix: review comment fixed.
nnaveen979 Jan 17, 2025
a274ac7
fix: review comment fixed.
nnaveen979 Jan 17, 2025
543c5ac
Fixed status manager repsonse handling.
pahearn73 Jan 17, 2025
5869296
Removed debug
pahearn73 Jan 17, 2025
aa9129f
chore: renaming done for thunder async related variables structs for …
nnaveen979 Jan 21, 2025
bdf2afb
fix: device_operator file directory changed.moved into thunder_ripple…
nnaveen979 Jan 21, 2025
3a0b12d
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 23, 2025
3039eed
fix: added handling for notification receiving in thunder_async_clien…
nnaveen979 Jan 23, 2025
a559862
fix: review comment fixed related to websocket routing based on reque…
nnaveen979 Jan 24, 2025
28600fb
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 24, 2025
707c755
PCA: Changes to reduce boilerplate
pahearn73 Jan 24, 2025
a02a974
fix: added the websocket close call
nnaveen979 Jan 24, 2025
9439424
chore: cleanup of unused functions
nnaveen979 Jan 27, 2025
ec177fe
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 27, 2025
149ba9c
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Jan 28, 2025
b66b9a9
fix: try to add websocket disconnection handling changes
nnaveen979 Jan 29, 2025
1617246
Revert "fix: try to add websocket disconnection handling changes"
pahearn73 Jan 30, 2025
3599e40
ThunderAsyncClient: Added reconnect/resubscribe logic.
pahearn73 Jan 31, 2025
0c53700
ThunderAsyncClient: Attempt to resolve blackduck failure.
pahearn73 Jan 31, 2025
d28ec45
fix: review comments of single web socket usage for thunderasyncclient
nnaveen979 Feb 4, 2025
63a84b4
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Feb 4, 2025
a82f809
chore:Merge branch 'main' into Rppl_2681_new
nnaveen979 Feb 5, 2025
ca9a7fa
fix: format check failure fix
nnaveen979 Feb 5, 2025
614bd09
fix: avoid unnecessary unwrap()
nnaveen979 Feb 5, 2025
5034ae9
chore: Merge branch 'main' into Rppl_2681_new
nnaveen979 Feb 12, 2025
ca867b8
fix: renamed the socket tx, rx channel names, prepare_request fn now …
nnaveen979 Feb 12, 2025
c606f58
fix: fn name updated in test case
nnaveen979 Feb 12, 2025
41bb106
ThunderAsyncClient: Fixed request encoding
pahearn73 Feb 12, 2025
f0b5418
ThunderAsyncClient: Clippy
pahearn73 Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: review comments fixed.
nnaveen979 committed Jan 8, 2025
commit ed7270cbf3b4fb42ec99f4f98e6e6dde30d72362
5 changes: 4 additions & 1 deletion core/main/src/processor/metrics_processor.rs
Original file line number Diff line number Diff line change
@@ -214,7 +214,10 @@ impl ExtnRequestProcessor for MetricsProcessor {
}
}
MetricsPayload::TelemetryPayload(t) => {
TelemetryBuilder::update_session_id_and_send_telemetry(&state, t).is_ok()
match TelemetryBuilder::update_session_id_and_send_telemetry(&state, t) {
Ok(_) => Self::ack(client, msg).await.is_ok(),
Err(e) => Self::handle_error(client, msg, e).await,
}
}
MetricsPayload::OperationalMetric(_) => true,
}
41 changes: 41 additions & 0 deletions core/sdk/src/api/device/device_operator.rs
Original file line number Diff line number Diff line change
@@ -38,6 +38,47 @@ pub trait DeviceOperator: Clone {

async fn unsubscribe(&self, request: DeviceUnsubscribeRequest);
}

#[derive(Debug, Clone)]
pub struct DeviceResponseSubscription {
pub sub_id: Option<String>,
pub handlers: Vec<tokio::sync::mpsc::Sender<DeviceResponseMessage>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeviceChannelRequest {
Call(DeviceCallRequest),
Subscribe(DeviceSubscribeRequest),
Unsubscribe(DeviceUnsubscribeRequest),
}

impl DeviceChannelRequest {
pub fn get_callsign_method(&self) -> (String, String) {
nnaveen979 marked this conversation as resolved.
Show resolved Hide resolved
match self {
DeviceChannelRequest::Call(c) => {
let mut collection: Vec<&str> = c.method.split('.').collect();
let method = collection.pop().unwrap_or_default();
let callsign = collection.join(".");
(callsign, method.into())
}
DeviceChannelRequest::Subscribe(s) => (s.module.clone(), s.event_name.clone()),
DeviceChannelRequest::Unsubscribe(u) => (u.module.clone(), u.event_name.clone()),
}
}

pub fn is_subscription(&self) -> bool {
!matches!(self, DeviceChannelRequest::Call(_))
}

pub fn is_unsubscribe(&self) -> Option<DeviceUnsubscribeRequest> {
if let DeviceChannelRequest::Unsubscribe(u) = self {
Some(u.clone())
} else {
None
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceCallRequest {
pub method: String,
2 changes: 1 addition & 1 deletion device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs
Original file line number Diff line number Diff line change
@@ -84,11 +84,11 @@
if let Ok(host_override) = std::env::var("DEVICE_HOST") {
gateway_url.set_host(Some(&host_override)).ok();
}
}

Check warning on line 87 in device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs

GitHub Actions / Format checker

Diff in /home/runner/work/Ripple/Ripple/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs
}

if let Ok(thndr_client) =
ThunderClientBuilder::get_client(gateway_url.clone(), None, None, None, None, true)
ThunderClientBuilder::start_thunder_client(gateway_url.clone(), None, None, None, None, true)
.await
{
let thunder_state = ThunderState::new(ext_client.clone(), thndr_client);
46 changes: 1 addition & 45 deletions device/thunder_ripple_sdk/src/client/thunder_async_client.rs
Original file line number Diff line number Diff line change
@@ -23,10 +23,7 @@ use futures_util::{SinkExt, StreamExt};
use ripple_sdk::api::device::device_operator::DeviceResponseMessage;
use ripple_sdk::{
api::{
device::device_operator::{
DeviceCallRequest, DeviceSubscribeRequest, DeviceUnsubscribeRequest,
},
gateway::rpc_gateway_api::JsonRpcApiResponse,
device::device_operator::DeviceChannelRequest, gateway::rpc_gateway_api::JsonRpcApiResponse,
},
log::{debug, error, info},
tokio::{self, net::TcpStream, sync::mpsc::Receiver},
@@ -35,50 +32,9 @@ use ripple_sdk::{
rpc_utils::{extract_tcp_port, get_next_id},
},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio_tungstenite::{client_async, tungstenite::Message, WebSocketStream};

#[derive(Debug, Clone)]
pub struct DeviceResponseSubscription {
pub sub_id: Option<String>,
pub handlers: Vec<tokio::sync::mpsc::Sender<DeviceResponseMessage>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeviceChannelRequest {
Call(DeviceCallRequest),
Subscribe(DeviceSubscribeRequest),
Unsubscribe(DeviceUnsubscribeRequest),
}

impl DeviceChannelRequest {
pub fn get_callsign_method(&self) -> (String, String) {
match self {
DeviceChannelRequest::Call(c) => {
let mut collection: Vec<&str> = c.method.split('.').collect();
let method = collection.pop().unwrap_or_default();
let callsign = collection.join(".");
(callsign, method.into())
}
DeviceChannelRequest::Subscribe(s) => (s.module.clone(), s.event_name.clone()),
DeviceChannelRequest::Unsubscribe(u) => (u.module.clone(), u.event_name.clone()),
}
}

pub fn is_subscription(&self) -> bool {
!matches!(self, DeviceChannelRequest::Call(_))
}

pub fn is_unsubscribe(&self) -> Option<DeviceUnsubscribeRequest> {
if let DeviceChannelRequest::Unsubscribe(u) = self {
Some(u.clone())
} else {
None
}
}
}

#[derive(Clone, Debug)]
pub struct ThunderAsyncClient {
status_manager: StatusManager,
5 changes: 3 additions & 2 deletions device/thunder_ripple_sdk/src/client/thunder_client.rs
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ use super::{
jsonrpc_method_locator::JsonRpcMethodLocator,
plugin_manager::{PluginActivatedResult, PluginManagerCommand},
};
use crate::client::thunder_async_client::{DeviceChannelRequest, DeviceResponseSubscription};
use ripple_sdk::api::device::device_operator::{DeviceChannelRequest, DeviceResponseSubscription};
use ripple_sdk::tokio::sync::mpsc::Receiver;
use std::sync::RwLock;
use std::{env, process::Command};
@@ -626,6 +626,7 @@ impl ThunderClientBuilder {
}
None
}

async fn create_client(
url: Url,
thunder_connection_state: Arc<ThunderConnectionState>,
@@ -675,7 +676,7 @@ impl ThunderClientBuilder {
client
}

pub async fn get_client(
pub async fn start_thunder_client(
url: Url,
plugin_manager_tx: Option<MpscSender<PluginManagerCommand>>,
pool_tx: Option<mpsc::Sender<ThunderPoolCommand>>,
4 changes: 2 additions & 2 deletions device/thunder_ripple_sdk/src/client/thunder_client_pool.rs
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ impl ThunderClientPool {
let thunder_connection_state = thunder_connection_state.clone();
let mut clients = Vec::default();
for _ in 0..size {
let client = ThunderClientBuilder::get_client(
let client = ThunderClientBuilder::start_thunder_client(
url.clone(),
plugin_manager_tx.clone(),
Some(s.clone()),
@@ -131,7 +131,7 @@ impl ThunderClientPool {
let mut itr = pool.clients.iter();
let i = itr.position(|x| x.client.id == client_id);
if let Some(index) = i {
let client = ThunderClientBuilder::get_client(
let client = ThunderClientBuilder::start_thunder_client(
url.clone(),
plugin_manager_tx.clone(),
Some(sender_for_thread.clone()),