Skip to content

Commit

Permalink
modified: cmd/subscribe/subscribe.go
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi613 committed Nov 6, 2023
1 parent 827cf88 commit f932ba7
Showing 1 changed file with 43 additions and 37 deletions.
80 changes: 43 additions & 37 deletions cmd/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
globalvar "cli-tool/globalVar"
"context"
"fmt"
"log"
"os"
"os/signal"
"time"

"github.com/gopcua/opcua"
"github.com/gopcua/opcua/monitor"
"github.com/gopcua/opcua/ua"
"github.com/spf13/cobra"
)
Expand All @@ -31,65 +34,70 @@ func getUrl() (string, error) {
return string(data), nil
}

func sub(nodeId string, url string) (error, value any) {
durationTimeout := time.Duration(timeout) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), durationTimeout)
defer cancel()
func sub(nodeId string, url string) error {
ctx := context.Background()
c, err := opcua.NewClient(url, opcua.SecurityMode(ua.MessageSecurityModeNone))

if err != nil {
return err, nil
return err
}
if err := c.Connect(ctx); err != nil {
return err, nil
return err
}

defer c.Close(ctx)

notifyCh := make(chan *opcua.PublishNotificationData)

sub, err := c.Subscribe(ctx, &opcua.SubscriptionParameters{
Interval: time.Duration(interval) * time.Millisecond,
}, notifyCh)
m, err := monitor.NewNodeMonitor(c)
if err != nil {
return err, nil
return err
}

fmt.Printf("Created subscription with id %v", sub.SubscriptionID)
m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error())
})

id, err := ua.ParseNodeID(nodeID)
if err != nil {
return err, nil
}
go startChanSub(ctx, m, time.Duration(interval), 0, nodeId)

<-ctx.Done()
return nil
}

func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, nodes ...string) {
ch := make(chan *monitor.DataChangeMessage, 16)
sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, nodes...)

cleanChannel := make(chan os.Signal, 1)
signal.Notify(cleanChannel, os.Interrupt)

var miCreateRequest *ua.MonitoredItemCreateRequest
go func() {
<-cleanChannel
log.Printf("unsubscribe %d", sub.SubscriptionID())
cleanup(ctx, sub)
os.Exit(1)
}()

miCreateRequest = valueRequest(id)
res, err := sub.Monitor(ctx, ua.TimestampsToReturnBoth, miCreateRequest)
if err != nil || res.Results[0].StatusCode != ua.StatusOK {
return err, nil
if err != nil {
log.Fatal(err)
}

for {
select {
case <-ctx.Done():
return
case res := <-notifyCh:
if res.Error != nil {
return err, nil
case msg := <-ch:
if msg.Error != nil {
log.Printf(" sub=%d error=%s", sub.SubscriptionID(), msg.Error)
} else {
log.Printf(" sub=%d ts=%s node=%s value=%v", sub.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value())
}

return nil, res.Value
time.Sleep(lag)
}
}
}

func valueRequest(nodeID *ua.NodeID) *ua.MonitoredItemCreateRequest {
handle := uint32(42)
return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle)
func cleanup(ctx context.Context, sub *monitor.Subscription) {
log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped())
sub.Unsubscribe(ctx)
}

// subscribeCmd represents the subscribe command
var SubscribeCmd = &cobra.Command{
Use: "subscribe",
Short: "Subscribe a node",
Expand All @@ -98,14 +106,12 @@ var SubscribeCmd = &cobra.Command{
if url, err := getUrl(); err != nil {
fmt.Println(err)
} else {

fmt.Println(fmt.Sprintf("Entered Node-ID: %s", nodeID))

if err, resp := sub(nodeID, url); err != nil {
if err := sub(nodeID, url); err != nil {
fmt.Println(err)
} else {
fmt.Println(resp)
}

}
},
}
Expand Down

0 comments on commit f932ba7

Please sign in to comment.