From b8c3cdce6d84240f1fbd8b8e3c3c5db9623458f9 Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Mon, 28 Oct 2024 12:54:00 -0700 Subject: [PATCH 1/6] Check RPC version --- Cargo.lock | 2 +- livekit/src/room/mod.rs | 1 + .../src/room/participant/local_participant.rs | 43 +++++++++++-------- livekit/src/room/participant/rpc.rs | 2 + 4 files changed, 28 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c109ac0..31372d3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ dependencies = [ [[package]] name = "livekit-ffi" -version = "0.12.0" +version = "0.12.1" dependencies = [ "console-subscriber", "dashmap", diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 28d51902..4159f84e 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -703,6 +703,7 @@ impl RoomSession { method, payload, response_timeout_ms, + version, ) .await; } diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 8c459676..a4a954d6 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -786,6 +786,7 @@ impl LocalParticipant { method: String, payload: String, response_timeout_ms: u32, + version: u32, ) { if let Err(e) = self .publish_rpc_ack(RpcAck { @@ -797,32 +798,36 @@ impl LocalParticipant { log::error!("Failed to publish RPC ACK: {:?}", e); } - let handler = self.local.rpc_state.lock().handlers.get(&method).cloned(); - let caller_identity_2 = caller_identity.clone(); let request_id_2 = request_id.clone(); - let response = match handler { - Some(handler) => { - match tokio::task::spawn(async move { - handler( - request_id.clone(), - caller_identity.clone(), - payload.clone(), - Duration::from_millis(response_timeout_ms as u64), - ) + let response = if version != 1 { + Err(RpcError::built_in(RpcErrorCode::UnsupportedVersion, None)) + } else { + let handler = self.local.rpc_state.lock().handlers.get(&method).cloned(); + + match handler { + Some(handler) => { + match tokio::task::spawn(async move { + handler( + request_id.clone(), + caller_identity.clone(), + payload.clone(), + Duration::from_millis(response_timeout_ms as u64), + ) + .await + }) .await - }) - .await - { - Ok(result) => result, - Err(e) => { - log::error!("RPC method handler returned an error: {:?}", e); - Err(RpcError::built_in(RpcErrorCode::ApplicationError, None)) + { + Ok(result) => result, + Err(e) => { + log::error!("RPC method handler returned an error: {:?}", e); + Err(RpcError::built_in(RpcErrorCode::ApplicationError, None)) + } } } + None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)), } - None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)), }; let (payload, error) = match response { diff --git a/livekit/src/room/participant/rpc.rs b/livekit/src/room/participant/rpc.rs index 3ddb0f54..54ce7488 100644 --- a/livekit/src/room/participant/rpc.rs +++ b/livekit/src/room/participant/rpc.rs @@ -60,6 +60,7 @@ pub enum RpcErrorCode { RecipientNotFound = 1401, RequestPayloadTooLarge = 1402, UnsupportedServer = 1403, + UnsupportedVersion = 1404, } impl RpcErrorCode { @@ -76,6 +77,7 @@ impl RpcErrorCode { Self::RecipientNotFound => "Recipient not found", Self::RequestPayloadTooLarge => "Request payload too large", Self::UnsupportedServer => "RPC not supported by server", + Self::UnsupportedVersion => "Unsupported RPC version", } } } From 8a939a4e415563b882129190bd8171b96aa29eca Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Mon, 28 Oct 2024 13:20:07 -0700 Subject: [PATCH 2/6] invocation data, duration --- examples/rpc/src/main.rs | 55 ++++++++++--------- livekit-ffi/src/server/participant.rs | 21 +++---- livekit-ffi/src/server/requests.rs | 2 +- livekit/src/prelude.rs | 3 +- livekit/src/room/mod.rs | 2 +- .../src/room/participant/local_participant.rs | 30 ++++------ livekit/src/room/participant/rpc.rs | 16 ++++++ 7 files changed, 66 insertions(+), 63 deletions(-) diff --git a/examples/rpc/src/main.rs b/examples/rpc/src/main.rs index 152f380e..c778a454 100644 --- a/examples/rpc/src/main.rs +++ b/examples/rpc/src/main.rs @@ -80,13 +80,13 @@ async fn main() -> Result<(), Box> { async fn register_receiver_methods(greeters_room: &Arc, math_genius_room: &Arc) { greeters_room.local_participant().register_rpc_method( "arrival".to_string(), - |_, caller_identity, payload, _| { + |data| { Box::pin(async move { println!( "[{}] [Greeter] Oh {} arrived and said \"{}\"", elapsed_time(), - caller_identity, - payload + data.caller_identity, + data.payload ); sleep(Duration::from_secs(2)).await; Ok("Welcome and have a wonderful day!".to_string()) @@ -94,38 +94,41 @@ async fn register_receiver_methods(greeters_room: &Arc, math_genius_room: }, ); - math_genius_room.local_participant().register_rpc_method("square-root".to_string(), |_, caller_identity, payload, response_timeout_ms| { - Box::pin(async move { - let json_data: Value = serde_json::from_str(&payload).unwrap(); - let number = json_data["number"].as_f64().unwrap(); - println!( - "[{}] [Math Genius] I guess {} wants the square root of {}. I've only got {} seconds to respond but I think I can pull it off.", - elapsed_time(), - caller_identity, - number, - response_timeout_ms.as_secs() - ); - - println!("[{}] [Math Genius] *doing math*…", elapsed_time()); - sleep(Duration::from_secs(2)).await; - - let result = number.sqrt(); - println!("[{}] [Math Genius] Aha! It's {}", elapsed_time(), result); - Ok(json!({"result": result}).to_string()) - }) - }); + math_genius_room.local_participant().register_rpc_method( + "square-root".to_string(), + |data| { + Box::pin(async move { + let json_data: Value = serde_json::from_str(&data.payload).unwrap(); + let number = json_data["number"].as_f64().unwrap(); + println!( + "[{}] [Math Genius] I guess {} wants the square root of {}. I've only got {} seconds to respond but I think I can pull it off.", + elapsed_time(), + data.caller_identity, + number, + data.response_timeout.as_secs() + ); + + println!("[{}] [Math Genius] *doing math*…", elapsed_time()); + sleep(Duration::from_secs(2)).await; + + let result = number.sqrt(); + println!("[{}] [Math Genius] Aha! It's {}", elapsed_time(), result); + Ok(json!({"result": result}).to_string()) + }) + }, + ); math_genius_room.local_participant().register_rpc_method( "divide".to_string(), - |_, caller_identity, payload, _| { + |data| { Box::pin(async move { - let json_data: Value = serde_json::from_str(&payload).unwrap(); + let json_data: Value = serde_json::from_str(&data.payload).unwrap(); let dividend = json_data["dividend"].as_i64().unwrap(); let divisor = json_data["divisor"].as_i64().unwrap(); println!( "[{}] [Math Genius] {} wants me to divide {} by {}.", elapsed_time(), - caller_identity, + data.caller_identity, dividend, divisor ); diff --git a/livekit-ffi/src/server/participant.rs b/livekit-ffi/src/server/participant.rs index fdf54bd6..68f1319e 100644 --- a/livekit-ffi/src/server/participant.rs +++ b/livekit-ffi/src/server/participant.rs @@ -93,7 +93,7 @@ impl FfiParticipant { let room: Arc = self.room.clone(); local.register_rpc_method( method.clone(), - move |request_id, caller_identity, payload, response_timeout| { + move |data| { Box::pin({ let room = room.clone(); let method = method.clone(); @@ -103,10 +103,7 @@ impl FfiParticipant { room, local_participant_handle, method, - request_id, - caller_identity, - payload, - response_timeout, + data, ) .await } @@ -118,7 +115,6 @@ impl FfiParticipant { pub fn unregister_rpc_method( &self, - server: &'static FfiServer, request: proto::UnregisterRpcMethodRequest, ) -> FfiResult { let local = match &self.participant { @@ -139,10 +135,7 @@ async fn forward_rpc_method_invocation( room: Arc, local_participant_handle: FfiHandleId, method: String, - request_id: String, - caller_identity: ParticipantIdentity, - payload: String, - response_timeout: Duration, + data: RpcInvocationData, ) -> Result { let (tx, rx) = oneshot::channel(); let invocation_id = server.next_id(); @@ -152,10 +145,10 @@ async fn forward_rpc_method_invocation( local_participant_handle: local_participant_handle as u64, invocation_id, method, - request_id, - caller_identity: caller_identity.into(), - payload, - response_timeout_ms: response_timeout.as_millis() as u32, + request_id: data.request_id, + caller_identity: data.caller_identity.into(), + payload: data.payload, + response_timeout_ms: data.response_timeout.as_millis() as u32, }, )); diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 6af1fd66..5c4aa32f 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -815,7 +815,7 @@ fn on_unregister_rpc_method( ) -> FfiResult { let ffi_participant = server.retrieve_handle::(request.local_participant_handle)?.clone(); - return ffi_participant.unregister_rpc_method(server, request); + return ffi_participant.unregister_rpc_method(request); } fn on_rpc_method_invocation_response( diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index a7b86d62..52ec2c0f 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -15,7 +15,8 @@ pub use crate::{ id::*, participant::{ - ConnectionQuality, LocalParticipant, Participant, RemoteParticipant, RpcError, RpcErrorCode, + ConnectionQuality, LocalParticipant, Participant, RemoteParticipant, RpcError, + RpcErrorCode, RpcInvocationData, }, publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication}, track::{ diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 4159f84e..f619881c 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -702,7 +702,7 @@ impl RoomSession { request_id, method, payload, - response_timeout_ms, + Duration::from_millis(response_timeout_ms as u64), version, ) .await; diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index a4a954d6..4d4a535b 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -19,7 +19,7 @@ use crate::{ e2ee::EncryptionType, options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions}, prelude::*, - room::participant::rpc::{RpcError, RpcErrorCode, MAX_PAYLOAD_BYTES}, + room::participant::rpc::{RpcError, RpcErrorCode, RpcInvocationData, MAX_PAYLOAD_BYTES}, rtc_engine::{EngineError, RtcEngine}, ChatMessage, DataPacket, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription, }; @@ -36,12 +36,7 @@ use semver::Version; use tokio::sync::oneshot; type RpcHandler = Arc< - dyn Fn( - String, // request_id - ParticipantIdentity, // caller_identity - String, // payload - Duration, // response_timeout_ms - ) -> Pin> + Send>> + dyn Fn(RpcInvocationData) -> Pin> + Send>> + Send + Sync, >; @@ -736,12 +731,7 @@ impl LocalParticipant { pub fn register_rpc_method( &self, method: String, - handler: impl Fn( - String, - ParticipantIdentity, - String, - Duration, - ) -> Pin> + Send>> + handler: impl Fn(RpcInvocationData) -> Pin> + Send>> + Send + Sync + 'static, @@ -785,7 +775,7 @@ impl LocalParticipant { request_id: String, method: String, payload: String, - response_timeout_ms: u32, + response_timeout: Duration, version: u32, ) { if let Err(e) = self @@ -809,12 +799,12 @@ impl LocalParticipant { match handler { Some(handler) => { match tokio::task::spawn(async move { - handler( - request_id.clone(), - caller_identity.clone(), - payload.clone(), - Duration::from_millis(response_timeout_ms as u64), - ) + handler(RpcInvocationData { + request_id: request_id.clone(), + caller_identity: caller_identity.clone(), + payload: payload.clone(), + response_timeout, + }) .await }) .await diff --git a/livekit/src/room/participant/rpc.rs b/livekit/src/room/participant/rpc.rs index 54ce7488..d4792e03 100644 --- a/livekit/src/room/participant/rpc.rs +++ b/livekit/src/room/participant/rpc.rs @@ -3,6 +3,22 @@ // SPDX-License-Identifier: Apache-2.0 use livekit_protocol::RpcError as RpcError_Proto; +use crate::room::participant::ParticipantIdentity; +use std::time::Duration; +/// Data passed to method handler for incoming RPC invocations +/// +/// Attributes: +/// request_id (String): The unique request ID. Will match at both sides of the call, useful for debugging or logging. +/// caller_identity (ParticipantIdentity): The unique participant identity of the caller. +/// payload (String): The payload of the request. User-definable format, typically JSON. +/// response_timeout (Duration): The maximum time the caller will wait for a response. +#[derive(Debug, Clone)] +pub struct RpcInvocationData { + pub request_id: String, + pub caller_identity: ParticipantIdentity, + pub payload: String, + pub response_timeout: Duration, +} /// Specialized error handling for RPC methods. /// From 422b0458a174bc480e2faa1da7c4017a10441155 Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Mon, 28 Oct 2024 13:24:12 -0700 Subject: [PATCH 3/6] Duration --- livekit-ffi/src/server/participant.rs | 2 +- livekit/src/room/mod.rs | 6 +++--- livekit/src/room/participant/local_participant.rs | 8 ++++---- livekit/src/rtc_engine/mod.rs | 6 +++--- livekit/src/rtc_engine/rtc_session.rs | 6 ++++-- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/livekit-ffi/src/server/participant.rs b/livekit-ffi/src/server/participant.rs index 68f1319e..9a2dbeec 100644 --- a/livekit-ffi/src/server/participant.rs +++ b/livekit-ffi/src/server/participant.rs @@ -55,7 +55,7 @@ impl FfiParticipant { request.destination_identity.to_string(), request.method, request.payload, - request.response_timeout_ms, + request.response_timeout_ms.map(|ms| Duration::from_millis(ms as u64)), ) .await; diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index f619881c..7e38cbff 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -261,7 +261,7 @@ pub struct RpcRequest { pub id: String, pub method: String, pub payload: String, - pub response_timeout_ms: u32, + pub response_timeout: Duration, pub version: u32, } @@ -689,7 +689,7 @@ impl RoomSession { request_id, method, payload, - response_timeout_ms, + response_timeout, version, } => { if caller_identity.is_none() { @@ -702,7 +702,7 @@ impl RoomSession { request_id, method, payload, - Duration::from_millis(response_timeout_ms as u64), + response_timeout, version, ) .await; diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 4d4a535b..271e531d 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -512,7 +512,7 @@ impl LocalParticipant { id: rpc_request.id, method: rpc_request.method, payload: rpc_request.payload, - response_timeout_ms: rpc_request.response_timeout_ms, + response_timeout_ms: rpc_request.response_timeout.as_millis() as u32, version: rpc_request.version, ..Default::default() }; @@ -643,9 +643,9 @@ impl LocalParticipant { destination_identity: String, method: String, payload: String, - response_timeout_ms: Option, + response_timeout: Option, ) -> Result { - let response_timeout = Duration::from_millis(response_timeout_ms.unwrap_or(10000) as u64); + let response_timeout = response_timeout.unwrap_or(Duration::from_millis(10000)); let max_round_trip_latency = Duration::from_millis(2000); if payload.len() > MAX_PAYLOAD_BYTES { @@ -674,7 +674,7 @@ impl LocalParticipant { id: id.clone(), method: method.clone(), payload: payload.clone(), - response_timeout_ms: (response_timeout - max_round_trip_latency).as_millis() as u32, + response_timeout: response_timeout - max_round_trip_latency, version: 1, }) .await diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index d848571b..4b3c4493 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -118,7 +118,7 @@ pub enum EngineEvent { request_id: String, method: String, payload: String, - response_timeout_ms: u32, + response_timeout: Duration, version: u32, }, RpcResponse { @@ -487,7 +487,7 @@ impl EngineInner { request_id, method, payload, - response_timeout_ms, + response_timeout, version, } => { let _ = self.engine_tx.send(EngineEvent::RpcRequest { @@ -495,7 +495,7 @@ impl EngineInner { request_id, method, payload, - response_timeout_ms, + response_timeout, version, }); } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 5377be51..fa01a69f 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -101,7 +101,7 @@ pub enum SessionEvent { request_id: String, method: String, payload: String, - response_timeout_ms: u32, + response_timeout: Duration, version: u32, }, RpcResponse { @@ -689,7 +689,9 @@ impl SessionInner { request_id: rpc_request.id.clone(), method: rpc_request.method.clone(), payload: rpc_request.payload.clone(), - response_timeout_ms: rpc_request.response_timeout_ms, + response_timeout: Duration::from_millis( + rpc_request.response_timeout_ms as u64, + ), version: rpc_request.version, }); } From aa60a83b5167bacecc696f6c5bda63550f62e50c Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Mon, 28 Oct 2024 13:25:56 -0700 Subject: [PATCH 4/6] fmt --- livekit-ffi/src/server/participant.rs | 35 ++++++++++++--------------- livekit/src/room/participant/rpc.rs | 2 +- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/livekit-ffi/src/server/participant.rs b/livekit-ffi/src/server/participant.rs index 9a2dbeec..3cbc097d 100644 --- a/livekit-ffi/src/server/participant.rs +++ b/livekit-ffi/src/server/participant.rs @@ -91,25 +91,22 @@ impl FfiParticipant { let local_participant_handle = self.handle.clone(); let room: Arc = self.room.clone(); - local.register_rpc_method( - method.clone(), - move |data| { - Box::pin({ - let room = room.clone(); - let method = method.clone(); - async move { - forward_rpc_method_invocation( - server, - room, - local_participant_handle, - method, - data, - ) - .await - } - }) - }, - ); + local.register_rpc_method(method.clone(), move |data| { + Box::pin({ + let room = room.clone(); + let method = method.clone(); + async move { + forward_rpc_method_invocation( + server, + room, + local_participant_handle, + method, + data, + ) + .await + } + }) + }); Ok(proto::RegisterRpcMethodResponse {}) } diff --git a/livekit/src/room/participant/rpc.rs b/livekit/src/room/participant/rpc.rs index d4792e03..3baf53eb 100644 --- a/livekit/src/room/participant/rpc.rs +++ b/livekit/src/room/participant/rpc.rs @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 -use livekit_protocol::RpcError as RpcError_Proto; use crate::room::participant::ParticipantIdentity; +use livekit_protocol::RpcError as RpcError_Proto; use std::time::Duration; /// Data passed to method handler for incoming RPC invocations /// From 9b73d6c9b3b719b913f63464b87380759473efe3 Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Wed, 30 Oct 2024 10:57:13 -0700 Subject: [PATCH 5/6] params --- examples/rpc/src/main.rs | 109 +++++++++--------- livekit-ffi/src/server/participant.rs | 15 ++- livekit/src/prelude.rs | 4 +- .../src/room/participant/local_participant.rs | 22 ++-- livekit/src/room/participant/rpc.rs | 21 ++++ 5 files changed, 93 insertions(+), 78 deletions(-) diff --git a/examples/rpc/src/main.rs b/examples/rpc/src/main.rs index c778a454..d4a1ec71 100644 --- a/examples/rpc/src/main.rs +++ b/examples/rpc/src/main.rs @@ -78,21 +78,18 @@ async fn main() -> Result<(), Box> { } async fn register_receiver_methods(greeters_room: &Arc, math_genius_room: &Arc) { - greeters_room.local_participant().register_rpc_method( - "arrival".to_string(), - |data| { - Box::pin(async move { - println!( - "[{}] [Greeter] Oh {} arrived and said \"{}\"", - elapsed_time(), - data.caller_identity, - data.payload - ); - sleep(Duration::from_secs(2)).await; - Ok("Welcome and have a wonderful day!".to_string()) - }) - }, - ); + greeters_room.local_participant().register_rpc_method("arrival".to_string(), |data| { + Box::pin(async move { + println!( + "[{}] [Greeter] Oh {} arrived and said \"{}\"", + elapsed_time(), + data.caller_identity, + data.payload + ); + sleep(Duration::from_secs(2)).await; + Ok("Welcome and have a wonderful day!".to_string()) + }) + }); math_genius_room.local_participant().register_rpc_method( "square-root".to_string(), @@ -118,34 +115,36 @@ async fn register_receiver_methods(greeters_room: &Arc, math_genius_room: }, ); - math_genius_room.local_participant().register_rpc_method( - "divide".to_string(), - |data| { - Box::pin(async move { - let json_data: Value = serde_json::from_str(&data.payload).unwrap(); - let dividend = json_data["dividend"].as_i64().unwrap(); - let divisor = json_data["divisor"].as_i64().unwrap(); - println!( - "[{}] [Math Genius] {} wants me to divide {} by {}.", - elapsed_time(), - data.caller_identity, - dividend, - divisor - ); - - let result = dividend / divisor; - println!("[{}] [Math Genius] The result is {}", elapsed_time(), result); - Ok(json!({"result": result}).to_string()) - }) - }, - ); + math_genius_room.local_participant().register_rpc_method("divide".to_string(), |data| { + Box::pin(async move { + let json_data: Value = serde_json::from_str(&data.payload).unwrap(); + let dividend = json_data["dividend"].as_i64().unwrap(); + let divisor = json_data["divisor"].as_i64().unwrap(); + println!( + "[{}] [Math Genius] {} wants me to divide {} by {}.", + elapsed_time(), + data.caller_identity, + dividend, + divisor + ); + + let result = dividend / divisor; + println!("[{}] [Math Genius] The result is {}", elapsed_time(), result); + Ok(json!({"result": result}).to_string()) + }) + }); } async fn perform_greeting(room: &Arc) -> Result<(), Box> { println!("[{}] Letting the greeter know that I've arrived", elapsed_time()); match room .local_participant() - .perform_rpc("greeter".to_string(), "arrival".to_string(), "Hello".to_string(), None) + .perform_rpc(PerformRpcParams { + destination_identity: "greeter".to_string(), + method: "arrival".to_string(), + payload: "Hello".to_string(), + ..Default::default() + }) .await { Ok(response) => { @@ -160,12 +159,12 @@ async fn perform_square_root(room: &Arc) -> Result<(), Box { @@ -183,12 +182,12 @@ async fn perform_quantum_hypergeometric_series( println!("[{}] What's the quantum hypergeometric series of 42?", elapsed_time()); match room .local_participant() - .perform_rpc( - "math-genius".to_string(), - "quantum-hypergeometric-series".to_string(), - json!({"number": 42}).to_string(), - None, - ) + .perform_rpc(PerformRpcParams { + destination_identity: "math-genius".to_string(), + method: "quantum-hypergeometric-series".to_string(), + payload: json!({"number": 42}).to_string(), + ..Default::default() + }) .await { Ok(response) => { @@ -210,12 +209,12 @@ async fn perform_division(room: &Arc) -> Result<(), Box { diff --git a/livekit-ffi/src/server/participant.rs b/livekit-ffi/src/server/participant.rs index 3cbc097d..4ccda3f0 100644 --- a/livekit-ffi/src/server/participant.rs +++ b/livekit-ffi/src/server/participant.rs @@ -51,12 +51,15 @@ impl FfiParticipant { let handle = server.async_runtime.spawn(async move { let result = local - .perform_rpc( - request.destination_identity.to_string(), - request.method, - request.payload, - request.response_timeout_ms.map(|ms| Duration::from_millis(ms as u64)), - ) + .perform_rpc(PerformRpcParams { + destination_identity: request.destination_identity.to_string(), + method: request.method, + payload: request.payload, + response_timeout: request + .response_timeout_ms + .map(|ms| Duration::from_millis(ms as u64)) + .unwrap_or(PerformRpcParams::default().response_timeout), + }) .await; let callback = proto::PerformRpcCallback { diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index 52ec2c0f..1bc06916 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -15,8 +15,8 @@ pub use crate::{ id::*, participant::{ - ConnectionQuality, LocalParticipant, Participant, RemoteParticipant, RpcError, - RpcErrorCode, RpcInvocationData, + ConnectionQuality, LocalParticipant, Participant, PerformRpcParams, RemoteParticipant, + RpcError, RpcErrorCode, RpcInvocationData, }, publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication}, track::{ diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 271e531d..128b9947 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -67,7 +67,6 @@ impl RpcState { } } } - struct LocalInfo { events: LocalEvents, encryption_type: EncryptionType, @@ -638,17 +637,10 @@ impl LocalParticipant { self.inner.info.read().kind } - pub async fn perform_rpc( - &self, - destination_identity: String, - method: String, - payload: String, - response_timeout: Option, - ) -> Result { - let response_timeout = response_timeout.unwrap_or(Duration::from_millis(10000)); + pub async fn perform_rpc(&self, params: PerformRpcParams) -> Result { let max_round_trip_latency = Duration::from_millis(2000); - if payload.len() > MAX_PAYLOAD_BYTES { + if params.payload.len() > MAX_PAYLOAD_BYTES { return Err(RpcError::built_in(RpcErrorCode::RequestPayloadTooLarge, None)); } @@ -670,11 +662,11 @@ impl LocalParticipant { match self .publish_rpc_request(RpcRequest { - destination_identity: destination_identity.clone(), + destination_identity: params.destination_identity.clone(), id: id.clone(), - method: method.clone(), - payload: payload.clone(), - response_timeout: response_timeout - max_round_trip_latency, + method: params.method.clone(), + payload: params.payload.clone(), + response_timeout: params.response_timeout - max_round_trip_latency, version: 1, }) .await @@ -704,7 +696,7 @@ impl LocalParticipant { } // Wait for response timout - let response = match tokio::time::timeout(response_timeout, response_rx).await { + let response = match tokio::time::timeout(params.response_timeout, response_rx).await { Err(_) => { self.local.rpc_state.lock().pending_responses.remove(&id); return Err(RpcError::built_in(RpcErrorCode::ResponseTimeout, None)); diff --git a/livekit/src/room/participant/rpc.rs b/livekit/src/room/participant/rpc.rs index 3baf53eb..c3fe7cde 100644 --- a/livekit/src/room/participant/rpc.rs +++ b/livekit/src/room/participant/rpc.rs @@ -5,6 +5,27 @@ use crate::room::participant::ParticipantIdentity; use livekit_protocol::RpcError as RpcError_Proto; use std::time::Duration; + +/// Parameters for performing an RPC call +#[derive(Debug, Clone)] +pub struct PerformRpcParams { + pub destination_identity: String, + pub method: String, + pub payload: String, + pub response_timeout: Duration, +} + +impl Default for PerformRpcParams { + fn default() -> Self { + Self { + destination_identity: Default::default(), + method: Default::default(), + payload: Default::default(), + response_timeout: Duration::from_secs(10), + } + } +} + /// Data passed to method handler for incoming RPC invocations /// /// Attributes: From 1fae25830f8625dcc0738ba3514ae33746ab1e37 Mon Sep 17 00:00:00 2001 From: Ben Cherry Date: Thu, 31 Oct 2024 14:00:54 -0700 Subject: [PATCH 6/6] data --- examples/rpc/src/main.rs | 8 ++++---- livekit-ffi/src/server/participant.rs | 4 ++-- livekit/src/prelude.rs | 2 +- livekit/src/room/participant/local_participant.rs | 14 +++++++------- livekit/src/room/participant/rpc.rs | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/examples/rpc/src/main.rs b/examples/rpc/src/main.rs index d4a1ec71..11e0694b 100644 --- a/examples/rpc/src/main.rs +++ b/examples/rpc/src/main.rs @@ -139,7 +139,7 @@ async fn perform_greeting(room: &Arc) -> Result<(), Box) -> Result<(), Box) -> Result<(), Box Result { + pub async fn perform_rpc(&self, data: PerformRpcData) -> Result { let max_round_trip_latency = Duration::from_millis(2000); - if params.payload.len() > MAX_PAYLOAD_BYTES { + if data.payload.len() > MAX_PAYLOAD_BYTES { return Err(RpcError::built_in(RpcErrorCode::RequestPayloadTooLarge, None)); } @@ -662,11 +662,11 @@ impl LocalParticipant { match self .publish_rpc_request(RpcRequest { - destination_identity: params.destination_identity.clone(), + destination_identity: data.destination_identity.clone(), id: id.clone(), - method: params.method.clone(), - payload: params.payload.clone(), - response_timeout: params.response_timeout - max_round_trip_latency, + method: data.method.clone(), + payload: data.payload.clone(), + response_timeout: data.response_timeout - max_round_trip_latency, version: 1, }) .await @@ -696,7 +696,7 @@ impl LocalParticipant { } // Wait for response timout - let response = match tokio::time::timeout(params.response_timeout, response_rx).await { + let response = match tokio::time::timeout(data.response_timeout, response_rx).await { Err(_) => { self.local.rpc_state.lock().pending_responses.remove(&id); return Err(RpcError::built_in(RpcErrorCode::ResponseTimeout, None)); diff --git a/livekit/src/room/participant/rpc.rs b/livekit/src/room/participant/rpc.rs index c3fe7cde..34e043ef 100644 --- a/livekit/src/room/participant/rpc.rs +++ b/livekit/src/room/participant/rpc.rs @@ -8,14 +8,14 @@ use std::time::Duration; /// Parameters for performing an RPC call #[derive(Debug, Clone)] -pub struct PerformRpcParams { +pub struct PerformRpcData { pub destination_identity: String, pub method: String, pub payload: String, pub response_timeout: Duration, } -impl Default for PerformRpcParams { +impl Default for PerformRpcData { fn default() -> Self { Self { destination_identity: Default::default(),