-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommitter.go
130 lines (108 loc) · 3.22 KB
/
committer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package gopaxos
// committer submits values for commit on behalf of callers. Each attempt is
// made while holding lock: the value is packed with its state machine ID,
// handed to ctx, and loop is notified before the result is read back from ctx.
type committer struct {
// conf supplies the group index attached to every log line.
conf *config
// ctx receives each new commit and reports its result.
ctx *commitCtx
// loop is notified after a commit has been handed to ctx.
loop *ioLoop
// smFac packs the state machine ID into the value being committed.
smFac *smFac
// lock is held for the duration of a single commit attempt.
lock *waitLock
// timeoutMs is passed to lock when acquiring it and bounds the time left
// for the commit itself. It starts at -1; values <= 0 skip the
// remaining-time check.
timeoutMs int
// lastLogTime is the steady-clock time, in milliseconds, of the most recent
// status log.
lastLogTime uint64
}
// newCommitter returns a committer wired to the given configuration, commit
// context, IO loop and state machine factory. The timeout starts at -1, the
// last-log timestamp is set to the current steady-clock time, and a fresh wait
// lock is created.
func newCommitter(conf *config, ctx *commitCtx, loop *ioLoop, smFac *smFac) *committer {
	return &committer{
		conf:      conf,
		ctx:       ctx,
		loop:      loop,
		smFac:     smFac,
		timeoutMs: -1,
		// Keep the clock read ahead of the lock construction so the calls
		// happen in the same order as before.
		lastLogTime: getSteadyClockMS(),
		lock:        newWaitLock(),
	}
}
// newValue commits value without a state machine context and returns only the
// result code, discarding the instance ID reported by newValueGetID.
func (c *committer) newValue(value []byte) int32 {
	_, code := c.newValueGetID(value, nil)
	return code
}
// newValueGetID tries to commit value, retrying while an attempt ends in a
// conflict, and returns the instance ID and result code of the last attempt.
//
// At most three attempts are made. A conflict is not retried when smCtx is
// non-nil and its SMID equals master_V_SMID. Any non-conflict result ends the
// loop immediately. smCtx may be nil.
func (c *committer) newValueGetID(value []byte, smCtx *SMCtx) (uint64, int32) {
	const maxAttempts = 3

	getBPInstance().NewValue()

	var (
		id   uint64
		code int32
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Started before the attempt so the OK breakpoint gets the value
		// returned by the second point() call.
		stat := timeStat(0)
		stat.point()

		id, code = c.newValueGetIDNoRetry(value, smCtx)

		if code == int32(paxosTryCommitRet_Conflict) {
			getBPInstance().NewValueConflict()
			if smCtx != nil && smCtx.SMID == master_V_SMID {
				// The master state machine does not retry on conflict.
				return id, code
			}
			continue
		}

		// Anything other than a conflict is final.
		if code == 0 {
			getBPInstance().NewValueCommitOK(stat.point())
		} else {
			getBPInstance().NewValueCommitFail()
		}
		return id, code
	}

	// Every attempt conflicted; report the last one.
	return id, code
}
// newValueGetIDNoRetry makes one commit attempt for value and returns the
// instance ID and result code reported by the commit context.
//
// The attempt first acquires the committer lock, waiting according to
// c.timeoutMs. If the lock is not obtained it returns
// paxosTryCommitRet_Timeout when some wait time was reported, or
// paxosTryCommitRet_TooManyThreadWaiting_Reject otherwise. When a positive
// timeout is configured and less than 200ms of it remain after the wait, the
// attempt is abandoned with paxosTryCommitRet_Timeout. Otherwise a copy of
// value is packed with the state machine ID (0 when smCtx is nil), handed to
// the commit context, and the IO loop is notified.
func (c *committer) newValueGetIDNoRetry(value []byte, smCtx *SMCtx) (uint64, int32) {
	// Below this much remaining time the commit is not worth starting.
	const minRemainingMs = 200

	c.logStatus()

	waitedMs, acquired := c.lock.lock(c.timeoutMs)
	if !acquired {
		if waitedMs <= 0 {
			getBPInstance().NewValueGetLockReject()
			lPLGErr(c.conf.groupIdx, "Try get lock, but too many thread waiting, reject")
			return 0, int32(paxosTryCommitRet_TooManyThreadWaiting_Reject)
		}
		getBPInstance().NewValueGetLockTimeout()
		lPLGErr(c.conf.groupIdx, "Try get lock, but timeout, lockusetime %dms", waitedMs)
		return 0, int32(paxosTryCommitRet_Timeout)
	}
	defer c.lock.unlock()

	// -1 is passed through to the commit context when no positive timeout is set.
	remainingMs := -1
	if c.timeoutMs > 0 {
		if waitedMs >= c.timeoutMs {
			remainingMs = 0
		} else {
			remainingMs = c.timeoutMs - waitedMs
		}
		if remainingMs < minRemainingMs {
			lPLGErr(c.conf.groupIdx, "Get lock ok, but lockusetime %dms too long, lefttimeout %dms", waitedMs, remainingMs)
			getBPInstance().NewValueGetLockTimeout()
			return 0, int32(paxosTryCommitRet_Timeout)
		}
	}

	lPLGImp(c.conf.groupIdx, "GetLock ok, use time %dms", waitedMs)
	getBPInstance().NewValueGetLockOK(waitedMs)

	// Pack the state machine ID into a private copy so the caller's slice is
	// never handed to the packer.
	smID := int64(0)
	if smCtx != nil {
		smID = smCtx.SMID
	}
	packed := append(make([]byte, 0, len(value)), value...)
	packed = c.smFac.packPaxosValue(packed, smID)

	c.ctx.newCommit(packed, smCtx, remainingMs)
	c.loop.addNotify()

	return c.ctx.getResult()
}
// setTimeoutMs sets the timeout, in milliseconds, that newValueGetIDNoRetry
// passes to the lock and uses to compute the time left for the commit. Values
// <= 0 skip the remaining-time check there.
// NOTE(review): the field is written here without synchronization — confirm
// this is only called before commits start.
func (c *committer) setTimeoutMs(timeoutMs int) {
c.timeoutMs = timeoutMs
}
// setMaxHoldThreads forwards maxHoldThreads to the lock's setMaxWaitLogCount.
// NOTE(review): presumably this caps how many callers may wait on the lock
// before new ones are rejected — confirm against waitLock.
func (c *committer) setMaxHoldThreads(maxHoldThreads int) {
c.lock.setMaxWaitLogCount(maxHoldThreads)
}
// setProposeWaitTimeThresholdMs forwards waitTimeThresholdMs to the lock's
// setLockWaitTimeThreshold.
// NOTE(review): presumably a wait-time threshold in milliseconds used by the
// lock when deciding whether to reject callers — confirm against waitLock.
func (c *committer) setProposeWaitTimeThresholdMs(waitTimeThresholdMs int) {
c.lock.setLockWaitTimeThreshold(waitTimeThresholdMs)
}
// logStatus writes one status line with the lock's current holder count,
// average wait time and reject rate, but only when more than 1000ms of steady
// clock time have passed since the previous status line.
//
// NOTE(review): lastLogTime is read and written here without synchronization,
// and newValueGetIDNoRetry calls this before taking the lock — confirm whether
// concurrent callers are expected.
func (c *committer) logStatus() {
	now := getSteadyClockMS()

	// Skip when the clock has not advanced (which also guards the unsigned
	// subtraction) or when the last status line is still recent.
	if now <= c.lastLogTime || now-c.lastLogTime <= 1000 {
		return
	}
	c.lastLogTime = now

	holders := c.lock.getNowHoldThreadCount()
	avgWait := c.lock.getNowAvgThreadWaitTime()
	rejectRate := c.lock.getNowRejectRate()
	lPLGStatus(c.conf.groupIdx, "wait threads %d avg thread wait ms %d reject rate %d",
		holders, avgWait, rejectRate)
}