Skip to content

Commit

Permalink
feat: REST API to update entity twin data
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Mar 3, 2025
1 parent e85f7f2 commit e885219
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 11 deletions.
50 changes: 50 additions & 0 deletions crates/core/tedge_agent/src/entity_manager/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;
use tedge_actors::LoggingSender;
use tedge_actors::MessageSink;
Expand All @@ -7,6 +10,7 @@ use tedge_actors::Server;
use tedge_api::entity::EntityMetadata;
use tedge_api::entity_store;
use tedge_api::entity_store::EntityRegistrationMessage;
use tedge_api::entity_store::EntityTwinMessage;
use tedge_api::entity_store::ListFilters;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
Expand All @@ -22,6 +26,7 @@ use tracing::error;
pub enum EntityStoreRequest {
Get(EntityTopicId),
Create(EntityRegistrationMessage),
Patch(EntityTwinData),
Delete(EntityTopicId),
List(ListFilters),
MqttMessage(MqttMessage),
Expand All @@ -31,11 +36,40 @@ pub enum EntityStoreRequest {
pub enum EntityStoreResponse {
Get(Option<EntityMetadata>),
Create(Result<Vec<RegisteredEntityData>, entity_store::Error>),
Patch(Result<(), entity_store::Error>),
Delete(Vec<EntityMetadata>),
List(Vec<EntityMetadata>),
Ok,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EntityTwinData {
pub topic_id: EntityTopicId,
#[serde(flatten)]
pub fragments: Map<String, Value>,
}

impl EntityTwinData {
pub fn try_new(
topic_id: EntityTopicId,
twin_data: Map<String, Value>,
) -> Result<Self, InvalidTwinData> {
for key in twin_data.keys() {
if key.starts_with('@') {
return Err(InvalidTwinData);
}
}
Ok(Self {
topic_id,
fragments: twin_data,
})
}
}

#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
#[error("Fragment keys starting with '@' are not allowed as twin data")]
pub struct InvalidTwinData;

pub struct EntityStoreServer {
entity_store: EntityStore,
mqtt_schema: MqttSchema,
Expand Down Expand Up @@ -90,6 +124,10 @@ impl Server for EntityStoreServer {
let res = self.register_entity(entity).await;
EntityStoreResponse::Create(res)
}
EntityStoreRequest::Patch(twin_data) => {
let res = self.patch_entity(twin_data).await;
EntityStoreResponse::Patch(res)
}
EntityStoreRequest::Delete(topic_id) => {
let deleted_entities = self.deregister_entity(topic_id).await;
EntityStoreResponse::Delete(deleted_entities)
Expand Down Expand Up @@ -218,6 +256,18 @@ impl EntityStoreServer {
Ok(registered)
}

async fn patch_entity(&mut self, twin_data: EntityTwinData) -> Result<(), entity_store::Error> {
for (fragment_key, fragment_value) in twin_data.fragments.into_iter() {
self.entity_store.update_twin_data(EntityTwinMessage::new(
twin_data.topic_id.clone(),
fragment_key,
fragment_value,
))?;
}

Ok(())
}

async fn deregister_entity(&mut self, topic_id: EntityTopicId) -> Vec<EntityMetadata> {
let deleted = self.entity_store.deregister_entity(&topic_id);
for entity in deleted.iter() {
Expand Down
160 changes: 157 additions & 3 deletions crates/core/tedge_agent/src/http_server/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use super::server::AgentState;
use crate::entity_manager::server::EntityStoreRequest;
use crate::entity_manager::server::EntityStoreResponse;
use crate::entity_manager::server::EntityTwinData;
use crate::entity_manager::server::InvalidTwinData;
use axum::extract::Path;
use axum::extract::Query;
use axum::extract::State;
Expand All @@ -23,6 +25,8 @@ use axum::Router;
use hyper::StatusCode;
use serde::Deserialize;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use std::str::FromStr;
use tedge_api::entity::EntityMetadata;
use tedge_api::entity::InvalidEntityType;
Expand Down Expand Up @@ -96,7 +100,7 @@ enum Error {
#[error(transparent)]
EntityStoreError(#[from] entity_store::Error),

#[error("Entity not found with topic id: {0}")]
#[error("Entity with topic id: {0} not found")]
EntityNotFound(EntityTopicId),

#[allow(clippy::enum_variant_names)]
Expand All @@ -108,6 +112,9 @@ enum Error {

#[error(transparent)]
InvalidInput(#[from] InputValidationError),

#[error(transparent)]
InvalidTwinData(#[from] InvalidTwinData),
}

impl IntoResponse for Error {
Expand All @@ -123,6 +130,7 @@ impl IntoResponse for Error {
Error::ChannelError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Error::InvalidEntityStoreResponse => StatusCode::INTERNAL_SERVER_ERROR,
Error::InvalidInput(_) => StatusCode::BAD_REQUEST,
Error::InvalidTwinData(_) => StatusCode::BAD_REQUEST,
};
let error_message = self.to_string();

Expand All @@ -135,7 +143,9 @@ pub(crate) fn entity_store_router(state: AgentState) -> Router {
.route("/v1/entities", post(register_entity).get(list_entities))
.route(
"/v1/entities/{*path}",
get(get_entity).delete(deregister_entity),
get(get_entity)
.patch(patch_entity)
.delete(deregister_entity),
)
.with_state(state)
}
Expand All @@ -160,6 +170,29 @@ async fn register_entity(
))
}

async fn patch_entity(
State(state): State<AgentState>,
Path(path): Path<String>,
Json(twin_fragments): Json<Map<String, Value>>,
) -> impl IntoResponse {
let topic_id = EntityTopicId::from_str(&path)?;
let twin_data = EntityTwinData::try_new(topic_id, twin_fragments)?;

let response = state
.entity_store_handle
.clone()
.await_response(EntityStoreRequest::Patch(twin_data))
.await?;
let EntityStoreResponse::Patch(res) = response else {
return Err(Error::InvalidEntityStoreResponse);
};
res?;

let entity = get_entity(State(state), Path(path)).await?;

Ok(entity)
}

async fn get_entity(
State(state): State<AgentState>,
Path(path): Path<String>,
Expand Down Expand Up @@ -328,7 +361,7 @@ mod tests {
let entity: Value = serde_json::from_slice(&body).unwrap();
assert_json_eq!(
entity,
json!( {"error":"Entity not found with topic id: device/test-child//"})
json!({"error":"Entity with topic id: device/test-child// not found"})
);
}

Expand Down Expand Up @@ -483,6 +516,127 @@ mod tests {
);
}

#[tokio::test]
async fn entity_patch() {
let TestHandle {
mut app,
mut entity_store_box,
} = setup();

// Mock entity store actor response for patch
tokio::spawn(async move {
while let Some(mut req) = entity_store_box.recv().await {
if let EntityStoreRequest::Patch(twin_data) = req.request {
if twin_data.topic_id
== EntityTopicId::default_child_device("test-child").unwrap()
{
req.reply_to
.send(EntityStoreResponse::Patch(Ok(())))
.await
.unwrap();
}
} else if let EntityStoreRequest::Get(topic_id) = req.request {
if topic_id == EntityTopicId::default_child_device("test-child").unwrap() {
let mut entity =
EntityMetadata::child_device("test-child".to_string()).unwrap();
entity.twin_data.insert("foo".to_string(), json!("bar"));

req.reply_to
.send(EntityStoreResponse::Get(Some(entity)))
.await
.unwrap();
}
}
}
});

let twin_payload = json!({"foo": "bar"}).to_string();

let req = Request::builder()
.method(Method::PATCH)
.uri("/v1/entities/device/test-child//")
.header("Content-Type", "application/json")
.body(Body::from(twin_payload))
.expect("request builder");

let response = app.call(req).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

let body = response.into_body().collect().await.unwrap().to_bytes();
let entity: EntityMetadata = serde_json::from_slice(&body).unwrap();
assert_eq!(entity.twin_data.get("foo"), Some(&json!("bar")));
}

#[tokio::test]
async fn entity_patch_invalid_key() {
let TestHandle {
mut app,
entity_store_box: _, // Not used
} = setup();

let req = Request::builder()
.method(Method::PATCH)
.uri("/v1/entities/device/test-child//")
.header("Content-Type", "application/json")
.body(Body::from(r#"{"@id": "new-id"}"#))
.expect("request builder");

let response = app.call(req).await.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);

let body = response.into_body().collect().await.unwrap().to_bytes();
let entity: Value = serde_json::from_slice(&body).unwrap();
assert_json_eq!(
entity,
json!({"error":"Fragment keys starting with '@' are not allowed as twin data"})
);
}

#[tokio::test]
async fn patch_unknown_entity() {
let TestHandle {
mut app,
mut entity_store_box,
} = setup();

// Mock entity store actor response
tokio::spawn(async move {
if let Some(mut req) = entity_store_box.recv().await {
if let EntityStoreRequest::Patch(twin_data) = req.request {
if twin_data.topic_id
== EntityTopicId::default_child_device("test-child").unwrap()
{
req.reply_to
.send(EntityStoreResponse::Patch(Err(
entity_store::Error::UnknownEntity(
"device/test-child//".to_string(),
),
)))
.await
.unwrap();
}
}
}
});

let req = Request::builder()
.method(Method::PATCH)
.uri("/v1/entities/device/test-child//")
.header("Content-Type", "application/json")
.body(Body::from(r#"{"foo": "bar"}"#))
.expect("request builder");

let response = app.call(req).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);

let body = response.into_body().collect().await.unwrap().to_bytes();
let entity: Value = serde_json::from_slice(&body).unwrap();
assert_json_eq!(
entity,
json!({"error":"The specified entity: device/test-child// does not exist in the store"})
);
}

#[tokio::test]
async fn entity_delete() {
let TestHandle {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ pub enum Error {
#[error("An entity with topic id: {0} is already registered")]
EntityAlreadyRegistered(EntityTopicId),

#[error("The specified entity {0} does not exist in the store")]
#[error("The specified entity: {0} does not exist in the store")]
UnknownEntity(String),

#[error("Auto registration of the entity with topic id {0} failed as it does not match the default topic scheme: 'device/<device-id>/service/<service-id>'. Try explicit registration instead.")]
Expand Down
Loading

0 comments on commit e885219

Please sign in to comment.