Skip to content

Commit

Permalink
new processors
Browse files Browse the repository at this point in the history
  • Loading branch information
kdudkov committed Oct 17, 2023
1 parent 3120504 commit 94610f0
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 136 deletions.
80 changes: 6 additions & 74 deletions cmd/goatak_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/pem"
"flag"
"fmt"
"google.golang.org/protobuf/proto"
"io/ioutil"
"log"
"net"
Expand Down Expand Up @@ -38,8 +37,6 @@ var (
lastSeenOfflineTimeout = time.Minute * 5
)

type EventProcessor func(msg *cot.CotMessage)

type AppConfig struct {
udpAddr string
tcpAddr string
Expand Down Expand Up @@ -86,7 +83,7 @@ type App struct {
ctx context.Context
uid string
ch chan *cot.CotMessage
eventProcessors map[string]EventProcessor
eventProcessors map[string]*EventProcessor
}

func NewApp(config *AppConfig, logger *zap.SugaredLogger) *App {
Expand All @@ -100,7 +97,7 @@ func NewApp(config *AppConfig, logger *zap.SugaredLogger) *App {
items: repository.NewItemsMemoryRepo(),
feeds: repository.NewFeedsFileRepo(logger.Named("feedsRepo"), filepath.Join(config.dataDir, "feeds")),
uid: uuid.New().String(),
eventProcessors: make(map[string]EventProcessor),
eventProcessors: make(map[string]*EventProcessor),
}

return app
Expand Down Expand Up @@ -174,12 +171,6 @@ func (app *App) NewCotMessage(msg *cot.CotMessage) {
app.ch <- msg
}

func (app *App) AddEventProcessor(e EventProcessor, mask ...string) {
for _, s := range mask {
app.eventProcessors[s] = e
}
}

func (app *App) AddClientHandler(ch client.ClientHandler) {
app.handlers.Store(ch.GetName(), ch)
}
Expand Down Expand Up @@ -316,29 +307,10 @@ func (app *App) getTlsConfig() *tls.Config {

func (app *App) MessageProcessor() {
for msg := range app.ch {
if app.config.logging {
if err := app.logToFile(msg); err != nil {
app.Logger.Warnf("error logging message: %s", err.Error())
}
}
if msg.TakMessage.CotEvent == nil {
continue
}

if c := app.items.Get(msg.GetUid()); c != nil {
c.Update(nil)
}

if _, processor := app.GetProcessor(msg.GetType()); processor != nil {
processor(msg)
}

if !strings.HasPrefix(msg.GetType(), "a-") {
name, exact := cot.GetMsgType(msg.GetType())
if exact {
app.Logger.Debugf("%s %s", msg.GetType(), name)
} else {
app.Logger.Infof("%s %s (extended)", msg.GetType(), name)
for name, prc := range app.eventProcessors {
if cot.MatchAnyPattern(msg.GetType(), prc.include...) {
app.Logger.Debugf("msg is processed by %s", name)
prc.cb(msg)
}
}

Expand Down Expand Up @@ -415,16 +387,6 @@ func (app *App) SendToCallsign(callsign string, msg *cotproto.TakMessage) {
})
}

func (app *App) logMessage(c *model.ChatMessage) {
fd, err := os.OpenFile("msg.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
app.Logger.Errorf("can't write to message log: %s", err.Error())
return
}
defer fd.Close()
fmt.Fprintf(fd, "%s %s (%s) -> %s (%s) \"%s\"\n", c.Time, c.From, c.FromUid, c.Chatroom, c.ToUid, c.Text)
}

func loadPem(name string) ([]*x509.Certificate, error) {
if name == "" {
return nil, nil
Expand Down Expand Up @@ -494,36 +456,6 @@ func processCerts(conf *AppConfig) error {
return nil
}

func (app *App) logToFile(msg *cot.CotMessage) error {
path := filepath.Join(app.config.dataDir, "log")

if err := os.MkdirAll(path, 0777); err != nil {
return err
}

// don't save pings
if msg.GetType() == "t-x-c-t" || msg.GetType() == "t-x-c-t-r" {
return nil
}

fname := filepath.Join(path, time.Now().Format("2006-01-02.tak"))

f, err := os.OpenFile(fname, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return err
}
defer f.Close()

d, err := proto.Marshal(msg.TakMessage)
if err != nil {
return err
}
l := uint32(len(d))
_, _ = f.Write([]byte{byte(l % 256), byte(l / 256)})
_, _ = f.Write(d)
return nil
}

func main() {
fmt.Printf("version %s %s\n", gitRevision, gitBranch)
var debug = flag.Bool("debug", false, "debug node")
Expand Down
100 changes: 66 additions & 34 deletions cmd/goatak_server/processors.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,45 @@
package main

import (
"encoding/json"
"fmt"
"github.com/kdudkov/goatak/pkg/cot"
"github.com/kdudkov/goatak/pkg/model"
"github.com/spf13/viper"
"google.golang.org/protobuf/proto"
"os"
"path/filepath"
"strings"
"time"
)

func (app *App) InitMessageProcessors() {
app.eventProcessors["t-x-d-d"] = app.removeItemProcessor
// chat
app.eventProcessors["b-t-f"] = app.chatProcessor
app.eventProcessors["a-"] = app.ProcessItem
app.eventProcessors["b-"] = app.ProcessItem
type EventProcessor struct {
include []string
cb func(msg *cot.CotMessage)
}

func (app *App) GetProcessor(t string) (string, EventProcessor) {
var found string
for k, v := range app.eventProcessors {
if k == t {
return k, v
}
if cot.MatchPattern(t, k) && len(k) > len(found) {
found = k
}
}
func (app *App) AddEventProcessor(name string, cb func(msg *cot.CotMessage), masks ...string) {
app.eventProcessors[name] = &EventProcessor{cb: cb, include: masks}
}

if found != "" {
return found, app.eventProcessors[found]
func (app *App) InitMessageProcessors() {
app.AddEventProcessor("remove", app.removeItemProcessor, "t-x-d-d")
app.AddEventProcessor("chat", app.chatProcessor, "b-t-f-")
app.AddEventProcessor("items", app.saveItemProcessor, "a-", "b-")
app.AddEventProcessor("logger", app.loggerProcessor, "-")
if app.config.logging {
app.AddEventProcessor("file_logger", app.fileLoggerProcessor, "-")
}

return "", nil
}

func (app *App) justLogProcessor(msg *cot.CotMessage) {
app.Logger.Debugf("%s %s", msg.GetType(), msg.GetUid())
func (app *App) loggerProcessor(msg *cot.CotMessage) {
if !strings.HasPrefix(msg.GetType(), "a-") {
name, exact := cot.GetMsgType(msg.GetType())
if exact {
app.Logger.Debugf("%s %s", msg.GetType(), name)
} else {
app.Logger.Infof("%s %s (extended)", msg.GetType(), name)
}
}
}

func (app *App) removeItemProcessor(msg *cot.CotMessage) {
Expand Down Expand Up @@ -72,36 +77,63 @@ func (app *App) chatProcessor(msg *cot.CotMessage) {
}
app.Logger.Infof("Chat %s", c.String())
app.messages = append(app.messages, c)
app.logMessage(c)
if err := logChatMessage(c); err != nil {
app.Logger.Warnf("error logging chat: %s", err.Error())
}
}

func (app *App) ProcessItem(msg *cot.CotMessage) {
func (app *App) saveItemProcessor(msg *cot.CotMessage) {
cl := model.GetClass(msg)
if c := app.items.Get(msg.GetUid()); c != nil {
app.Logger.Debugf("update %s %s (%s) %s", cl, msg.GetUid(), msg.GetCallsign(), msg.GetType())
c.Update(msg)
} else {
app.Logger.Infof("new %s %s (%s) %s", cl, msg.GetUid(), msg.GetCallsign(), msg.GetType())
if msg.GetCallsign() == "" {
app.Logger.Infof("%s", msg.TakMessage.GetCotEvent().GetDetail().GetXmlDetail())
_ = logToFile(msg)
}
app.items.Store(model.FromMsg(msg))
}
}

func logToFile(msg *cot.CotMessage) error {
f, err := os.OpenFile(msg.GetType()+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
func (app *App) fileLoggerProcessor(msg *cot.CotMessage) {
if cot.MatchAnyPattern(msg.GetType(), "t-x-c-t", "t-x-c-t-r") {
return
}
if cot.MatchAnyPattern(msg.GetType(), viper.GetStringSlice("log_exclude")...) {
return
}
if err := logMessage(msg, filepath.Join(app.config.dataDir, "log")); err != nil {
app.Logger.Warnf("error logging message: %s", err.Error())
}
}

func logMessage(msg *cot.CotMessage, dir string) error {
if err := os.MkdirAll(dir, 0777); err != nil {
return err
}

fname := filepath.Join(dir, time.Now().Format("2006-01-02.tak"))

f, err := os.OpenFile(fname, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return err
}
defer f.Close()

b, err := json.Marshal(msg.TakMessage)
d, err := proto.Marshal(msg.TakMessage)
if err != nil {
return err
}
_, _ = f.WriteString(string(b))
_, _ = f.WriteString("\n")
l := uint32(len(d))
_, _ = f.Write([]byte{byte(l % 256), byte(l / 256)})
_, _ = f.Write(d)
return nil
}

func logChatMessage(c *model.ChatMessage) error {
fd, err := os.OpenFile("msg.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return nil
}
defer fd.Close()
_, err = fmt.Fprintf(fd, "%s %s (%s) -> %s (%s) \"%s\"\n", c.Time, c.From, c.FromUid, c.Chatroom, c.ToUid, c.Text)
return err
}
28 changes: 0 additions & 28 deletions cmd/goatak_server/processors_test.go

This file was deleted.

13 changes: 13 additions & 0 deletions pkg/cot/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package cot
import "strings"

func MatchPattern(a, pattern string) bool {
if pattern == "-" {
return true
}

if strings.HasPrefix(a, pattern) && strings.HasSuffix(pattern, "-") {
return true
}
Expand All @@ -26,3 +30,12 @@ func MatchPattern(a, pattern string) bool {
return len(at) == len(pt)
}
}

func MatchAnyPattern(a string, patterns ...string) bool {
for _, p := range patterns {
if MatchPattern(a, p) {
return true
}
}
return false
}

0 comments on commit 94610f0

Please sign in to comment.