-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplayer.go
97 lines (80 loc) · 1.79 KB
/
replayer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package gopaxos
import "time"
type rePlayer struct {
conf *config
smFac *smFac
paxosLog *paxosLog
cpMgr *checkpointMgr
canRun bool
isPause bool
isEnd bool
quit chan struct{}
}
func newRePlayer(conf *config, smFac *smFac, ls LogStorage, cpMgr *checkpointMgr) *rePlayer {
return &rePlayer{
conf: conf,
smFac: smFac,
paxosLog: newPaxosLog(ls),
cpMgr: cpMgr,
isPause: true,
}
}
func (r *rePlayer) start() {
r.quit = make(chan struct{}, 1)
go r.run()
}
func (r *rePlayer) stop() {
r.isEnd = true
if r.quit != nil {
<-r.quit
r.quit = nil
}
}
func (r *rePlayer) pause() {
r.canRun = false
}
func (r *rePlayer) resume() {
r.isPause = false
r.canRun = true
}
func (r *rePlayer) isPaused() bool {
return r.isPause
}
func (r *rePlayer) run() {
lPLGHead(r.conf.groupIdx, "Checkpoint.Replayer [START]")
instanceID := r.smFac.getCheckpointInstanceID(r.conf.groupIdx) + 1
for {
if r.isEnd {
lPLGHead(r.conf.groupIdx, "Checkpoint.Replayer [END]")
close(r.quit)
return
}
if !r.canRun {
r.isPause = true
time.Sleep(time.Second)
continue
}
if instanceID >= r.cpMgr.getMaxChosenInstanceID() {
time.Sleep(time.Second)
continue
}
if r.playOne(instanceID) {
lPLGImp(r.conf.groupIdx, "Play one done, instanceid %d", instanceID)
instanceID++
} else {
lPLGErr(r.conf.groupIdx, "Play one fail, instanceid %d", instanceID)
time.Sleep(time.Millisecond * 500)
}
}
}
func (r *rePlayer) playOne(instanceID uint64) bool {
state, err := r.paxosLog.readState(r.conf.groupIdx, instanceID)
if err != nil {
return false
}
if !r.smFac.executeForCheckpoint(r.conf.groupIdx, instanceID, state.GetAcceptedValue()) {
lPLGErr(r.conf.groupIdx, "Checkpoint sm excute fail, instanceid %d", instanceID)
return false
}
return true
}