From e0ec22a4f9d265f7465e43e3f677e04ab790f2a1 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 31 Dec 2024 00:27:26 -0500 Subject: [PATCH] Support more operations on stream publishers and consumers --- src/api.rs | 38 +++++++++++++++++++++++++++++++++++++- src/blocking_api.rs | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/api.rs b/src/api.rs index 986bd9c..fc5a467 100644 --- a/src/api.rs +++ b/src/api.rs @@ -386,7 +386,7 @@ where Ok(response) } - /// Lists stream publishers of the given stream. + /// Lists stream publishers publishing to the given stream. pub async fn list_stream_publishers_of( &self, virtual_host: &str, @@ -403,6 +403,24 @@ where Ok(response) } + /// Lists stream publishers on the given stream connection. + pub async fn list_stream_publishers_on_connection( + &self, + virtual_host: &str, + name: &str, + ) -> Result> { + let response = self + .http_get( + path!("stream", "connections", virtual_host, name, "publishers"), + None, + None, + ) + .await?; + + let response = response.json().await?; + Ok(response) + } + /// Lists all stream consumers across the cluster. pub async fn list_stream_consumers(&self) -> Result> { let response = self @@ -424,6 +442,24 @@ where Ok(response) } + /// Lists stream consumers on the given stream connection. + pub async fn list_stream_consumers_on_connection( + &self, + virtual_host: &str, + name: &str, + ) -> Result> { + let response = self + .http_get( + path!("stream", "connections", virtual_host, name, "consumers"), + None, + None, + ) + .await?; + + let response = response.json().await?; + Ok(response) + } + /// Lists all queues and streams across the cluster. pub async fn list_queues(&self) -> Result> { let response = self.http_get("queues", None, None).await?; diff --git a/src/blocking_api.rs b/src/blocking_api.rs index d3aaf9b..ce142ad 100644 --- a/src/blocking_api.rs +++ b/src/blocking_api.rs @@ -350,7 +350,7 @@ where Ok(response) } - /// Lists stream publishers on connections in the given virtual host. + /// Lists stream publishers publishing to the given stream. pub fn list_stream_publishers_in( &self, virtual_host: &str, @@ -377,6 +377,22 @@ where Ok(response) } + /// Lists stream publishers on the given stream connection. + pub fn list_stream_publishers_on_connection( + &self, + virtual_host: &str, + name: &str, + ) -> Result> { + let response = self.http_get( + path!("stream", "connections", virtual_host, name, "publishers"), + None, + None, + )?; + + let response = response.json()?; + Ok(response) + } + /// Lists all stream consumers across the cluster. pub fn list_stream_consumers(&self) -> Result> { let response = self.http_get(path!("stream", "consumers"), None, None)?; @@ -396,6 +412,22 @@ where Ok(response) } + /// Lists stream consumers on the given stream connection. + pub fn list_stream_consumers_on_connection( + &self, + virtual_host: &str, + name: &str, + ) -> Result> { + let response = self.http_get( + path!("stream", "connections", virtual_host, name, "consumers"), + None, + None, + )?; + + let response = response.json()?; + Ok(response) + } + /// Lists all queues and streams across the cluster. pub fn list_queues(&self) -> Result> { let response = self.http_get("queues", None, None)?;