diff --git a/src/api.rs b/src/api.rs index 8681fcb..986bd9c 100644 --- a/src/api.rs +++ b/src/api.rs @@ -403,6 +403,27 @@ where Ok(response) } + /// Lists all stream consumers across the cluster. + pub async fn list_stream_consumers(&self) -> Result> { + let response = self + .http_get(path!("stream", "consumers"), None, None) + .await?; + let response = response.json().await?; + Ok(response) + } + + /// Lists stream consumers on connections in the given virtual host. + pub async fn list_stream_consumers_in( + &self, + virtual_host: &str, + ) -> Result> { + let response = self + .http_get(path!("stream", "consumers", virtual_host), None, None) + .await?; + let response = response.json().await?; + Ok(response) + } + /// Lists all queues and streams across the cluster. pub async fn list_queues(&self) -> Result> { let response = self.http_get("queues", None, None).await?; diff --git a/src/blocking_api.rs b/src/blocking_api.rs index ec6a09a..d3aaf9b 100644 --- a/src/blocking_api.rs +++ b/src/blocking_api.rs @@ -377,6 +377,25 @@ where Ok(response) } + /// Lists all stream consumers across the cluster. + pub fn list_stream_consumers(&self) -> Result> { + let response = self.http_get(path!("stream", "consumers"), None, None)?; + + let response = response.json()?; + Ok(response) + } + + /// Lists stream consumers on connections in the given virtual host. + pub fn list_stream_consumers_in( + &self, + virtual_host: &str, + ) -> Result> { + let response = self.http_get(path!("stream", "consumers", virtual_host), None, None)?; + + let response = response.json()?; + Ok(response) + } + /// Lists all queues and streams across the cluster. pub fn list_queues(&self) -> Result> { let response = self.http_get("queues", None, None)?; diff --git a/src/responses.rs b/src/responses.rs index bcc53e4..eaf000b 100644 --- a/src/responses.rs +++ b/src/responses.rs @@ -94,6 +94,21 @@ pub struct StreamPublisher { pub errored: u64, } +#[derive(Debug, Deserialize, Clone)] +#[cfg_attr(feature = "tabled", derive(Tabled))] +#[allow(dead_code)] +pub struct StreamConsumer { + #[cfg_attr(feature = "tabled", tabled(skip))] + pub connection_details: ConnectionDetails, + pub queue: NameAndVirtualHost, + pub subscription_id: u32, + pub credits: u64, + pub consumed: u64, + pub offset_lag: u64, + pub offset: u64, + pub properties: XArguments, +} + #[derive(Debug, Serialize, Deserialize, Clone, Default)] #[serde(transparent)] pub struct RuntimeParameterValue(pub Map); diff --git a/tests/stream_consumer_tests.rs b/tests/stream_consumer_tests.rs new file mode 100644 index 0000000..df569f9 --- /dev/null +++ b/tests/stream_consumer_tests.rs @@ -0,0 +1,44 @@ +// Copyright (C) 2023-2024 RabbitMQ Core Team (teamrabbitmq@gmail.com) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use rabbitmq_http_client::blocking_api::Client; + +mod test_helpers; +use crate::test_helpers::{endpoint, PASSWORD, USERNAME}; + +#[test] +fn test_list_stream_consumers() { + let endpoint = endpoint(); + let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + let result1 = rc.list_stream_consumers(); + assert!( + result1.is_ok(), + "list_stream_publishers returned {:?}", + result1 + ); +} + +#[test] +fn test_list_virtual_host_stream_consumers() { + let endpoint = endpoint(); + let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + let vh_name = "/"; + let result1 = rc.list_stream_consumers_in(vh_name); + assert!( + result1.is_ok(), + "list_stream_publishers_in returned {:?}", + result1 + ); +} diff --git a/tests/user_limit_tests.rs b/tests/user_limit_tests.rs index 0ee632a..3e2276b 100644 --- a/tests/user_limit_tests.rs +++ b/tests/user_limit_tests.rs @@ -81,7 +81,6 @@ fn test_list_user_limits() { assert!(result2.is_ok()); let result3 = rc.list_user_limits(params.name); - dbg!(&result3); assert!(result3.is_ok()); let vec = result3.unwrap();