From 94610f063add0f4db7740bdc29aa41d4f313d583 Mon Sep 17 00:00:00 2001 From: Konstantin Dudkov Date: Tue, 17 Oct 2023 13:08:57 +0300 Subject: [PATCH] new processors --- cmd/goatak_server/main.go | 80 ++------------------- cmd/goatak_server/processors.go | 100 ++++++++++++++++++--------- cmd/goatak_server/processors_test.go | 28 -------- pkg/cot/filter.go | 13 ++++ 4 files changed, 85 insertions(+), 136 deletions(-) delete mode 100644 cmd/goatak_server/processors_test.go diff --git a/cmd/goatak_server/main.go b/cmd/goatak_server/main.go index 3397998..aade651 100644 --- a/cmd/goatak_server/main.go +++ b/cmd/goatak_server/main.go @@ -8,7 +8,6 @@ import ( "encoding/pem" "flag" "fmt" - "google.golang.org/protobuf/proto" "io/ioutil" "log" "net" @@ -38,8 +37,6 @@ var ( lastSeenOfflineTimeout = time.Minute * 5 ) -type EventProcessor func(msg *cot.CotMessage) - type AppConfig struct { udpAddr string tcpAddr string @@ -86,7 +83,7 @@ type App struct { ctx context.Context uid string ch chan *cot.CotMessage - eventProcessors map[string]EventProcessor + eventProcessors map[string]*EventProcessor } func NewApp(config *AppConfig, logger *zap.SugaredLogger) *App { @@ -100,7 +97,7 @@ func NewApp(config *AppConfig, logger *zap.SugaredLogger) *App { items: repository.NewItemsMemoryRepo(), feeds: repository.NewFeedsFileRepo(logger.Named("feedsRepo"), filepath.Join(config.dataDir, "feeds")), uid: uuid.New().String(), - eventProcessors: make(map[string]EventProcessor), + eventProcessors: make(map[string]*EventProcessor), } return app @@ -174,12 +171,6 @@ func (app *App) NewCotMessage(msg *cot.CotMessage) { app.ch <- msg } -func (app *App) AddEventProcessor(e EventProcessor, mask ...string) { - for _, s := range mask { - app.eventProcessors[s] = e - } -} - func (app *App) AddClientHandler(ch client.ClientHandler) { app.handlers.Store(ch.GetName(), ch) } @@ -316,29 +307,10 @@ func (app *App) getTlsConfig() *tls.Config { func (app *App) MessageProcessor() { for msg := range app.ch { - if app.config.logging { - if err := app.logToFile(msg); err != nil { - app.Logger.Warnf("error logging message: %s", err.Error()) - } - } - if msg.TakMessage.CotEvent == nil { - continue - } - - if c := app.items.Get(msg.GetUid()); c != nil { - c.Update(nil) - } - - if _, processor := app.GetProcessor(msg.GetType()); processor != nil { - processor(msg) - } - - if !strings.HasPrefix(msg.GetType(), "a-") { - name, exact := cot.GetMsgType(msg.GetType()) - if exact { - app.Logger.Debugf("%s %s", msg.GetType(), name) - } else { - app.Logger.Infof("%s %s (extended)", msg.GetType(), name) + for name, prc := range app.eventProcessors { + if cot.MatchAnyPattern(msg.GetType(), prc.include...) { + app.Logger.Debugf("msg is processed by %s", name) + prc.cb(msg) } } @@ -415,16 +387,6 @@ func (app *App) SendToCallsign(callsign string, msg *cotproto.TakMessage) { }) } -func (app *App) logMessage(c *model.ChatMessage) { - fd, err := os.OpenFile("msg.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) - if err != nil { - app.Logger.Errorf("can't write to message log: %s", err.Error()) - return - } - defer fd.Close() - fmt.Fprintf(fd, "%s %s (%s) -> %s (%s) \"%s\"\n", c.Time, c.From, c.FromUid, c.Chatroom, c.ToUid, c.Text) -} - func loadPem(name string) ([]*x509.Certificate, error) { if name == "" { return nil, nil @@ -494,36 +456,6 @@ func processCerts(conf *AppConfig) error { return nil } -func (app *App) logToFile(msg *cot.CotMessage) error { - path := filepath.Join(app.config.dataDir, "log") - - if err := os.MkdirAll(path, 0777); err != nil { - return err - } - - // don't save pings - if msg.GetType() == "t-x-c-t" || msg.GetType() == "t-x-c-t-r" { - return nil - } - - fname := filepath.Join(path, time.Now().Format("2006-01-02.tak")) - - f, err := os.OpenFile(fname, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) - if err != nil { - return err - } - defer f.Close() - - d, err := proto.Marshal(msg.TakMessage) - if err != nil { - return err - } - l := uint32(len(d)) - _, _ = f.Write([]byte{byte(l % 256), byte(l / 256)}) - _, _ = f.Write(d) - return nil -} - func main() { fmt.Printf("version %s %s\n", gitRevision, gitBranch) var debug = flag.Bool("debug", false, "debug node") diff --git a/cmd/goatak_server/processors.go b/cmd/goatak_server/processors.go index d8c01c8..7ac0c3b 100644 --- a/cmd/goatak_server/processors.go +++ b/cmd/goatak_server/processors.go @@ -1,40 +1,45 @@ package main import ( - "encoding/json" + "fmt" "github.com/kdudkov/goatak/pkg/cot" "github.com/kdudkov/goatak/pkg/model" + "github.com/spf13/viper" + "google.golang.org/protobuf/proto" "os" + "path/filepath" + "strings" + "time" ) -func (app *App) InitMessageProcessors() { - app.eventProcessors["t-x-d-d"] = app.removeItemProcessor - // chat - app.eventProcessors["b-t-f"] = app.chatProcessor - app.eventProcessors["a-"] = app.ProcessItem - app.eventProcessors["b-"] = app.ProcessItem +type EventProcessor struct { + include []string + cb func(msg *cot.CotMessage) } -func (app *App) GetProcessor(t string) (string, EventProcessor) { - var found string - for k, v := range app.eventProcessors { - if k == t { - return k, v - } - if cot.MatchPattern(t, k) && len(k) > len(found) { - found = k - } - } +func (app *App) AddEventProcessor(name string, cb func(msg *cot.CotMessage), masks ...string) { + app.eventProcessors[name] = &EventProcessor{cb: cb, include: masks} +} - if found != "" { - return found, app.eventProcessors[found] +func (app *App) InitMessageProcessors() { + app.AddEventProcessor("remove", app.removeItemProcessor, "t-x-d-d") + app.AddEventProcessor("chat", app.chatProcessor, "b-t-f-") + app.AddEventProcessor("items", app.saveItemProcessor, "a-", "b-") + app.AddEventProcessor("logger", app.loggerProcessor, "-") + if app.config.logging { + app.AddEventProcessor("file_logger", app.fileLoggerProcessor, "-") } - - return "", nil } -func (app *App) justLogProcessor(msg *cot.CotMessage) { - app.Logger.Debugf("%s %s", msg.GetType(), msg.GetUid()) +func (app *App) loggerProcessor(msg *cot.CotMessage) { + if !strings.HasPrefix(msg.GetType(), "a-") { + name, exact := cot.GetMsgType(msg.GetType()) + if exact { + app.Logger.Debugf("%s %s", msg.GetType(), name) + } else { + app.Logger.Infof("%s %s (extended)", msg.GetType(), name) + } + } } func (app *App) removeItemProcessor(msg *cot.CotMessage) { @@ -72,36 +77,63 @@ func (app *App) chatProcessor(msg *cot.CotMessage) { } app.Logger.Infof("Chat %s", c.String()) app.messages = append(app.messages, c) - app.logMessage(c) + if err := logChatMessage(c); err != nil { + app.Logger.Warnf("error logging chat: %s", err.Error()) + } } -func (app *App) ProcessItem(msg *cot.CotMessage) { +func (app *App) saveItemProcessor(msg *cot.CotMessage) { cl := model.GetClass(msg) if c := app.items.Get(msg.GetUid()); c != nil { app.Logger.Debugf("update %s %s (%s) %s", cl, msg.GetUid(), msg.GetCallsign(), msg.GetType()) c.Update(msg) } else { app.Logger.Infof("new %s %s (%s) %s", cl, msg.GetUid(), msg.GetCallsign(), msg.GetType()) - if msg.GetCallsign() == "" { - app.Logger.Infof("%s", msg.TakMessage.GetCotEvent().GetDetail().GetXmlDetail()) - _ = logToFile(msg) - } app.items.Store(model.FromMsg(msg)) } } -func logToFile(msg *cot.CotMessage) error { - f, err := os.OpenFile(msg.GetType()+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) +func (app *App) fileLoggerProcessor(msg *cot.CotMessage) { + if cot.MatchAnyPattern(msg.GetType(), "t-x-c-t", "t-x-c-t-r") { + return + } + if cot.MatchAnyPattern(msg.GetType(), viper.GetStringSlice("log_exclude")...) { + return + } + if err := logMessage(msg, filepath.Join(app.config.dataDir, "log")); err != nil { + app.Logger.Warnf("error logging message: %s", err.Error()) + } +} + +func logMessage(msg *cot.CotMessage, dir string) error { + if err := os.MkdirAll(dir, 0777); err != nil { + return err + } + + fname := filepath.Join(dir, time.Now().Format("2006-01-02.tak")) + + f, err := os.OpenFile(fname, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) if err != nil { return err } defer f.Close() - b, err := json.Marshal(msg.TakMessage) + d, err := proto.Marshal(msg.TakMessage) if err != nil { return err } - _, _ = f.WriteString(string(b)) - _, _ = f.WriteString("\n") + l := uint32(len(d)) + _, _ = f.Write([]byte{byte(l % 256), byte(l / 256)}) + _, _ = f.Write(d) return nil } + +func logChatMessage(c *model.ChatMessage) error { + fd, err := os.OpenFile("msg.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) + if err != nil { + return nil + } + defer fd.Close() + _, err = fmt.Fprintf(fd, "%s %s (%s) -> %s (%s) \"%s\"\n", c.Time, c.From, c.FromUid, c.Chatroom, c.ToUid, c.Text) + return err +} diff --git a/cmd/goatak_server/processors_test.go b/cmd/goatak_server/processors_test.go deleted file mode 100644 index 54ac80b..0000000 --- a/cmd/goatak_server/processors_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "go.uber.org/zap" - "testing" -) - -func TestGetProcessor(t *testing.T) { - logger, _ := zap.NewDevelopment() - - app := NewApp(&AppConfig{}, logger.Sugar()) - app.InitMessageProcessors() - - data := map[string]string{ - "a-b-c-d": "a-", - "b-t-f": "b-t-f", - "b-t-f-a": "b-t-f-", - "b-t-f-a-b": "b-t-f-", - "b-t-b-a": "b-", - } - - for k, v := range data { - p, _ := app.GetProcessor(k) - if p != v { - t.Errorf("got %s, must be %s", p, v) - } - } -} diff --git a/pkg/cot/filter.go b/pkg/cot/filter.go index cc28002..34c4da3 100644 --- a/pkg/cot/filter.go +++ b/pkg/cot/filter.go @@ -3,6 +3,10 @@ package cot import "strings" func MatchPattern(a, pattern string) bool { + if pattern == "-" { + return true + } + if strings.HasPrefix(a, pattern) && strings.HasSuffix(pattern, "-") { return true } @@ -26,3 +30,12 @@ func MatchPattern(a, pattern string) bool { return len(at) == len(pt) } } + +func MatchAnyPattern(a string, patterns ...string) bool { + for _, p := range patterns { + if MatchPattern(a, p) { + return true + } + } + return false +}