Skip to content

Commit

Permalink
fix: subscribing same event not unsubscribing the previous one. (#556)
Browse files Browse the repository at this point in the history
* fix: RPPL-2126: subscribing same event not unsubscribing the previous one.
  • Loading branch information
sakshihcst authored Jul 2, 2024
1 parent a4d5907 commit 4983ff2
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 42 deletions.
25 changes: 24 additions & 1 deletion core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ pub struct BrokerSender {
pub struct BrokerRequest {
pub rpc: RpcRequest,
pub rule: Rule,
pub subscription_processed: Option<bool>,
}

impl BrokerRequest {
pub fn is_subscription_processed(&self) -> bool {
self.subscription_processed.is_some()
}
}

/// BrokerCallback will be used by the communication broker to send the firebolt response
Expand Down Expand Up @@ -180,6 +187,14 @@ impl EndpointBrokerState {
Ok(result)
}

fn update_unsubscribe_request(&self, id: u64) {
let mut result = self.request_map.write().unwrap();
if let Some(mut value) = result.remove(&id) {
value.subscription_processed = Some(true);
let _ = result.insert(id, value);
}
}

fn get_extn_message(&self, id: u64) -> Result<ExtnMessage, RippleError> {
let result = { self.extension_request_map.write().unwrap().remove(&id) };
match result {
Expand All @@ -204,6 +219,7 @@ impl EndpointBrokerState {
BrokerRequest {
rpc: rpc_request.clone(),
rule: rule.clone(),
subscription_processed: None,
},
);
}
Expand Down Expand Up @@ -306,6 +322,7 @@ impl EndpointBrokerState {
BrokerRequest {
rpc: rpc_request.clone(),
rule,
subscription_processed: None,
}
}
}
Expand Down Expand Up @@ -412,6 +429,7 @@ impl BrokerOutputForwarder {
};
if let Some(id) = id {
if let Ok(broker_request) = platform_state.endpoint_state.get_request(id) {
let sub_processed = broker_request.is_subscription_processed();
let rpc_request = broker_request.rpc;
let session_id = rpc_request.ctx.get_id();
let is_subscription = rpc_request.is_subscription();
Expand All @@ -437,10 +455,14 @@ impl BrokerOutputForwarder {
}
}
} else if is_subscription {
if sub_processed {
continue;
}
v.data.result = Some(json!({
"listening" : rpc_request.is_listening(),
"event" : rpc_request.ctx.method
}))
}));
platform_state.endpoint_state.update_unsubscribe_request(id);
} else if let Some(filter) = broker_request
.rule
.transform
Expand Down Expand Up @@ -535,6 +557,7 @@ mod tests {
transform: RuleTransform::default(),
endpoint: None,
},
subscription_processed: None,
},
RippleError::InvalidInput,
)
Expand Down
110 changes: 69 additions & 41 deletions core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ use std::{
collections::HashMap,
sync::{Arc, RwLock},
time::Duration,
vec,
};
use tokio_tungstenite::client_async;

#[derive(Debug, Clone)]
pub struct ThunderBroker {
sender: BrokerSender,
subscription_map: Arc<RwLock<HashMap<String, BrokerRequest>>>,
subscription_map: Arc<RwLock<HashMap<String, Vec<BrokerRequest>>>>,
}

impl ThunderBroker {
Expand Down Expand Up @@ -132,6 +133,40 @@ impl ThunderBroker {
let callsign = collection.join(".");
(callsign, method)
}

fn subscribe(&self, request: &BrokerRequest) -> Option<BrokerRequest> {
let mut sub_map = self.subscription_map.write().unwrap();
let app_id = &request.rpc.ctx.session_id;
let method = &request.rpc.ctx.method;
let listen = request.rpc.is_listening();
let mut response = None;
debug!(
"Initial subscription map of {:?} app_id {:?}",
sub_map, app_id
);

if let Some(mut v) = sub_map.remove(app_id) {
debug!("Subscription map after removing app {:?}", v);
if let Some(i) = v
.iter()
.position(|x| x.rpc.ctx.method.eq_ignore_ascii_case(method))
{
debug!(
"Removing subscription for method {} for app {}",
method, app_id
);
response = Some(v.remove(i));
//let _ = response.insert(v.remove(i));
}
if listen {
v.push(request.clone());
}
let _ = sub_map.insert(app_id.clone(), v);
} else {
let _ = sub_map.insert(app_id.clone(), vec![request.clone()]);
}
response
}
}

impl EndpointBroker for ThunderBroker {
Expand All @@ -150,55 +185,48 @@ impl EndpointBroker for ThunderBroker {
let mut requests = Vec::new();
let rpc = rpc_request.clone().rpc;
let id = rpc.ctx.call_id;
let app_id = rpc.ctx.app_id;
let (callsign, method) = Self::get_callsign_and_method_from_alias(&rpc_request.rule.alias);
if method.is_none() {
return Err(RippleError::InvalidInput);
}
let method = method.unwrap();
// Below chunk of code is basically for subscription where thunder needs some special care based on
// the JsonRpc specification
if rpc_request.rpc.is_subscription() {
let listen = rpc_request.rpc.is_listening();
let notif_id = {
let mut sub_map = self.subscription_map.write().unwrap();

if listen {
if let Some(cleanup) = sub_map.insert(app_id, rpc_request.clone()) {
requests.push(
json!({
"jsonrpc": "2.0",
"id": cleanup.rpc.ctx.call_id,
"method": format!("{}.{}", callsign, "unregister".to_owned()),
"params": {
"event": method,
"id": format!("{}", cleanup.rpc.ctx.call_id)
}
})
.to_string(),
)
}
id
} else if let Some(v) = sub_map.remove(&app_id) {
v.rpc.ctx.call_id
} else {
id
}
};
requests.push(
json!({
"jsonrpc": "2.0",
"id": id,
"method": format!("{}.{}", callsign, match listen {
true => "register",
false => "unregister"
}),
"params": json!({
"event": method,
"id": format!("{}", notif_id)
// If there was an existing app and method combo for the same subscription just unregister that
if let Some(cleanup) = self.subscribe(rpc_request) {
requests.push(
json!({
"jsonrpc": "2.0",
"id": cleanup.rpc.ctx.call_id,
"method": format!("{}.unregister", callsign),
"params": {
"event": method,
"id": format!("{}", cleanup.rpc.ctx.call_id)
}
})
})
.to_string(),
)
.to_string(),
)
}

// Given unregistration is already performed by previous step just do registration
if listen {
requests.push(
json!({
"jsonrpc": "2.0",
"id": id,
"method": format!("{}.register", callsign),
"params": json!({
"event": method,
"id": format!("{}", id)
})
})
.to_string(),
)
}
} else {
// Simple request and response handling
let request = Self::update_request(rpc_request)?;
requests.push(request)
}
Expand Down

0 comments on commit 4983ff2

Please sign in to comment.