Skip to content

Commit

Permalink
Enriched logs of HTTP API and Scraper.
Browse files Browse the repository at this point in the history
  • Loading branch information
nickeskov committed Mar 7, 2024
1 parent 7461320 commit 1bd8b18
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
41 changes: 31 additions & 10 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
stderrs "errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (a *API) Start() error {
go func() {
err := a.srv.Serve(l)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
a.zap.Sugar().Fatalf("Failed to serve REST API at '%s': %v", a.srv.Addr, err)
a.zap.Sugar().Fatalf("[API] Failed to serve REST API at '%s': %v", a.srv.Addr, err)
}
}()
return nil
Expand All @@ -91,7 +92,7 @@ func (a *API) Shutdown() {
defer cancel()

if err := a.srv.Shutdown(ctx); err != nil {
a.zap.Error("Failed to shutdown REST API", zap.Error(err))
a.zap.Error("[API] Failed to shutdown REST API", zap.Error(err))
}
}

Expand All @@ -108,15 +109,15 @@ func (a *API) routes() chi.Router {
func (a *API) health(w http.ResponseWriter, r *http.Request) {
enabledNodes, enErr := a.nodesStorage.EnabledNodes()
if enErr != nil {
a.zap.Error("Healthcheck failed: failed get non specific nodes",
a.zap.Error("[API] Healthcheck failed: failed get non specific nodes",
zap.Error(enErr), zap.String("request-id", middleware.GetReqID(r.Context())),
)
http.Error(w, fmt.Sprintf("Failed to get non specific nodes: %v", enErr), http.StatusInternalServerError)
}
for _, node := range enabledNodes {
_, lhErr := a.eventsStorage.LatestHeight(node.URL)
if lhErr != nil && !errors.Is(lhErr, events.ErrNoFullStatement) {
a.zap.Error("Healthcheck failed: failed to get latest height for node", zap.String("node", node.URL),
a.zap.Error("[API] Healthcheck failed: failed to get latest height for node", zap.String("node", node.URL),
zap.Error(lhErr), zap.String("request-id", middleware.GetReqID(r.Context())),
)
http.Error(w, fmt.Sprintf("Failed to get non specific nodes: %v", lhErr), http.StatusInternalServerError)
Expand All @@ -131,10 +132,29 @@ type nodeShortStatement struct {
Height int `json:"height,omitempty"`
}

type statementLogWrapper struct {
nodeShortStatement
}

func (s statementLogWrapper) String() string {
data, err := json.Marshal(s)
if err != nil {
type jsonErr struct {
Error string `json:"error"`
}
errData, mErr := json.Marshal(jsonErr{Error: err.Error()})
if mErr != nil { // must never happen
panic(stderrs.Join(err, mErr))
}
return string(errData)
}
return string(data)
}

func (a *API) specificNodesHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(io.LimitReader(r.Body, megabyte))
if err != nil {
a.zap.Error("Failed to read request body", zap.Error(err))
a.zap.Error("[API] Failed to read request body", zap.Error(err))
http.Error(w, fmt.Sprintf("Failed to read body: %v", err), http.StatusInternalServerError)
return
}
Expand All @@ -145,36 +165,37 @@ func (a *API) specificNodesHandler(w http.ResponseWriter, r *http.Request) {
statehash := &proto.StateHash{}
err = json.NewDecoder(stateHashReader).Decode(statehash)
if err != nil {
a.zap.Error("Failed to decode statehash", zap.Error(err))
a.zap.Error("[API] Failed to decode statehash", zap.Error(err))
http.Error(w, fmt.Sprintf("Failed to decode statehash: %v", err), http.StatusInternalServerError)
return
}

statement := nodeShortStatement{}
err = json.NewDecoder(statementReader).Decode(&statement)
if err != nil {
a.zap.Error("Failed to decode a specific nodes statement", zap.Error(err))
a.zap.Error("[API] Failed to decode a specific nodes statement", zap.Error(err))
http.Error(w, fmt.Sprintf("Failed to decode statements: %v", err), http.StatusInternalServerError)
return
}
escapedNodeName := cleanCRLF(r.Header.Get("node-name"))
updatedURL, err := entities.CheckAndUpdateURL(escapedNodeName)
if err != nil {
a.zap.Error("Failed to check and update node's url", zap.Error(err))
a.zap.Error("[API] Failed to check and update node's url", zap.Error(err))
http.Error(w, fmt.Sprintf("Failed to check node name: %v", err), http.StatusInternalServerError)
return
}
statement.Node = updatedURL
a.zap.Sugar().Debugf("[API] Received statement %s", fmt.Stringer(statementLogWrapper{statement}))

enabledSpecificNodes, err := a.nodesStorage.EnabledSpecificNodes()
if err != nil {
a.zap.Error("Failed to fetch specific nodes from storage", zap.Error(err))
a.zap.Error("[API] Failed to fetch specific nodes from storage", zap.Error(err))
http.Error(w, fmt.Sprintf("Failed to fetch specific nodes from storage: %v", err), http.StatusInternalServerError)
return
}

if !specificNodeFoundInStorage(enabledSpecificNodes, statement) {
a.zap.Info("Received a statements from the private node but it's not being monitored by the nodemon",
a.zap.Info("[API] Received a statements from the private node but it's not being monitored by the nodemon",
zap.String("node", statement.Node),
)
w.WriteHeader(http.StatusForbidden)
Expand Down
27 changes: 18 additions & 9 deletions pkg/scraping/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,20 @@ func (s *Scraper) poll(ctx context.Context, notifications chan<- entities.NodesG

enabledNodes, storageErr := s.ns.EnabledNodes()
if storageErr != nil {
s.zap.Error("Failed to get nodes from storage", zap.Error(storageErr))
s.zap.Error("[SCRAPER] Failed to get nodes from storage", zap.Error(storageErr))
}

ec := s.queryNodes(ctx, enabledNodes, now)
cnt := 0
for e := range ec {
if err := s.es.PutEvent(e); err != nil {
s.zap.Sugar().Errorf("Failed to collect event '%T' from node %s, statement=%+v: %v",
s.zap.Sugar().Errorf("[SCRAPER] Failed to collect event '%T' from node %s, statement=%+v: %v",
e, e.Node(), e.Statement(), err)
} else {
cnt++
}
}
s.zap.Sugar().Infof("Polling of %d nodes completed with %d events saved", len(enabledNodes), cnt)
s.zap.Sugar().Infof("[SCRAPER] Polling of %d nodes completed with %d events saved", len(enabledNodes), cnt)

urls := make([]string, len(enabledNodes))
for i := range enabledNodes {
Expand All @@ -92,7 +92,9 @@ func (s *Scraper) queryNodes(ctx context.Context, nodes []entities.Node, now int
go func() {
defer wg.Done()
event := s.queryNode(ctx, nodeURL, now)
s.zap.Sugar().Infof("Collected event (%T) at height %d for node %s", event, event.Height(), nodeURL)
s.zap.Sugar().Infof("[SCRAPER] Collected event (%T) at height %d for node %s",
event, event.Height(), nodeURL,
)
ec <- event
}()
}
Expand All @@ -106,31 +108,38 @@ func (s *Scraper) queryNode(ctx context.Context, url string, ts int64) entities.
node := newNodeClient(url, s.timeout, s.zap)
v, err := node.version(ctx)
if err != nil {
s.zap.Sugar().Warnf("Failed to get version for node %s: %v", url, err)
s.zap.Sugar().Warnf("[SCRAPER] Failed to get version for node %s: %v", url, err)
return entities.NewUnreachableEvent(url, ts)
}
s.zap.Sugar().Debugf("[SCRAPER] Node %s has version %s", url, v)

h, err := node.height(ctx)
if err != nil {
s.zap.Sugar().Warnf("Failed to get height for node %s: %v", url, err)
s.zap.Sugar().Warnf("[SCRAPER] Failed to get height for node %s: %v", url, err)
return entities.NewVersionEvent(url, ts, v) // we know version, sending what we know about node
}
s.zap.Sugar().Debugf("[SCRAPER] Node %s has height %d", url, h)

const minValidHeight = 2
if h < minValidHeight {
s.zap.Sugar().Warnf("Node %s has invalid height %d", url, h)
s.zap.Sugar().Warnf("[SCRAPER] Node %s has invalid height %d", url, h)
return entities.NewInvalidHeightEvent(url, ts, v, h)
}
h-- // Go to previous height to request base target and state hash

bs, err := node.baseTarget(ctx, h)
if err != nil {
s.zap.Sugar().Warnf("Failed to get base target for node %s: %v", url, err)
s.zap.Sugar().Warnf("[SCRAPER] Failed to get base target for node %s: %v", url, err)
return entities.NewHeightEvent(url, ts, v, h) // we know about version and height, sending it
}
s.zap.Sugar().Debugf("[SCRAPER] Node %s has base target %d at height %d", url, bs, h)

sh, err := node.stateHash(ctx, h)
if err != nil {
s.zap.Sugar().Warnf("Failed to get state hash for node %s: %v", url, err)
s.zap.Sugar().Warnf("[SCRAPER] Failed to get state hash for node %s: %v", url, err)
return entities.NewBaseTargetEvent(url, ts, v, h, bs) // we know version, height and base target, sending it
}
s.zap.Sugar().Debugf("[SCRAPER] Node %s has state hash %s at height %d", url, sh.SumHash.Hex(), h)

return entities.NewStateHashEvent(url, ts, v, h, sh, bs) // sending full info about node
}

0 comments on commit 1bd8b18

Please sign in to comment.