Skip to content

Commit

Permalink
Merge pull request #26 from karthik2804/add-topic-name
Browse files Browse the repository at this point in the history
add topic name metadata calling the guest handler
  • Loading branch information
suneetnangia authored Apr 16, 2024
2 parents a8e0955 + ed6e1c9 commit d83411f
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 16 deletions.
9 changes: 5 additions & 4 deletions examples/mqtt-app/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use chrono::{DateTime, Utc};
use spin_mqtt_sdk::{mqtt_component, Payload};
use spin_mqtt_sdk::{mqtt_component, Metadata, Payload};

#[mqtt_component]
async fn handle_message(message: Payload) -> anyhow::Result<()> {
async fn handle_message(message: Payload, metadata: Metadata) -> anyhow::Result<()> {
let datetime: DateTime<Utc> = std::time::SystemTime::now().into();
let formatted_time = datetime.format("%Y-%m-%d %H:%M:%S.%f").to_string();

println!(
"{:?} Message received by wasm component: '{}'",
"{:?} Message received by wasm component: '{}' on topic '{}'",
formatted_time,
String::from_utf8_lossy(&message)
String::from_utf8_lossy(&message),
metadata.topic
);
Ok(())
}
4 changes: 2 additions & 2 deletions sdk/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pub fn mqtt_component(_attr: TokenStream, item: TokenStream) -> TokenStream {
#preamble
}
impl self::preamble::Guest for preamble::Mqtt {
fn handle_message(payload: ::spin_mqtt_sdk::Payload) -> ::std::result::Result<(), ::spin_mqtt_sdk::Error> {
fn handle_message(payload: ::spin_mqtt_sdk::Payload, metadata: ::spin_mqtt_sdk::Metadata) -> ::std::result::Result<(), ::spin_mqtt_sdk::Error> {
::spin_mqtt_sdk::executor::run(async move {
match super::#func_name(payload)#await_postfix {
match super::#func_name(payload, metadata)#await_postfix {
::std::result::Result::Ok(()) => ::std::result::Result::Ok(()),
::std::result::Result::Err(e) => {
eprintln!("{}", e);
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ pub mod wit {
pub use wit_bindgen;

#[doc(inline)]
pub use wit::spin::mqtt_trigger::spin_mqtt_types::{Error, Payload};
pub use wit::spin::mqtt_trigger::spin_mqtt_types::{Error, Metadata, Payload};
9 changes: 7 additions & 2 deletions spin-mqtt.wit
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ interface spin-mqtt-types {
exactly-once,
}

// metadata associated with the payload
record metadata {
topic: string,
}

/// The message payload.
type payload = list<u8>;
}

world spin-mqtt {
use spin-mqtt-types.{error, payload};
use spin-mqtt-types.{error, metadata, payload};

/// The entrypoint for a Mqtt handler in wasm component
export handle-message: func(message: payload) -> result<_, error>;
export handle-message: func(message: payload, metadata: metadata) -> result<_, error>;
}

world spin-mqtt-sdk {
Expand Down
19 changes: 15 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ wasmtime::component::bindgen!({
async: true,
});

use spin::mqtt_trigger::spin_mqtt_types as mqtt_types;

pub(crate) type RuntimeData = ();
pub(crate) type _Store = spin_core::Store<RuntimeData>;

Expand Down Expand Up @@ -115,7 +117,7 @@ impl TriggerExecutor for MqttTrigger {
async fn run(self, config: Self::RunConfig) -> anyhow::Result<()> {
if config.test {
for component in &self.component_configs {
self.handle_mqtt_event(&component.0, b"test message".to_vec())
self.handle_mqtt_event(&component.0, b"test message".to_vec(), "test".to_string())
.await?;
}

Expand Down Expand Up @@ -155,15 +157,20 @@ impl TriggerExecutor for MqttTrigger {
}

impl MqttTrigger {
async fn handle_mqtt_event(&self, component_id: &str, message: Vec<u8>) -> anyhow::Result<()> {
async fn handle_mqtt_event(
&self,
component_id: &str,
message: Vec<u8>,
topic: String,
) -> anyhow::Result<()> {
// Load the guest wasm component
let (instance, mut store) = self.engine.prepare_instance(component_id).await?;

// SpinMqtt is auto generated by bindgen as per WIT files referenced above.
let instance = SpinMqtt::new(&mut store, &instance)?;

instance
.call_handle_message(store, &message)
.call_handle_message(store, &message, &mqtt_types::Metadata { topic })
.await?
.map_err(|err| anyhow!("failed to execute guest: {err}"))
}
Expand Down Expand Up @@ -199,7 +206,11 @@ impl MqttTrigger {
Ok(Some(msg)) => {
// Handle the received message
_ = self
.handle_mqtt_event(&component_id, msg.payload().to_vec())
.handle_mqtt_event(
&component_id,
msg.payload().to_vec(),
msg.topic().to_owned(),
)
.await
.map_err(|err| tracing::error!("{err}"));
}
Expand Down
6 changes: 3 additions & 3 deletions templates/mqtt-rust/content/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use spin_mqtt_sdk::{mqtt_component, Payload};
use chrono::{DateTime, Utc};
use spin_mqtt_sdk::{mqtt_component, Metadata, Payload};

#[mqtt_component]
fn handle_message(message: Payload) -> anyhow::Result<()> {
async fn handle_message(message: Payload, metadata: Metadata) -> anyhow::Result<()> {
let datetime: DateTime<Utc> = std::time::SystemTime::now().into();
let formatted_time = datetime.format("%Y-%m-%d %H:%M:%S.%f").to_string();

Expand All @@ -12,4 +12,4 @@ fn handle_message(message: Payload) -> anyhow::Result<()> {
String::from_utf8_lossy(&message)
);
Ok(())
}
}

0 comments on commit d83411f

Please sign in to comment.