diff --git a/Dockerfile b/Dockerfile index fbd6dba..ca09acf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,6 +45,10 @@ ENV DB_CONN_MAX_LIFE_TIME '' ENV DB_MAX_IDLE_CONNS '' ENV DB_MAX_OPEN_CONNS '' +ENV BRIDGE_STATS_NODE_NAME '' +ENV BRIDGE_STATS_URL '' +ENV BRIDGE_STATS_SECRET '' + COPY --from=builder /go/bin/bridge /usr/local/bin/bridge COPY --from=builder /opt/bridge/config/ ./ COPY --from=builder /opt/bridge/docker/entrypoint.sh ./ diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 304a13d..3096deb 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -3,6 +3,8 @@ package main import ( "encoding/json" "fmt" + "github.com/axieinfinity/bridge-v2/stats" + "gorm.io/gorm" "io/ioutil" "os" "os/signal" @@ -85,6 +87,10 @@ const ( RoninNetwork = "Ronin" fromBlock = "FROM_BLOCK" + + bridgeStatsNodeName = "BRIDGE_STATS_NODE_NAME" + bridgeStatsUrl = "BRIDGE_STATS_URL" + bridgeStatsSecret = "BRIDGE_STATS_SECRET" ) var ( @@ -401,6 +407,31 @@ func createPgDb(cfg *bridgeCore.Config) { } } +func setupStats(cfg *bridgeCore.Config, db *gorm.DB) { + // check if bridge stats is enabled + if os.Getenv(bridgeStatsNodeName) != "" && os.Getenv(bridgeStatsUrl) != "" { + // NOTE: only get chainId, operator from ronin + var ( + chainId, operator string + node = os.Getenv(bridgeStatsNodeName) + host = os.Getenv(bridgeStatsUrl) + pass = os.Getenv(bridgeStatsSecret) + ) + if _, ok := cfg.Listeners[RoninNetwork]; ok { + chainId = cfg.Listeners[RoninNetwork].ChainId + if cfg.Listeners[RoninNetwork].Secret != nil && cfg.Listeners[RoninNetwork].Secret.BridgeOperator != nil { + signMethod, err := bridgeCoreUtils.NewSignMethod(cfg.Listeners[RoninNetwork].Secret.BridgeOperator) + if err != nil { + panic(err) + } + operator = signMethod.GetAddress().Hex() + } + } + stats.NewService(node, chainId, operator, host, pass, db) + go stats.BridgeStats.Start() + } +} + func bridge(ctx *cli.Context) { cfg := prepare(ctx) // init db @@ -408,6 +439,8 @@ func bridge(ctx *cli.Context) { if err != nil { panic(err) } + // setup stats + setupStats(cfg, db) // start migration if err = migration.Migrate(db, cfg); err != nil { panic(err) @@ -427,6 +460,9 @@ func bridge(ctx *cli.Context) { select { case <-sigc: controller.Close() + if stats.BridgeStats != nil { + stats.BridgeStats.Stop() + } } } diff --git a/listener/ethereum.go b/listener/ethereum.go index ab56eb3..e54b9b6 100644 --- a/listener/ethereum.go +++ b/listener/ethereum.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/axieinfinity/bridge-v2/stats" + bridgeCore "github.com/axieinfinity/bridge-core" "github.com/axieinfinity/bridge-core/metrics" bridgeCoreModels "github.com/axieinfinity/bridge-core/models" @@ -290,7 +292,9 @@ func (e *EthereumListener) SaveCurrentBlockToDB() error { if err := e.store.GetProcessedBlockStore().Save(hexutil.EncodeBig(chainId), int64(e.GetCurrentBlock().GetHeight())); err != nil { return err } - + if stats.BridgeStats != nil { + stats.BridgeStats.SendProcessedBlock(e.GetName(), e.GetCurrentBlock().GetHeight()) + } metrics.Pusher.IncrCounter(fmt.Sprintf(metrics.ListenerProcessedBlockMetric, e.GetName()), 1) return nil } diff --git a/listener/types.go b/listener/types.go index a61c978..d87fbe5 100644 --- a/listener/types.go +++ b/listener/types.go @@ -2,6 +2,7 @@ package listener import ( "context" + "github.com/axieinfinity/bridge-v2/stats" "math/big" "time" @@ -216,6 +217,12 @@ func NewEthListenJob(jobType int, listener bridgeCore.Listener, subscriptionName } } +func (e *EthListenJob) Process() ([]byte, error) { + data, err := e.BaseJob.Process() + stats.SendErrorToStats(e.GetListener(), err) + return data, err +} + type EthCallbackJob struct { *bridgeCore.BaseJob result interface{} @@ -248,10 +255,12 @@ func (e *EthCallbackJob) Process() ([]byte, error) { log.Info("[EthCallbackJob] Start Process", "method", e.method, "jobId", e.GetID()) val, err := e.Utils().Invoke(e.GetListener(), e.method, e.FromChainID(), e.GetTransaction(), e.GetData()) if err != nil { + stats.SendErrorToStats(e.GetListener(), err) return nil, err } invokeErr, ok := val.Interface().(error) if ok { + stats.SendErrorToStats(e.GetListener(), invokeErr) return nil, invokeErr } return nil, nil diff --git a/stats/bridge_stats.go b/stats/bridge_stats.go new file mode 100644 index 0000000..972681b --- /dev/null +++ b/stats/bridge_stats.go @@ -0,0 +1,356 @@ +package stats + +import ( + "encoding/json" + "errors" + "net/http" + "strings" + "sync" + "time" + + bridgeCore "github.com/axieinfinity/bridge-core" + "github.com/axieinfinity/bridge-v2/stores" + "github.com/ethereum/go-ethereum/log" + "github.com/gorilla/websocket" + "gorm.io/gorm" +) + +const bridgeVersion = "v2" + +type NodeInfo struct { + Organization string `json:"organization,omitempty" mapstructure:"organization"` + Coinbase string `json:"coinbase" mapstructure:"coinbase"` + Name string `json:"name" mapstructure:"name"` + Node string `json:"node" mapstructure:"node"` + Port int `json:"port" mapstructure:"port"` + Network string `json:"net" mapstructure:"net"` + Protocol string `json:"protocol" mapstructure:"protocol"` + API string `json:"api" mapstructure:"api"` + Os string `json:"os" mapstructure:"os"` + OsVer string `json:"os_v" mapstructure:"os_v"` + Client string `json:"client" mapstructure:"client"` + History bool `json:"canUpdateHistory" mapstructure:"canUpdateHistory"` + Operator string `json:"operator" mapstructure:"operator"` + BridgeVersion string `json:"bridgeVersion" mapstructure:"bridgeVersion"` +} + +var BridgeStats *Service + +func SendErrorToStats(listener bridgeCore.Listener, err error) { + if err != nil && BridgeStats != nil { + BridgeStats.SendError(listener.GetName(), err.Error()) + } +} + +// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the +// websocket. +// +// From Gorilla websocket docs: +// +// Connections support one concurrent reader and one concurrent writer. +// Applications are responsible for ensuring that no more than one goroutine calls the write methods +// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel +// concurrently and that no more than one goroutine calls the read methods +// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler +// concurrently. +// The Close and WriteControl methods can be called concurrently with all other methods. +type connWrapper struct { + conn *websocket.Conn + + rlock sync.Mutex + wlock sync.Mutex +} + +func newConnectionWrapper(conn *websocket.Conn) *connWrapper { + return &connWrapper{conn: conn} +} + +// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) WriteJSON(v interface{}) error { + w.wlock.Lock() + defer w.wlock.Unlock() + + return w.conn.WriteJSON(v) +} + +// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) ReadJSON(v interface{}) error { + w.rlock.Lock() + defer w.rlock.Unlock() + + return w.conn.ReadJSON(v) +} + +// Close wraps corresponding method on the websocket but is safe for concurrent calling +func (w *connWrapper) Close() error { + // The Close and WriteControl methods can be called concurrently with all other methods, + // so the mutex is not used here + return w.conn.Close() +} + +type errorMessage struct { + Listener string `json:"listener"` + Err string `json:"error"` +} + +type processedBlockMessage struct { + Listener string `json:"listener"` + ProcessedBlock uint64 `json:"processedBlock"` +} + +type BridgeInfo struct { + Node string `json:"node"` + Operator string `json:"bridgeOperatorAddress"` + Version string `json:"version"` + LastError map[string]string `json:"lastError"` + ProcessedBlock map[string]uint64 `json:"processedBlock"` + PendingTasks int `json:"pendingTasks"` + FailedTasks int `json:"failedTasks"` +} + +type authMsg struct { + ID string `json:"id"` + Info *NodeInfo `json:"info"` + Secret string `json:"secret"` +} + +type Service struct { + chainId string + node string + operator string + secret string + host string + lastError map[string]string + processedBlock map[string]uint64 + errCh chan errorMessage + processedBlockCh chan processedBlockMessage + quitCh chan struct{} + store stores.TaskStore +} + +func NewService(node, chainId, operator, host, secret string, db *gorm.DB) { + BridgeStats = &Service{ + node: node, + chainId: chainId, + operator: operator, + secret: secret, + host: host, + store: stores.NewTaskStore(db), + lastError: make(map[string]string), + processedBlock: make(map[string]uint64), + errCh: make(chan errorMessage, 1), + processedBlockCh: make(chan processedBlockMessage, 1), + quitCh: make(chan struct{}, 1), + } +} + +// Start loop keeps trying to connect to ronin stats server, reporting bridge stats. +func (s *Service) Start() { + errTimer := time.NewTimer(0) + defer errTimer.Stop() + // Loop reporting until termination + for { + select { + case <-s.quitCh: + return + case <-errTimer.C: + // Establish a websocket connection to the server on any supported URL + var ( + conn *connWrapper + err error + ) + dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} + header := make(http.Header) + header.Set("origin", "http://localhost") + log.Info("[Bridge stats] Dial to host ", "host", s.host) + c, _, e := dialer.Dial(s.host, header) + err = e + if err == nil { + conn = newConnectionWrapper(c) + } + if err != nil { + log.Warn("Stats server unreachable", "err", err) + errTimer.Reset(10 * time.Second) + continue + } + + // Authenticate the client with the server + if err = s.login(conn); err != nil { + log.Warn("Stats login failed", "err", err) + conn.Close() + errTimer.Reset(10 * time.Second) + continue + } + go s.readLoop(conn) + + if err = s.report(conn); err != nil { + log.Warn("Initial stats report failed", "err", err) + conn.Close() + errTimer.Reset(0) + continue + } + + s.reportLoop(conn) + + log.Warn("[Bridge stats] Redial connection") + // Close the current connection and establish a new one + conn.Close() + errTimer.Reset(0) + } + } +} + +func (s *Service) Stop() { + close(s.quitCh) +} + +func (s *Service) login(conn *connWrapper) error { + // Construct and send the login authentication + auth := &authMsg{ + ID: s.node, + Info: &NodeInfo{ + Node: s.node, + Operator: s.operator, + BridgeVersion: bridgeVersion, + }, + Secret: s.secret, + } + login := map[string][]interface{}{ + "emit": {"hello", auth}, + } + if err := conn.WriteJSON(login); err != nil { + return err + } + // Retrieve the remote ack or connection termination + var ack map[string][]string + if err := conn.ReadJSON(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" { + return errors.New("unauthorized") + } + return nil +} + +// reportLoop loops as long as the connection is alive and try to report the bridge stats +// every sendStatsTicket. If the conenction is drop or we got any issues, we will exist the +// function for reprocessing the flow +func (s *Service) reportLoop(conn *connWrapper) error { + var err error + + sendStatsTicker := time.NewTicker(5 * time.Second) // Every 5 seconds + + for err == nil { + select { + case <-s.quitCh: + sendStatsTicker.Stop() + conn.Close() + return nil + case msg := <-s.errCh: + err = s.setLastError(msg.Listener, msg.Err) + case msg := <-s.processedBlockCh: + err = s.setProcessedBlock(msg.Listener, msg.ProcessedBlock) + case <-sendStatsTicker.C: // Checking stats interval + if err = s.report(conn); err != nil { + log.Warn("bridge stats report failed", "err", err) + // When bridge stats report failed need to relogn after 10 seconds + } + } + } + sendStatsTicker.Stop() + return nil +} + +func (s *Service) report(conn *connWrapper) error { + // collect bridge stats and send to ronin stats + info := &BridgeInfo{ + Node: s.node, + Operator: s.operator, + Version: bridgeVersion, + LastError: s.lastError, + ProcessedBlock: s.processedBlock, + } + + // count pending/failed tasks from database + pending, err := s.store.CountTasks(s.chainId, "pending") + if err != nil { + log.Error("error while getting pending tasks", "err", err) + } else { + info.PendingTasks = int(pending) + } + + failed, err := s.store.CountTasks(s.chainId, "failed") + if err != nil { + log.Error("error while getting failed tasks", "err", err) + } else { + info.FailedTasks = int(failed) + } + + report := map[string][]interface{}{ + "emit": {"bridge-stats", info}, + } + log.Info("[Bridge-Stats] Emit Bridge stats") + return conn.WriteJSON(report) +} + +// readLoop loops as long as the connection is alive and retrieves data packets +// from the network socket. If any of them match an active request, it forwards +// it, if they themselves are requests it initiates a reply, and lastly it drops +// unknown packets. +func (s *Service) readLoop(conn *connWrapper) { + // If the read loop exits, close the connection + defer conn.Close() + log.Info("[Bridge stats] Start read loop") + for { + // Exit the function when receiving the quit signal + select { + case <-s.quitCh: + return + default: + } + + // Retrieve the next generic network packet and bail out on error + var blob json.RawMessage + if err := conn.ReadJSON(&blob); err != nil { + log.Warn("Failed to retrieve stats server message", "err", err) + return + } + // If the network packet is a system ping, respond to it directly + var ping string + if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { + log.Info("[Bridge stats] The client receives ping from peers, should pong again.") + if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + log.Warn("Failed to respond to system ping message", "err", err) + return + } + continue + } + + // Not a system ping, try to decode an actual state message + var msg map[string][]interface{} + if err := json.Unmarshal(blob, &msg); err != nil { + log.Warn("Failed to decode stats server message", "err", err) + return + } + log.Info("Received message from stats server", "msg", msg) + } +} + +func (s *Service) setLastError(listener, err string) error { + if err != "" { + s.lastError[listener] = err + } + return nil +} + +func (s *Service) setProcessedBlock(listener string, block uint64) error { + if s.processedBlock[listener] < block { + s.processedBlock[listener] = block + } + return nil +} + +func (s *Service) SendError(listener, err string) { + s.errCh <- errorMessage{Listener: listener, Err: err} +} + +func (s *Service) SendProcessedBlock(listener string, block uint64) { + s.processedBlockCh <- processedBlockMessage{Listener: listener, ProcessedBlock: block} +} diff --git a/stores/main.go b/stores/main.go index 1f0dc1b..98cc576 100644 --- a/stores/main.go +++ b/stores/main.go @@ -13,6 +13,7 @@ type TaskStore interface { DeleteTasks([]string, uint64) error Count() int64 ResetTo(ids []string, status string) error + CountTasks(chain, status string) (int64, error) } type DepositStore interface { diff --git a/stores/task.go b/stores/task.go index 6c61d78..4f57ca0 100644 --- a/stores/task.go +++ b/stores/task.go @@ -54,6 +54,12 @@ func (t *taskStore) GetTasks(chain, status string, limit, retrySeconds int, befo return tasks, err } +func (t *taskStore) CountTasks(chain, status string) (int64, error) { + var count int64 + err := t.Model(&models.Task{}).Where("chain_id = ? AND status = ?", chain, status).Count(&count).Error + return count, err +} + func (t *taskStore) UpdateTasksWithTransactionHash(txs []string, transactionStatus int, status string) error { return t.Model(&models.Task{}).Where("transaction_hash in ?", txs).Updates(map[string]interface{}{ "status": status, diff --git a/task/task.go b/task/task.go index 16b1275..dd4fd7e 100644 --- a/task/task.go +++ b/task/task.go @@ -2,6 +2,7 @@ package task import ( "crypto/ecdsa" + "github.com/axieinfinity/bridge-v2/stats" "math/big" "sort" "strings" @@ -119,6 +120,7 @@ func (r *task) voteBridgeOperatorsBySignature(task *models.Task) (doneTasks, pro event, err := r.unpackBridgeOperatorSetUpdatedEvent(task) if err != nil { task.LastError = err.Error() + stats.SendErrorToStats(r.listener, err) failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil } @@ -128,6 +130,7 @@ func (r *task) voteBridgeOperatorsBySignature(task *models.Task) (doneTasks, pro roninGovernanceTransactor, err := roninGovernance.NewGovernance(common.HexToAddress(r.contracts[GOVERNANCE_CONTRACT]), r.client) if err != nil { task.LastError = err.Error() + stats.SendErrorToStats(r.listener, err) failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil } @@ -135,6 +138,7 @@ func (r *task) voteBridgeOperatorsBySignature(task *models.Task) (doneTasks, pro voted, err := roninGovernanceTransactor.BridgeOperatorsVoted(nil, event.Period, event.Epoch, r.listener.GetVoterSign().GetAddress()) if err != nil { task.LastError = err.Error() + stats.SendErrorToStats(r.listener, err) failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil } @@ -148,6 +152,7 @@ func (r *task) voteBridgeOperatorsBySignature(task *models.Task) (doneTasks, pro syncedInfo, err := roninGovernanceTransactor.LastSyncedBridgeOperatorSetInfo(nil) if err != nil { task.LastError = err.Error() + stats.SendErrorToStats(r.listener, err) failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil } @@ -177,6 +182,7 @@ func (r *task) voteBridgeOperatorsBySignature(task *models.Task) (doneTasks, pro signature, err := signBridgeOperatorsBallot(opts, event.Period.Int64(), event.Epoch.Int64(), bridgeOperators) if err != nil { task.LastError = err.Error() + stats.SendErrorToStats(r.listener, err) failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil } @@ -194,6 +200,7 @@ func (r *task) voteBridgeOperatorsBySignature(task *models.Task) (doneTasks, pro }) }) if err != nil { + stats.SendErrorToStats(r.listener, err) switch err.Error() { case ErrOutdatedPeriod: log.Warn("[RoninTask][BridgeOperatorSetCallback] Bridge operators period outdated") @@ -220,6 +227,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas // create caller roninGovernanceCaller, err := roninGovernance.NewGovernanceCaller(common.HexToAddress(r.contracts[GOVERNANCE_CONTRACT]), r.client) if err != nil { + stats.SendErrorToStats(r.listener, err) task.LastError = err.Error() failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil @@ -227,6 +235,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas ethGovernanceTransactor, err := ethGovernance.NewGovernance(common.HexToAddress(r.contracts[ETH_GOVERNANCE_CONTRACT]), r.ethClient) if err != nil { + stats.SendErrorToStats(r.listener, err) task.LastError = err.Error() failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil @@ -234,6 +243,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas event, err := r.unpackBridgeOperatorsApprovedEvent(task) if err != nil { + stats.SendErrorToStats(r.listener, err) task.LastError = err.Error() failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil @@ -244,6 +254,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas // Ethereum call ethSyncedInfo, err := ethGovernanceTransactor.LastSyncedBridgeOperatorSetInfo(nil) if err != nil { + stats.SendErrorToStats(r.listener, err) task.LastError = err.Error() failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil @@ -262,6 +273,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas signatures, err := roninGovernanceCaller.GetBridgeOperatorVotingSignatures(nil, event.Period, event.Epoch) if err != nil { + stats.SendErrorToStats(r.listener, err) task.LastError = err.Error() failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil @@ -292,6 +304,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas ethChainId, err := ethListener.GetChainID() if err != nil { task.LastError = err.Error() + stats.SendErrorToStats(r.listener, err) failedTasks = append(failedTasks, task) return nil, nil, failedTasks, nil } @@ -304,6 +317,7 @@ func (r *task) relayBridgeOperators(task *models.Task) (doneTasks, processingTas }, ethSignatures) }) if err != nil { + stats.SendErrorToStats(r.listener, err) // Prevent retry submit signature if the signature was already submitted switch err.Error() { case ErrSigAlreadySubmitted: diff --git a/task/task_test.go b/task/task_test.go index 03bb2f3..84ed8ed 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -4,6 +4,9 @@ import ( "context" "database/sql" "fmt" + "math/big" + "testing" + "github.com/DATA-DOG/go-sqlmock" "github.com/axieinfinity/bridge-contracts/generated_contracts/ethereum/gateway" internal "github.com/axieinfinity/bridge-core" @@ -20,8 +23,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" core2 "github.com/ethereum/go-ethereum/signer/core" "github.com/stretchr/testify/suite" - "math/big" - "testing" ) type SimulatedSuite struct {