diff --git a/cmd/subscribe/subscribe.go b/cmd/subscribe/subscribe.go index 158d4fd..2188005 100644 --- a/cmd/subscribe/subscribe.go +++ b/cmd/subscribe/subscribe.go @@ -7,10 +7,13 @@ import ( globalvar "cli-tool/globalVar" "context" "fmt" + "log" "os" + "os/signal" "time" "github.com/gopcua/opcua" + "github.com/gopcua/opcua/monitor" "github.com/gopcua/opcua/ua" "github.com/spf13/cobra" ) @@ -31,65 +34,70 @@ func getUrl() (string, error) { return string(data), nil } -func sub(nodeId string, url string) (error, value any) { - durationTimeout := time.Duration(timeout) * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), durationTimeout) - defer cancel() +func sub(nodeId string, url string) error { + ctx := context.Background() c, err := opcua.NewClient(url, opcua.SecurityMode(ua.MessageSecurityModeNone)) if err != nil { - return err, nil + return err } if err := c.Connect(ctx); err != nil { - return err, nil + return err } defer c.Close(ctx) - notifyCh := make(chan *opcua.PublishNotificationData) - - sub, err := c.Subscribe(ctx, &opcua.SubscriptionParameters{ - Interval: time.Duration(interval) * time.Millisecond, - }, notifyCh) + m, err := monitor.NewNodeMonitor(c) if err != nil { - return err, nil + return err } - fmt.Printf("Created subscription with id %v", sub.SubscriptionID) + m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) { + log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error()) + }) - id, err := ua.ParseNodeID(nodeID) - if err != nil { - return err, nil - } + go startChanSub(ctx, m, time.Duration(interval), 0, nodeId) + + <-ctx.Done() + return nil +} + +func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, nodes ...string) { + ch := make(chan *monitor.DataChangeMessage, 16) + sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, nodes...) + + cleanChannel := make(chan os.Signal, 1) + signal.Notify(cleanChannel, os.Interrupt) - var miCreateRequest *ua.MonitoredItemCreateRequest + go func() { + <-cleanChannel + log.Printf("unsubscribe %d", sub.SubscriptionID()) + cleanup(ctx, sub) + os.Exit(1) + }() - miCreateRequest = valueRequest(id) - res, err := sub.Monitor(ctx, ua.TimestampsToReturnBoth, miCreateRequest) - if err != nil || res.Results[0].StatusCode != ua.StatusOK { - return err, nil + if err != nil { + log.Fatal(err) } for { select { - case <-ctx.Done(): - return - case res := <-notifyCh: - if res.Error != nil { - return err, nil + case msg := <-ch: + if msg.Error != nil { + log.Printf(" sub=%d error=%s", sub.SubscriptionID(), msg.Error) + } else { + log.Printf(" sub=%d ts=%s node=%s value=%v", sub.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value()) } - - return nil, res.Value + time.Sleep(lag) } } } -func valueRequest(nodeID *ua.NodeID) *ua.MonitoredItemCreateRequest { - handle := uint32(42) - return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle) +func cleanup(ctx context.Context, sub *monitor.Subscription) { + log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped()) + sub.Unsubscribe(ctx) } -// subscribeCmd represents the subscribe command var SubscribeCmd = &cobra.Command{ Use: "subscribe", Short: "Subscribe a node", @@ -98,14 +106,12 @@ var SubscribeCmd = &cobra.Command{ if url, err := getUrl(); err != nil { fmt.Println(err) } else { - fmt.Println(fmt.Sprintf("Entered Node-ID: %s", nodeID)) - if err, resp := sub(nodeID, url); err != nil { + if err := sub(nodeID, url); err != nil { fmt.Println(err) - } else { - fmt.Println(resp) } + } }, }