From 464389f2b4c69318f66adc0d56ad4bda5a15fa05 Mon Sep 17 00:00:00 2001 From: bruce Date: Fri, 17 Jan 2025 15:22:21 +0800 Subject: [PATCH] feat: Allow purging events To purge the redundant events and readings, remove the related events and readings once received the device deletion system event. Signed-off-by: bruce (cherry picked from commit 481bac23ae2783e5fdad02be5b4ce3f7b675fd66) --- cmd/core-data/res/configuration.yaml | 1 + internal/core/data/config/config.go | 1 + .../data/controller/messaging/system_event.go | 101 ++++++++++++++++++ internal/core/data/init.go | 5 + 4 files changed, 108 insertions(+) create mode 100644 internal/core/data/controller/messaging/system_event.go diff --git a/cmd/core-data/res/configuration.yaml b/cmd/core-data/res/configuration.yaml index 39be5bb4b8..7ebceed2d7 100644 --- a/cmd/core-data/res/configuration.yaml +++ b/cmd/core-data/res/configuration.yaml @@ -8,6 +8,7 @@ Writable: ReadingsPersisted: false # Tags: # Contains the service level tags to be attached to all the service's metrics ## Gateway="my-iot-gateway" # Tag must be added here or via Consul Env Override can only change existing value, not added new ones. + EventPurge: false # Remove the related events and readings once received the device deletion system event Service: Port: 59880 diff --git a/internal/core/data/config/config.go b/internal/core/data/config/config.go index f176be6720..acd3f8207b 100644 --- a/internal/core/data/config/config.go +++ b/internal/core/data/config/config.go @@ -34,6 +34,7 @@ type WritableInfo struct { LogLevel string InsecureSecrets bootstrapConfig.InsecureSecrets Telemetry bootstrapConfig.TelemetryInfo + EventPurge bool } type ReadingRetention struct { diff --git a/internal/core/data/controller/messaging/system_event.go b/internal/core/data/controller/messaging/system_event.go new file mode 100644 index 0000000000..7bd9817a59 --- /dev/null +++ b/internal/core/data/controller/messaging/system_event.go @@ -0,0 +1,101 @@ +// +// Copyright (C) 2025 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/edgexfoundry/edgex-go/internal/core/data/application" + dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/v4/di" + "github.com/edgexfoundry/go-mod-core-contracts/v4/common" + "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos" + "github.com/edgexfoundry/go-mod-core-contracts/v4/errors" + "github.com/edgexfoundry/go-mod-messaging/v4/pkg/types" +) + +func SubscribeSystemEvents(ctx context.Context, dic *di.Container) errors.EdgeX { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + configuration := dataContainer.ConfigurationFrom(dic.Get) + messageBusInfo := dataContainer.ConfigurationFrom(dic.Get).MessageBus + + // device deletion event edgex/system-events/core-metadata/device/delete// + deviceDeletionSystemEventTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.SystemEventPublishTopic).SetPath(common.CoreMetaDataServiceKey). + SetPath(common.DeviceSystemEventType).SetPath(common.SystemEventActionDelete).SetPath("#").BuildPath() + lc.Infof("Subscribing to System Events on topic: %s", deviceDeletionSystemEventTopic) + + messages := make(chan types.MessageEnvelope, 1) + messageErrors := make(chan error, 1) + topics := []types.TopicChannel{ + { + Topic: deviceDeletionSystemEventTopic, + Messages: messages, + }, + } + + messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) + err := messageBus.Subscribe(topics, messageErrors) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + + go func() { + for { + select { + case <-ctx.Done(): + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", deviceDeletionSystemEventTopic) + return + case err = <-messageErrors: + lc.Error(err.Error()) + case msgEnvelope := <-messages: + lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID) + var systemEvent dtos.SystemEvent + err := json.Unmarshal(msgEnvelope.Payload, &systemEvent) + if err != nil { + lc.Errorf("failed to JSON decoding system event: %s", err.Error()) + continue + } + + switch systemEvent.Type { + case common.DeviceSystemEventType: + err = deviceSystemEventAction(systemEvent, dic) + if err != nil { + lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID) + } + } + } + } + }() + + return nil +} + +func deviceSystemEventAction(systemEvent dtos.SystemEvent, dic *di.Container) error { + var device dtos.Device + err := systemEvent.DecodeDetails(&device) + if err != nil { + return fmt.Errorf("failed to decode %s system event details: %s", systemEvent.Type, err.Error()) + } + + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + switch systemEvent.Action { + case common.SystemEventActionDelete: + if !dataContainer.ConfigurationFrom(dic.Get).Writable.EventPurge { + return nil + } + lc.Debugf("Device '%s' is deleted, try to remove related events and readings.", device.Name) + err = application.CoreDataAppFrom(dic.Get).DeleteEventsByDeviceName(device.Name, dic) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + lc.Debugf("Events and readings are removed for the Device '%s'.", device.Name) + } + return nil +} diff --git a/internal/core/data/init.go b/internal/core/data/init.go index 03d8d5d154..6ea8adf1d5 100644 --- a/internal/core/data/init.go +++ b/internal/core/data/init.go @@ -55,6 +55,11 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, st lc.Errorf("Failed to subscribe events from message bus, %v", err) return false } + err = messaging.SubscribeSystemEvents(ctx, dic) + if err != nil { + lc.Errorf("Failed to subscribe system events from message bus, %v", err) + return false + } config := container.ConfigurationFrom(dic.Get) if config.Retention.Enabled {