Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: kayleg/cloud-pubsub
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: blinksh/cloud-pubsub
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Able to merge. These branches can be automatically merged.
  • 3 commits
  • 2 files changed
  • 1 contributor

Commits on Mar 17, 2023

  1. Ping topic idea

    Carlos Cabanero committed Mar 17, 2023
    Copy the full SHA
    c41ae41 View commit details
  2. ACK Ping messages

    Carlos Cabanero committed Mar 17, 2023
    Copy the full SHA
    c2c44e7 View commit details
  3. Remove Echo messages as those are overriding processing

    Carlos Cabanero committed Mar 17, 2023
    Copy the full SHA
    6e9aef8 View commit details
Showing with 25 additions and 8 deletions.
  1. +6 −0 src/message.rs
  2. +19 −8 src/subscription.rs
6 changes: 6 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@ use base64::{self, Engine};
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;

pub(crate) const PING: &str = "ping";

#[derive(Deserialize, Clone, Serialize)]
pub struct EncodedMessage {
data: String,
@@ -38,6 +40,10 @@ impl EncodedMessage {
let data = base64::engine::general_purpose::STANDARD.encode(&incoming);
EncodedMessage { data, attributes }
}

pub fn ping() -> Self {
Self::new_binary(&PING.to_string(), None)
}
}

#[derive(Deserialize)]
27 changes: 19 additions & 8 deletions src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::client::Client;
use crate::error;
use crate::message::{FromPubSubMessage, Message};
use crate::message::{FromPubSubMessage, Message, PING};
use hyper::body::Buf;
use hyper::{Method, StatusCode};
use lazy_static::lazy_static;
@@ -84,18 +84,29 @@ impl Subscription {
message: self.name.clone(),
});
}

let body = hyper::body::aggregate(response).await?;
let response: Response = serde_json::from_reader(body.reader())?;
if let Some(e) = response.error {
return Err(e);
}
let messages = response
.received_messages
.unwrap_or_default()
.into_iter()
.map(|m| (T::from(m.message), m.ack_id))
.collect();
Ok(messages)

let messages = response.received_messages.unwrap_or_default();

let mut batch = Vec::new();

for m in messages {
if let Ok(data) = m.message.decode() {
if data == PING.as_bytes() {
log::debug!("{}", PING);
self.acknowledge_messages(vec![m.ack_id]).await;
continue;
}
}
batch.push((T::from(m.message), m.ack_id));
}

Ok(batch)
}

pub async fn destroy(self) -> Result<(), error::Error> {