Skip to content

Commit

Permalink
Merge pull request #19 from svix/onelson/memory-queue-refactor
Browse files Browse the repository at this point in the history
Delayed message support for memory, rabbitmq, redis
  • Loading branch information
svix-onelson authored Nov 6, 2023
2 parents 6ed3b4b + c9323b3 commit 960aab8
Show file tree
Hide file tree
Showing 14 changed files with 528 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[workspace]
resolver = "2"

members = [
"omniqueue",
]
1 change: 1 addition & 0 deletions _rabbit/enabled_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management, rabbitmq_delayed_message_exchange].
Binary file not shown.
3 changes: 2 additions & 1 deletion omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ lapin = { version = "2", optional = true }
redis = { version = "0.23", features = ["tokio-comp", "tokio-native-tls-comp", "streams"], optional = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
svix-ksuid = { version = "0.7.0", optional = true }
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", optional = true }
Expand All @@ -43,6 +44,6 @@ gcp_pubsub = [
]
memory_queue = []
rabbitmq = ["dep:lapin"]
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis"]
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
redis_cluster = ["redis", "redis/cluster-async"]
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
66 changes: 56 additions & 10 deletions omniqueue/src/backends/memory_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ use crate::{
decoding::DecoderRegistry,
encoding::{CustomEncoder, EncoderRegistry},
queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
scheduled::ScheduledProducer,
QueueError,
};

pub struct MemoryQueueBackend;

#[async_trait]
impl QueueBackend for MemoryQueueBackend {
type Config = usize;

type PayloadIn = Vec<u8>;
type PayloadOut = Vec<u8>;

type PayloadOut = Vec<u8>;
type Producer = MemoryQueueProducer;

type Consumer = MemoryQueueConsumer;
type Config = usize;

async fn new_pair(
config: usize,
Expand Down Expand Up @@ -72,7 +73,7 @@ impl QueueProducer for MemoryQueueProducer {
self.registry.as_ref()
}

async fn send_raw(&self, payload: &Vec<u8>) -> Result<(), QueueError> {
async fn send_raw(&self, payload: &Self::Payload) -> Result<(), QueueError> {
self.tx
.send(payload.clone())
.map(|_| ())
Expand All @@ -85,6 +86,26 @@ impl QueueProducer for MemoryQueueProducer {
}
}

#[async_trait]
impl ScheduledProducer for MemoryQueueProducer {
async fn send_raw_scheduled(
&self,
payload: &Self::Payload,
delay: Duration,
) -> Result<(), QueueError> {
let tx = self.tx.clone();
let payload = payload.clone();
tokio::spawn(async move {
tracing::trace!("MemoryQueue: event sent > (delay: {:?})", delay);
tokio::time::sleep(delay).await;
if tx.send(payload).is_err() {
tracing::error!("Receiver dropped");
}
});
Ok(())
}
}

pub struct MemoryQueueConsumer {
registry: DecoderRegistry<Vec<u8>>,
rx: broadcast::Receiver<Vec<u8>>,
Expand All @@ -99,7 +120,7 @@ impl MemoryQueueConsumer {
acker: Box::new(MemoryQueueAcker {
tx: self.tx.clone(),
payload_copy: Some(payload),
alredy_acked_or_nacked: false,
already_acked_or_nacked: false,
}),
}
}
Expand Down Expand Up @@ -144,25 +165,25 @@ impl QueueConsumer for MemoryQueueConsumer {
pub struct MemoryQueueAcker {
tx: broadcast::Sender<Vec<u8>>,
payload_copy: Option<Vec<u8>>,
alredy_acked_or_nacked: bool,
already_acked_or_nacked: bool,
}

#[async_trait]
impl Acker for MemoryQueueAcker {
async fn ack(&mut self) -> Result<(), QueueError> {
if self.alredy_acked_or_nacked {
if self.already_acked_or_nacked {
Err(QueueError::CannotAckOrNackTwice)
} else {
self.alredy_acked_or_nacked = true;
self.already_acked_or_nacked = true;
Ok(())
}
}

async fn nack(&mut self) -> Result<(), QueueError> {
if self.alredy_acked_or_nacked {
if self.already_acked_or_nacked {
Err(QueueError::CannotAckOrNackTwice)
} else {
self.alredy_acked_or_nacked = true;
self.already_acked_or_nacked = true;
self.tx
.send(
self.payload_copy
Expand All @@ -182,6 +203,7 @@ mod tests {

use crate::{
queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBuilder},
scheduled::ScheduledProducer,
QueueError,
};

Expand Down Expand Up @@ -395,4 +417,28 @@ mod tests {
assert!(elapsed >= deadline);
assert!(elapsed <= deadline + Duration::from_millis(200));
}

#[tokio::test]
async fn test_scheduled() {
let payload1 = ExType { a: 1 };

let (p, mut c) = QueueBuilder::<MemoryQueueBackend, _>::new(16)
.build_pair()
.await
.unwrap();

let delay = Duration::from_millis(100);
let now = Instant::now();
p.send_serde_json_scheduled(&payload1, delay).await.unwrap();
let delivery = c
.receive_all(1, delay * 2)
.await
.unwrap()
.into_iter()
.next()
.unwrap();
assert!(now.elapsed() >= delay);
assert!(now.elapsed() < delay * 2);
assert_eq!(Some(payload1), delivery.payload_serde_json().unwrap());
}
}
38 changes: 35 additions & 3 deletions omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{any::TypeId, collections::HashMap};
use async_trait::async_trait;
use futures::StreamExt;
use futures_util::FutureExt;
use lapin::types::AMQPValue;
pub use lapin::{
acker::Acker as LapinAcker,
options::{
Expand All @@ -18,6 +19,7 @@ use crate::{
decoding::DecoderRegistry,
encoding::{CustomEncoder, EncoderRegistry},
queue::{consumer::QueueConsumer, producer::QueueProducer, Acker, Delivery, QueueBackend},
scheduled::ScheduledProducer,
QueueError,
};

Expand Down Expand Up @@ -89,13 +91,13 @@ async fn producer(

#[async_trait]
impl QueueBackend for RabbitMqBackend {
type Config = RabbitMqConfig;

type PayloadIn = Vec<u8>;

type PayloadOut = Vec<u8>;
type Producer = RabbitMqProducer;

type Consumer = RabbitMqConsumer;
type Producer = RabbitMqProducer;
type Config = RabbitMqConfig;

async fn new_pair(
cfg: RabbitMqConfig,
Expand Down Expand Up @@ -168,6 +170,36 @@ impl QueueProducer for RabbitMqProducer {
}
}

#[async_trait]
impl ScheduledProducer for RabbitMqProducer {
async fn send_raw_scheduled(
&self,
payload: &Self::Payload,
delay: Duration,
) -> Result<(), QueueError> {
let mut headers = FieldTable::default();

let delay_ms: u32 = delay
.as_millis()
.try_into()
.map_err(|_| QueueError::Generic("delay is too large".into()))?;
headers.insert("x-delay".into(), AMQPValue::LongUInt(delay_ms));

self.channel
.basic_publish(
&self.exchange,
&self.routing_key,
self.options,
payload,
self.properties.clone().with_headers(headers),
)
.await
.map_err(QueueError::generic)?;

Ok(())
}
}

pub struct RabbitMqConsumer {
registry: DecoderRegistry<Vec<u8>>,
consumer: Consumer,
Expand Down
Loading

0 comments on commit 960aab8

Please sign in to comment.