Skip to content

Commit

Permalink
Merge pull request #5059 from weichou1229/purge-events
Browse files Browse the repository at this point in the history
feat: Allow purging events
  • Loading branch information
cloudxxx8 authored Jan 20, 2025
2 parents fdc408d + 464389f commit 74c8cdb
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/core-data/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Writable:
ReadingsPersisted: false
# Tags: # Contains the service level tags to be attached to all the service's metrics
## Gateway="my-iot-gateway" # Tag must be added here or via Consul Env Override can only change existing value, not added new ones.
EventPurge: false # Remove the related events and readings once received the device deletion system event

Service:
Port: 59880
Expand Down
1 change: 1 addition & 0 deletions internal/core/data/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type WritableInfo struct {
LogLevel string
InsecureSecrets bootstrapConfig.InsecureSecrets
Telemetry bootstrapConfig.TelemetryInfo
EventPurge bool
}

type ReadingRetention struct {
Expand Down
101 changes: 101 additions & 0 deletions internal/core/data/controller/messaging/system_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//
// Copyright (C) 2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"context"
"encoding/json"
"fmt"

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v4/di"
"github.com/edgexfoundry/go-mod-core-contracts/v4/common"
"github.com/edgexfoundry/go-mod-core-contracts/v4/dtos"
"github.com/edgexfoundry/go-mod-core-contracts/v4/errors"
"github.com/edgexfoundry/go-mod-messaging/v4/pkg/types"
)

func SubscribeSystemEvents(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := dataContainer.ConfigurationFrom(dic.Get)
messageBusInfo := dataContainer.ConfigurationFrom(dic.Get).MessageBus

// device deletion event edgex/system-events/core-metadata/device/delete/<device name>/<device profile name>
deviceDeletionSystemEventTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.SystemEventPublishTopic).SetPath(common.CoreMetaDataServiceKey).
SetPath(common.DeviceSystemEventType).SetPath(common.SystemEventActionDelete).SetPath("#").BuildPath()
lc.Infof("Subscribing to System Events on topic: %s", deviceDeletionSystemEventTopic)

messages := make(chan types.MessageEnvelope, 1)
messageErrors := make(chan error, 1)
topics := []types.TopicChannel{
{
Topic: deviceDeletionSystemEventTopic,
Messages: messages,
},
}

messageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err := messageBus.Subscribe(topics, messageErrors)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

go func() {
for {
select {
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", deviceDeletionSystemEventTopic)
return
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID)
var systemEvent dtos.SystemEvent
err := json.Unmarshal(msgEnvelope.Payload, &systemEvent)
if err != nil {
lc.Errorf("failed to JSON decoding system event: %s", err.Error())
continue
}

switch systemEvent.Type {
case common.DeviceSystemEventType:
err = deviceSystemEventAction(systemEvent, dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
}
}
}
}()

return nil
}

func deviceSystemEventAction(systemEvent dtos.SystemEvent, dic *di.Container) error {
var device dtos.Device
err := systemEvent.DecodeDetails(&device)
if err != nil {
return fmt.Errorf("failed to decode %s system event details: %s", systemEvent.Type, err.Error())
}

lc := bootstrapContainer.LoggingClientFrom(dic.Get)
switch systemEvent.Action {
case common.SystemEventActionDelete:
if !dataContainer.ConfigurationFrom(dic.Get).Writable.EventPurge {
return nil
}
lc.Debugf("Device '%s' is deleted, try to remove related events and readings.", device.Name)
err = application.CoreDataAppFrom(dic.Get).DeleteEventsByDeviceName(device.Name, dic)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
lc.Debugf("Events and readings are removed for the Device '%s'.", device.Name)
}
return nil
}
5 changes: 5 additions & 0 deletions internal/core/data/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, st
lc.Errorf("Failed to subscribe events from message bus, %v", err)
return false
}
err = messaging.SubscribeSystemEvents(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe system events from message bus, %v", err)
return false
}

config := container.ConfigurationFrom(dic.Get)
if config.Retention.Enabled {
Expand Down

0 comments on commit 74c8cdb

Please sign in to comment.