Skip to content

Commit

Permalink
Support more operations on stream publishers and consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Dec 31, 2024
1 parent 122f4aa commit e0ec22a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 2 deletions.
38 changes: 37 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ where
Ok(response)
}

/// Lists stream publishers of the given stream.
/// Lists stream publishers publishing to the given stream.
pub async fn list_stream_publishers_of(
&self,
virtual_host: &str,
Expand All @@ -403,6 +403,24 @@ where
Ok(response)
}

/// Lists stream publishers on the given stream connection.
pub async fn list_stream_publishers_on_connection(
&self,
virtual_host: &str,
name: &str,
) -> Result<Vec<responses::StreamPublisher>> {
let response = self
.http_get(
path!("stream", "connections", virtual_host, name, "publishers"),
None,
None,
)
.await?;

let response = response.json().await?;
Ok(response)
}

/// Lists all stream consumers across the cluster.
pub async fn list_stream_consumers(&self) -> Result<Vec<responses::StreamConsumer>> {
let response = self
Expand All @@ -424,6 +442,24 @@ where
Ok(response)
}

/// Lists stream consumers on the given stream connection.
pub async fn list_stream_consumers_on_connection(
&self,
virtual_host: &str,
name: &str,
) -> Result<Vec<responses::StreamConsumer>> {
let response = self
.http_get(
path!("stream", "connections", virtual_host, name, "consumers"),
None,
None,
)
.await?;

let response = response.json().await?;
Ok(response)
}

/// Lists all queues and streams across the cluster.
pub async fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
let response = self.http_get("queues", None, None).await?;
Expand Down
34 changes: 33 additions & 1 deletion src/blocking_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ where
Ok(response)
}

/// Lists stream publishers on connections in the given virtual host.
/// Lists stream publishers publishing to the given stream.
pub fn list_stream_publishers_in(
&self,
virtual_host: &str,
Expand All @@ -377,6 +377,22 @@ where
Ok(response)
}

/// Lists stream publishers on the given stream connection.
pub fn list_stream_publishers_on_connection(
&self,
virtual_host: &str,
name: &str,
) -> Result<Vec<responses::StreamPublisher>> {
let response = self.http_get(
path!("stream", "connections", virtual_host, name, "publishers"),
None,
None,
)?;

let response = response.json()?;
Ok(response)
}

/// Lists all stream consumers across the cluster.
pub fn list_stream_consumers(&self) -> Result<Vec<responses::StreamConsumer>> {
let response = self.http_get(path!("stream", "consumers"), None, None)?;
Expand All @@ -396,6 +412,22 @@ where
Ok(response)
}

/// Lists stream consumers on the given stream connection.
pub fn list_stream_consumers_on_connection(
&self,
virtual_host: &str,
name: &str,
) -> Result<Vec<responses::StreamConsumer>> {
let response = self.http_get(
path!("stream", "connections", virtual_host, name, "consumers"),
None,
None,
)?;

let response = response.json()?;
Ok(response)
}

/// Lists all queues and streams across the cluster.
pub fn list_queues(&self) -> Result<Vec<responses::QueueInfo>> {
let response = self.http_get("queues", None, None)?;
Expand Down

0 comments on commit e0ec22a

Please sign in to comment.