Skip to content
This repository has been archived by the owner on Apr 16, 2022. It is now read-only.

Commit

Permalink
add suport for headers in the message
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslantalpa committed Mar 24, 2021
1 parent 7db9604 commit 9bcdf21
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 27 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ log = "0.4.14"
postgres = "0.19.0"
r2d2 = "0.8.9"
r2d2_postgres = "0.18.0"
maplit = "1.0.2"

[dev-dependencies]
rustc-test = "0.3.1"
futures = "0.3.13"
lapin-futures = "0.33.0"
tokio-core = "0.1.18"
rustc-test = "0.3.0"
futures = "0.1.12"
lapin-futures = "0.9.0"
tokio-core = "0.1.6"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ You can specify the routing key with the usual syntax used for topic exchanges.
NOTIFY pgchannel3, '*.orange|Topic message';
NOTIFY pgchannel3, 'quick.brown.fox|Topic message';
NOTIFY pgchannel3, 'lazy.#|Topic message';
NOTIFY pgchannel3, 'key|X-First-Header: value1, value2; X-Second-Header: value3|message'
```

## Helper Functions
Expand Down
67 changes: 50 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ extern crate r2d2;
extern crate r2d2_postgres;
extern crate postgres;
#[macro_use] extern crate log;
#[macro_use] extern crate maplit;

use amqp::{Session, Basic, protocol, Channel, Table, AMQPError};

use amqp::{Session, Basic, protocol, Channel, Table, AMQPError, TableEntry};
use fallible_iterator::FallibleIterator;
use r2d2::{Pool, PooledConnection};
use r2d2_postgres::{PostgresConnectionManager, postgres::NoTls};
Expand Down Expand Up @@ -41,6 +43,9 @@ impl ChannelCounter {
}

const SEPARATOR: char = '|';
const HEADERS_SEPARATOR: char = ';';
const HEADER_NAME_VALUE_SEPARATOR: char = ':';
const HEADER_VALUES_SEPARATOR: char = ',';

pub fn start(pool: Pool<PostgresConnectionManager<NoTls>>, amqp_uri: &str, bridge_channels: &str, delivery_mode: &u8){
let mut children = Vec::new();
Expand Down Expand Up @@ -82,7 +87,7 @@ fn spawn_listener_publisher(mut pg_conn: PooledConnection<PostgresConnectionMana
let mut it = notifications.blocking_iter();

while let Ok(Some(notification)) = it.next() {
let (routing_key, message) = parse_notification(&notification.payload());
let (routing_key, message, headers) = parse_notification(&notification.payload());
let (exchange, key) =
if amqp_entity_type == Type::Exchange {
(binding.amqp_entity.as_str(), routing_key)
Expand All @@ -92,7 +97,12 @@ fn spawn_listener_publisher(mut pg_conn: PooledConnection<PostgresConnectionMana

let mut publication = local_channel.basic_publish(
exchange, key, true, false,
protocol::basic::BasicProperties{ content_type: Some("text".to_string()), delivery_mode: Some(delivery_mode), ..Default::default()},
protocol::basic::BasicProperties{
content_type: Some("text".to_string()),
headers,
delivery_mode: Some(delivery_mode),
..Default::default()
},
message.as_bytes().to_vec());

// When RMQ connection is lost retry it
Expand Down Expand Up @@ -172,12 +182,27 @@ fn parse_bridge_channels(bridge_channels: &str) -> Vec<Binding>{
cleaned_bindings
}

fn parse_notification(payload: &str) -> (&str, &str){
let v: Vec<&str> = payload.splitn(2, SEPARATOR).map(|x| x.trim()).collect();
if v.len() > 1 {
(v[0], v[1])
} else {
("", v[0])
fn parse_notification(payload: &str) -> (&str, &str, Option<Table>){
let v: Vec<&str> = payload.splitn(3, SEPARATOR).map(|x| x.trim()).collect();
match v.len() {
3 => {
let components: Vec<&str> = v[1].split(HEADERS_SEPARATOR).map(|x| x.trim()).collect();
let mut headers = Table::new();
for c in components {
let array: Vec<&str> = c.splitn(2, HEADER_NAME_VALUE_SEPARATOR).map(|x| x.trim()).collect();
if let [name, values] = array[..] {
let values: Vec<&str> = values.split( HEADER_VALUES_SEPARATOR).map(|x| x.trim()).collect();
let mut fields = vec![];
for v in values {
fields.push(TableEntry::LongString(v.to_owned()));
}
headers.insert(name.to_owned(), TableEntry::FieldArray(fields));
}
}
(v[0], v[2], Some(headers))
},
2 => (v[0], v[1], None),
_ => ("", v[0], None)
}
}

Expand All @@ -201,16 +226,24 @@ pub fn wait_for_amqp_session(amqp_uri: &str, pg_channel: &str) -> Session {
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parse_notification_works() {
assert!(("my_key", "A message") == parse_notification("my_key|A message"));
assert!(("my_key", "A message") == parse_notification(" my_key | A message "));
assert!(("my_key", "A message|Rest of message") == parse_notification("my_key|A message|Rest of message"));
assert!(("", "my_key##A message") == parse_notification("my_key##A message"));
assert!(("", "A message") == parse_notification("A message"));
assert!(("", "") == parse_notification(""));
assert!(("mý_kéý", "A mésságé") == parse_notification("mý_kéý|A mésságé"));
assert!(("my_key", "A message", None) == parse_notification("my_key|A message"));
assert!(("my_key", "A message", None) == parse_notification(" my_key | A message "));
//assert!(("my_key", "A message|Rest of message", None) == parse_notification("my_key|A message|Rest of message"));
assert!(("", "my_key##A message", None) == parse_notification("my_key##A message"));
assert!(("", "A message", None) == parse_notification("A message"));
assert!(("", "", None) == parse_notification(""));
assert!(("mý_kéý", "A mésságé", None) == parse_notification("mý_kéý|A mésságé"));
assert_eq!(("my_key", "A message", Some(hashmap!{
"Content-Type".to_owned() => TableEntry::FieldArray(vec![
TableEntry::LongString("application/json".to_owned()),
TableEntry::LongString("application/octet-stream".to_owned()),
]),
"X-My-Header".to_owned() => TableEntry::FieldArray(vec![
TableEntry::LongString("my-value".to_owned()),
])
})), parse_notification("my_key|Content-Type: application/json, application/octet-stream; X-My-Header: my-value|A message"));
}

#[test]
Expand Down
19 changes: 19 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: '3.1'

services:

postgres:
image: postgres
ports:
- 5432:5432
environment:
# POSTGRES_PASSWORD: pass
POSTGRES_HOST_AUTH_METHOD: trust
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
# environment:
# - RABBITMQ_DEFAULT_USER=user
# - RABBITMQ_DEFAULT_PASS=pass
17 changes: 11 additions & 6 deletions tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ extern crate r2d2_postgres;

use r2d2::{Pool};
use r2d2_postgres::{PostgresConnectionManager};
use postgres::{Connection, TlsMode};
use postgres::{NoTls};
use futures::*;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
//use lapin_futures::types::AMQPType::FieldArray;
use lapin_futures::types::AMQPValue::FieldArray;
use lapin_futures::types::AMQPValue::LongString;
use lapin::client::*;
use lapin::channel::*;
use lapin::types::FieldTable;
Expand Down Expand Up @@ -48,7 +51,7 @@ fn publishing_to_queue_works() {
let handle = core.handle();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap();
let mut pg_conn = postgres::Client::connect(TEST_PG_URI, NoTls).unwrap();

let _ = core.run(
TcpStream::connect(&addr, &handle)
Expand All @@ -59,11 +62,13 @@ fn publishing_to_queue_works() {
.and_then(move |_|
channel.basic_consume(TEST_1_QUEUE, "my_consumer_1", &BasicConsumeOptions::default())
.and_then(move |stream|{
pg_conn.execute(format!("NOTIFY {}, '{}|Queue test'", TEST_1_PG_CHANNEL, TEST_1_QUEUE).as_str(), &[]).unwrap();
pg_conn.execute(format!("NOTIFY {}, '{}|X-My-Header: my-value|Queue test'", TEST_1_PG_CHANNEL, TEST_1_QUEUE).as_str(), &[]).unwrap();
stream.into_future().map_err(|(err, _)| err)
.and_then(move |(message, _)| {
let msg = message.unwrap();
assert_eq!(msg.data, b"Queue test");
let h = FieldArray([LongString("my-value".to_string())].to_vec());
assert_eq!(msg.properties.headers.unwrap().get("X-My-Header"), Some(&h));
channel.basic_ack(msg.delivery_tag)
})
})
Expand All @@ -77,7 +82,7 @@ fn publishing_to_direct_exchange_works() {
let handle = core.handle();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap();
let mut pg_conn = postgres::Client::connect(TEST_PG_URI, NoTls).unwrap();

let _ = core.run(
TcpStream::connect(&addr, &handle)
Expand Down Expand Up @@ -116,7 +121,7 @@ fn publishing_to_topic_exchange_works() {
let handle = core.handle();
let addr = TEST_AMQP_HOST_PORT.parse().unwrap();

let pg_conn = Connection::connect(TEST_PG_URI, TlsMode::None).unwrap();
let mut pg_conn = postgres::Client::connect(TEST_PG_URI, NoTls).unwrap();

let _ = core.run(
TcpStream::connect(&addr, &handle)
Expand Down Expand Up @@ -241,7 +246,7 @@ fn main() {

let pool = Pool::builder()
.connection_timeout(Duration::from_secs(1))
.build(PostgresConnectionManager::new(TEST_PG_URI.to_string(), r2d2_postgres::TlsMode::None).unwrap())
.build(PostgresConnectionManager::new(TEST_PG_URI.to_string().parse().unwrap(), NoTls))
.unwrap();
thread::spawn(move ||
bridge::start(pool, &TEST_AMQP_URI, &bridge_channels, &(1 as u8))
Expand Down

0 comments on commit 9bcdf21

Please sign in to comment.