From 644ef561029296635a92a09a6009eeaee3b761f5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 10 Apr 2024 23:07:48 +0530 Subject: [PATCH 1/2] feat: MQTT v5 `Auth` packet serde --- rumqttc/src/v5/mqttbytes/v5/auth.rs | 202 ++++++++++++++++++++++++++++ rumqttc/src/v5/mqttbytes/v5/mod.rs | 5 + 2 files changed, 207 insertions(+) create mode 100644 rumqttc/src/v5/mqttbytes/v5/auth.rs diff --git a/rumqttc/src/v5/mqttbytes/v5/auth.rs b/rumqttc/src/v5/mqttbytes/v5/auth.rs new file mode 100644 index 000000000..52ff62849 --- /dev/null +++ b/rumqttc/src/v5/mqttbytes/v5/auth.rs @@ -0,0 +1,202 @@ +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +use super::{ + len_len, length, property, read_mqtt_bytes, read_mqtt_string, read_u8, write_mqtt_bytes, + write_mqtt_string, write_remaining_length, Error, FixedHeader, PropertyType, +}; + +/// Auth packet reason code +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum AuthReasonCode { + Success, + Continue, + ReAuthentivate, +} + +impl AuthReasonCode { + fn read(bytes: &mut Bytes) -> Result { + let reason_code = read_u8(bytes)?; + let code = match reason_code { + 0x00 => AuthReasonCode::Success, + 0x18 => AuthReasonCode::Continue, + 0x19 => AuthReasonCode::ReAuthentivate, + _ => return Err(Error::MalformedPacket), + }; + + Ok(code) + } + + fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> { + let reason_code = match self { + AuthReasonCode::Success => 0x00, + AuthReasonCode::Continue => 0x18, + AuthReasonCode::ReAuthentivate => 0x19, + }; + + buffer.put_u8(reason_code); + + Ok(()) + } +} + +/// Used to perform extended authentication exchange +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Auth { + pub code: AuthReasonCode, + pub properties: Option, +} + +impl Auth { + fn len(&self) -> usize { + let mut len = 1 // reason code + + 1; // property len + + if let Some(properties) = &self.properties { + len += properties.len(); + } + + len + } + + pub fn size(&self) -> usize { + let len = self.len(); + let remaining_len_size = len_len(len); + + 1 + remaining_len_size + len + } + + pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result { + let variable_header_index = fixed_header.fixed_header_len; + bytes.advance(variable_header_index); + + let code = AuthReasonCode::read(&mut bytes)?; + let properties = AuthProperties::read(&mut bytes)?; + let auth = Auth { code, properties }; + + Ok(auth) + } + + pub fn write(&self, buffer: &mut BytesMut) -> Result { + buffer.put_u8(0xF0); + + let len = self.len(); + let count = write_remaining_length(buffer, len)?; + + self.code.write(buffer)?; + if let Some(p) = &self.properties { + p.write(buffer)?; + } else { + write_remaining_length(buffer, 0)?; + } + + Ok(1 + count + len) + } +} + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct AuthProperties { + pub method: Option, + pub data: Option, + pub reason: Option, + pub user_properties: Vec<(String, String)>, +} + +impl AuthProperties { + fn len(&self) -> usize { + let mut len = 0; + + if let Some(method) = &self.method { + let m_len = method.len(); + len += 1 + m_len; + } + + if let Some(data) = &self.data { + let d_len = data.len(); + len += 1 + len_len(d_len) + d_len; + } + + if let Some(reason) = &self.reason { + let r_len = reason.len(); + len += 1 + r_len; + } + + for (key, value) in self.user_properties.iter() { + let p_len = key.len() + value.len(); + len += 1 + p_len; + } + + len + } + + pub fn read(bytes: &mut Bytes) -> Result, Error> { + let (properties_len_len, properties_len) = length(bytes.iter())?; + bytes.advance(properties_len_len); + if properties_len == 0 { + return Ok(None); + } + + let mut props = AuthProperties::default(); + + let mut cursor = 0; + // read until cursor reaches property length. properties_len = 0 will skip this loop + while cursor < properties_len { + let prop = read_u8(bytes)?; + cursor += 1; + + match property(prop)? { + PropertyType::AuthenticationMethod => { + let method = read_mqtt_string(bytes)?; + cursor += method.len(); + props.method = Some(method); + } + PropertyType::AuthenticationData => { + let data = read_mqtt_bytes(bytes)?; + cursor += 2 + data.len(); + props.data = Some(data); + } + PropertyType::ReasonString => { + let reason = read_mqtt_string(bytes)?; + cursor += reason.len(); + props.reason = Some(reason); + } + PropertyType::UserProperty => { + let key = read_mqtt_string(bytes)?; + let value = read_mqtt_string(bytes)?; + cursor += 2 + key.len() + 2 + value.len(); + props.user_properties.push((key, value)); + } + _ => return Err(Error::InvalidPropertyType(prop)), + } + } + + Ok(Some(props)) + } + + pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> { + let len = self.len(); + write_remaining_length(buffer, len)?; + + if let Some(authentication_method) = &self.method { + buffer.put_u8(PropertyType::AuthenticationMethod as u8); + write_mqtt_string(buffer, authentication_method); + } + + if let Some(authentication_data) = &self.data { + buffer.put_u8(PropertyType::AuthenticationData as u8); + write_mqtt_bytes(buffer, authentication_data); + } + + if let Some(reason) = &self.reason { + buffer.put_u8(PropertyType::ReasonString as u8); + write_mqtt_string(buffer, reason); + } + + for (key, value) in self.user_properties.iter() { + buffer.put_u8(PropertyType::UserProperty as u8); + write_mqtt_string(buffer, key); + write_mqtt_string(buffer, value); + } + + Ok(()) + } +} diff --git a/rumqttc/src/v5/mqttbytes/v5/mod.rs b/rumqttc/src/v5/mqttbytes/v5/mod.rs index 342278596..82b2e8908 100644 --- a/rumqttc/src/v5/mqttbytes/v5/mod.rs +++ b/rumqttc/src/v5/mqttbytes/v5/mod.rs @@ -1,6 +1,7 @@ use std::slice::Iter; pub use self::{ + auth::Auth, codec::Codec, connack::{ConnAck, ConnAckProperties, ConnectReturnCode}, connect::{Connect, ConnectProperties, LastWill, LastWillProperties, Login}, @@ -20,6 +21,7 @@ pub use self::{ use super::*; use bytes::{Buf, BufMut, Bytes, BytesMut}; +mod auth; mod codec; mod connack; mod connect; @@ -37,6 +39,7 @@ mod unsubscribe; #[derive(Clone, Debug, PartialEq, Eq)] pub enum Packet { + Auth(Auth), Connect(Connect, Option, Option), ConnAck(ConnAck), Publish(Publish), @@ -139,6 +142,7 @@ impl Packet { } match self { + Self::Auth(auth) => auth.write(write), Self::Publish(publish) => publish.write(write), Self::Subscribe(subscription) => subscription.write(write), Self::Unsubscribe(unsubscribe) => unsubscribe.write(write), @@ -158,6 +162,7 @@ impl Packet { pub fn size(&self) -> usize { match self { + Self::Auth(auth) => auth.size(), Self::Publish(publish) => publish.size(), Self::Subscribe(subscription) => subscription.size(), Self::Unsubscribe(unsubscribe) => unsubscribe.size(), From 378c8b8cda827c8561646cc51dd7896e5ea0405a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 17 May 2024 13:01:29 +0000 Subject: [PATCH 2/2] doc: CHANGELOG entry --- rumqttc/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 1045cfcf1..494bd5191 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `size()` method on `Packet` calculates size once serialized. * `read()` and `write()` methods on `Packet`. * `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection +* `Auth` packet as per MQTT5 standards ### Changed