-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraft_callbacks.go
72 lines (61 loc) · 2.55 KB
/
raft_callbacks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package failsafe
import (
"github.com/goraft/raft"
"time"
)
// AddEventListeners to add callback for raft server.
func (s *Server) AddEventListeners() {
rafts := s.raftServer
rafts.AddEventListener(raft.StateChangeEventType, s.raftStateChange)
rafts.AddEventListener(raft.LeaderChangeEventType, s.raftLeaderChange)
rafts.AddEventListener(raft.TermChangeEventType, s.raftTermChange)
rafts.AddEventListener(raft.CommitEventType, s.raftCommit)
rafts.AddEventListener(raft.AddPeerEventType, s.raftAddPeer)
rafts.AddEventListener(raft.RemovePeerEventType, s.raftRemovePeer)
rafts.AddEventListener(raft.HeartbeatEventType, s.raftHeartbeat)
rafts.AddEventListener(
raft.HeartbeatIntervalEventType, s.raftHeartbeatInterval)
rafts.AddEventListener(
raft.ElectionTimeoutThresholdEventType, s.raftElectionTimeoutThreshold)
}
func (s *Server) raftStateChange(e raft.Event) {
state, oldState := e.Value().(string), e.PrevValue().(string)
tracef("%v, changes state from %q to %q\n", s.logPrefix, oldState, state)
s.stats["raftStateChange"] = s.stats["raftStateChange"].(int) + 1
}
func (s *Server) raftLeaderChange(e raft.Event) {
leader, oldLeader := e.Value().(string), e.PrevValue().(string)
tracef("%v, leader changed from %q to %q\n", s.logPrefix, oldLeader, leader)
s.stats["raftLeaderChange"] = s.stats["raftLeaderChange"].(int) + 1
}
func (s *Server) raftTermChange(e raft.Event) {
term, oldTerm := e.Value().(string), e.PrevValue().(string)
tracef("%v, term changed from %q to %q\n", s.logPrefix, oldTerm, term)
s.stats["raftTermChange"] = s.stats["raftTermChange"].(int) + 1
}
func (s *Server) raftCommit(e raft.Event) {
s.stats["raftCommit"] = s.stats["raftCommit"].(int) + 1
}
func (s *Server) raftAddPeer(e raft.Event) {
peer := e.Value().(string)
tracef("%v, add peer %q\n", s.logPrefix, peer)
s.stats["raftAddPeer"] = s.stats["raftAddPeer"].(int) + 1
}
func (s *Server) raftRemovePeer(e raft.Event) {
peer := e.Value().(string)
tracef("%v, add peer %q\n", s.logPrefix, peer)
s.stats["raftRemovePeer"] = s.stats["raftRemovePeer"].(int) + 1
}
func (s *Server) raftHeartbeat(e raft.Event) {
s.stats["raftHeartbeat"] = s.stats["raftHeartbeat"].(int) + 1
}
func (s *Server) raftHeartbeatInterval(e raft.Event) {
v := s.stats["raftHeartbeatInterval"].(int) + 1
s.stats["raftHeartbeatInterval"] = v
}
func (s *Server) raftElectionTimeoutThreshold(e raft.Event) {
elapsedTime := e.Value().(time.Duration)
tracef("%v, elapsed time %v\n", s.logPrefix, elapsedTime)
v := s.stats["raftElectionTimeoutThreshold"].(int) + 1
s.stats["raftElectionTimeoutThreshold"] = v
}