diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 8ad99156..cc994eea 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -39,6 +39,7 @@ import ( // core libraries "github.com/netsampler/goflow2/v2/metrics" "github.com/netsampler/goflow2/v2/utils" + "github.com/netsampler/goflow2/v2/utils/debug" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" @@ -128,6 +129,8 @@ func main() { log.Fatalf("producer %s does not exist", *Produce) } + // intercept panic and generate an error + flowProducer = debug.WrapPanicProducer(flowProducer) // wrap producer with Prometheus metrics flowProducer = metrics.WrapPromProducer(flowProducer) @@ -273,7 +276,12 @@ func main() { l.Errorf("scheme %s does not exist", listenAddrUrl.Scheme) return } - decodeFunc = metrics.PromDecoderWrapper(p.DecodeFlow, listenAddrUrl.Scheme) + + decodeFunc = p.DecodeFlow + // intercept panic and generate error + decodeFunc = debug.PanicDecoderWrapper(decodeFunc) + // wrap decoder with Prometheus metrics + decodeFunc = metrics.PromDecoderWrapper(decodeFunc, listenAddrUrl.Scheme) pipes = append(pipes, p) // starts receivers @@ -293,6 +301,15 @@ func main() { l := l.WithError(err) if errors.Is(err, netflow.ErrorTemplateNotFound) { l.Warn("template error") + } else if errors.Is(err, debug.PanicError) { + var pErrMsg *debug.PanicErrorMessage + if errors.As(err, &pErrMsg) { + l = l.WithFields(log.Fields{ + "message": pErrMsg.Msg, + "stacktrace": string(pErrMsg.Stacktrace), + }) + } + l.Error("intercepted panic") } else if errors.Is(err, net.ErrClosed) { l.Info("closed receiver") } else { diff --git a/utils/debug/debug.go b/utils/debug/debug.go new file mode 100644 index 00000000..c6769b52 --- /dev/null +++ b/utils/debug/debug.go @@ -0,0 +1,23 @@ +package debug + +import ( + "fmt" +) + +var ( + PanicError = fmt.Errorf("panic") +) + +type PanicErrorMessage struct { + Msg interface{} + Inner string + Stacktrace []byte +} + +func (e *PanicErrorMessage) Error() string { + return fmt.Sprintf("%s", e.Inner) +} + +func (e *PanicErrorMessage) Unwrap() []error { + return []error{PanicError} +} diff --git a/utils/debug/decoder.go b/utils/debug/decoder.go new file mode 100644 index 00000000..3c61f42e --- /dev/null +++ b/utils/debug/decoder.go @@ -0,0 +1,22 @@ +package debug + +import ( + "runtime/debug" + + "github.com/netsampler/goflow2/v2/utils" +) + +func PanicDecoderWrapper(wrapped utils.DecoderFunc) utils.DecoderFunc { + return func(msg interface{}) (err error) { + + defer func() { + if pErr := recover(); pErr != nil { + + pErrC, _ := pErr.(string) + err = &PanicErrorMessage{Msg: msg, Inner: pErrC, Stacktrace: debug.Stack()} + } + }() + err = wrapped(msg) + return err + } +} diff --git a/utils/debug/producer.go b/utils/debug/producer.go new file mode 100644 index 00000000..c185178a --- /dev/null +++ b/utils/debug/producer.go @@ -0,0 +1,39 @@ +package debug + +import ( + "runtime/debug" + + "github.com/netsampler/goflow2/v2/producer" +) + +type PanicProducerWrapper struct { + wrapped producer.ProducerInterface +} + +func (p *PanicProducerWrapper) Produce(msg interface{}, args *producer.ProduceArgs) (flowMessageSet []producer.ProducerMessage, err error) { + + defer func() { + if pErr := recover(); pErr != nil { + + pErrC, _ := pErr.(string) + err = &PanicErrorMessage{Msg: msg, Inner: pErrC, Stacktrace: debug.Stack()} + } + }() + + flowMessageSet, err = p.wrapped.Produce(msg, args) + return flowMessageSet, err +} + +func (p *PanicProducerWrapper) Close() { + p.wrapped.Close() +} + +func (p *PanicProducerWrapper) Commit(flowMessageSet []producer.ProducerMessage) { + p.wrapped.Commit(flowMessageSet) +} + +func WrapPanicProducer(wrapped producer.ProducerInterface) producer.ProducerInterface { + return &PanicProducerWrapper{ + wrapped: wrapped, + } +}