Skip to content

Commit

Permalink
Implement listing of/for stream consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Dec 31, 2024
1 parent 6e983cd commit 122f4aa
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 1 deletion.
21 changes: 21 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,27 @@ where
Ok(response)
}

/// Lists all stream consumers across the cluster.
pub async fn list_stream_consumers(&self) -> Result<Vec<responses::StreamConsumer>> {
let response = self
.http_get(path!("stream", "consumers"), None, None)
.await?;
let response = response.json().await?;
Ok(response)
}

/// Lists stream consumers on connections in the given virtual host.
pub async fn list_stream_consumers_in(
&self,
virtual_host: &str,
) -> Result<Vec<responses::StreamConsumer>> {
let response = self
.http_get(path!("stream", "consumers", virtual_host), None, None)
.await?;
let response = response.json().await?;
Ok(response)
}

/// Lists all queues and streams across the cluster.
pub async fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
let response = self.http_get("queues", None, None).await?;
Expand Down
19 changes: 19 additions & 0 deletions src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,25 @@ where
Ok(response)
}

/// Lists all stream consumers across the cluster.
pub fn list_stream_consumers(&self) -> Result<Vec<responses::StreamConsumer>> {
let response = self.http_get(path!("stream", "consumers"), None, None)?;

let response = response.json()?;
Ok(response)
}

/// Lists stream consumers on connections in the given virtual host.
pub fn list_stream_consumers_in(
&self,
virtual_host: &str,
) -> Result<Vec<responses::StreamConsumer>> {
let response = self.http_get(path!("stream", "consumers", virtual_host), None, None)?;

let response = response.json()?;
Ok(response)
}

/// Lists all queues and streams across the cluster.
pub fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
let response = self.http_get("queues", None, None)?;
Expand Down
15 changes: 15 additions & 0 deletions src/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ pub struct StreamPublisher {
pub errored: u64,
}

#[derive(Debug, Deserialize, Clone)]
#[cfg_attr(feature = "tabled", derive(Tabled))]
#[allow(dead_code)]
pub struct StreamConsumer {
#[cfg_attr(feature = "tabled", tabled(skip))]
pub connection_details: ConnectionDetails,
pub queue: NameAndVirtualHost,
pub subscription_id: u32,
pub credits: u64,
pub consumed: u64,
pub offset_lag: u64,
pub offset: u64,
pub properties: XArguments,
}

#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(transparent)]
pub struct RuntimeParameterValue(pub Map<String, serde_json::Value>);
Expand Down
44 changes: 44 additions & 0 deletions tests/stream_consumer_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (C) 2023-2024 RabbitMQ Core Team ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rabbitmq_http_client::blocking_api::Client;

mod test_helpers;
use crate::test_helpers::{endpoint, PASSWORD, USERNAME};

#[test]
fn test_list_stream_consumers() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let result1 = rc.list_stream_consumers();
assert!(
result1.is_ok(),
"list_stream_publishers returned {:?}",
result1
);
}

#[test]
fn test_list_virtual_host_stream_consumers() {
let endpoint = endpoint();
let rc = Client::new(&endpoint, USERNAME, PASSWORD);

let vh_name = "/";
let result1 = rc.list_stream_consumers_in(vh_name);
assert!(
result1.is_ok(),
"list_stream_publishers_in returned {:?}",
result1
);
}
1 change: 0 additions & 1 deletion tests/user_limit_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ fn test_list_user_limits() {
assert!(result2.is_ok());

let result3 = rc.list_user_limits(params.name);
dbg!(&result3);
assert!(result3.is_ok());
let vec = result3.unwrap();

Expand Down

0 comments on commit 122f4aa

Please sign in to comment.