From f16b6e0f7b55017dbd9f272fe2d45486073aa6a3 Mon Sep 17 00:00:00 2001 From: lspgn Date: Fri, 8 Dec 2023 20:44:06 -0800 Subject: [PATCH] batch error for decoder (template warnings) --- cmd/goflow2/main.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 762be563..e657fe81 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -279,6 +279,8 @@ func main() { decodeFunc = metrics.PromDecoderWrapper(p.DecodeFlow, listenAddrUrl.Scheme) pipes = append(pipes, p) + bm := utils.NewBatchMute(*ErrInt, *ErrCnt) + // starts receivers // the function either returns an error if err := recv.Start(hostname, int(port), decodeFunc); err != nil { @@ -293,13 +295,25 @@ func main() { case <-q: return case err := <-recv.Errors(): - l := l.WithError(err) - if errors.Is(err, netflow.ErrorTemplateNotFound) { - l.Warn("template error") - } else if errors.Is(err, net.ErrClosed) { + + if errors.Is(err, net.ErrClosed) { l.Info("closed receiver") - } else { + continue + } else if !errors.Is(err, netflow.ErrorTemplateNotFound) { l.Error("error") + continue + } + + muted, skipped := bm.Increment() + if muted && skipped == 0 { + log.Warn("too many receiver messages, muting") + } else if !muted && skipped > 0 { + log.Warnf("skipped %d receiver messages", skipped) + } else if !muted { + l := l.WithError(err) + if errors.Is(err, netflow.ErrorTemplateNotFound) { + l.Warn("template error") + } } }