Skip to content

Commit

Permalink
rename query parameter tick to since
Browse files Browse the repository at this point in the history
  • Loading branch information
chengshiwen committed Aug 13, 2024
1 parent d59c447 commit 43617ba
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 20 deletions.
24 changes: 13 additions & 11 deletions service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
var (
ErrInvalidWorker = errors.New("invalid worker, require positive integer")
ErrInvalidBatch = errors.New("invalid batch, require positive integer")
ErrInvalidTick = errors.New("invalid tick, require non-negative integer")
ErrInvalidSince = errors.New("invalid since, require non-negative integer")
ErrInvalidHaAddrs = errors.New("invalid ha_addrs, require at least two addresses as <host:port>, comma-separated")
)

Expand Down Expand Up @@ -290,8 +290,7 @@ func (hs *HttpService) HandlerReplica(w http.ResponseWriter, req *http.Request)
db := req.URL.Query().Get("db")
mm := req.URL.Query().Get("mm")
if mm == "" {
// compatible with version <= 2.5.11
mm = req.URL.Query().Get("meas")
mm = req.URL.Query().Get("meas") // compatible with version <= 2.5.11
}
if db != "" && mm != "" {
key := hs.ip.GetKey(db, mm)
Expand Down Expand Up @@ -857,7 +856,7 @@ func (hs *HttpService) setParam(req *http.Request) error {
if err != nil {
return err
}
err = hs.setTick(req)
err = hs.setSince(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -896,16 +895,19 @@ func (hs *HttpService) setBatch(req *http.Request) error {
return nil
}

func (hs *HttpService) setTick(req *http.Request) error {
str := strings.TrimSpace(req.FormValue("tick"))
func (hs *HttpService) setSince(req *http.Request) error {
str := strings.TrimSpace(req.FormValue("since"))
if str == "" {
str = strings.TrimSpace(req.FormValue("tick")) // compatible with version <= 2.5.11
}
if str != "" {
tick, err := strconv.ParseInt(str, 10, 64)
if err != nil || tick < 0 {
return ErrInvalidTick
since, err := strconv.ParseInt(str, 10, 64)
if err != nil || since < 0 {
return ErrInvalidSince
}
hs.tx.Tick = tick
hs.tx.Since = since
} else {
hs.tx.Tick = transfer.DefaultTick
hs.tx.Since = transfer.DefaultSince
}
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions transfer/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
RetryInterval = 15
DefaultWorker = 5
DefaultBatch = 20000
DefaultTick = int64(0)
DefaultSince = int64(0)
tlog = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
)

Expand All @@ -52,7 +52,7 @@ type Transfer struct {
getKeyFn func(string, string) string
Worker int
Batch int
Tick int64
Since int64
Resyncing bool
HaAddrs []string
}
Expand All @@ -64,7 +64,7 @@ func NewTransfer(cfg *backend.ProxyConfig, circles []*backend.Circle, getKeyFn f
getKeyFn: getKeyFn,
Worker: DefaultWorker,
Batch: DefaultBatch,
Tick: DefaultTick,
Since: DefaultSince,
}
for idx, circfg := range cfg.Circles {
tx.CircleStates[idx] = NewCircleState(circfg, circles[idx])
Expand All @@ -81,7 +81,7 @@ func (tx *Transfer) resetCircleStates() {
func (tx *Transfer) resetBasicParam() {
tx.Worker = DefaultWorker
tx.Batch = DefaultBatch
tx.Tick = DefaultTick
tx.Since = DefaultSince
}

func (tx *Transfer) setLogOutput(name string) {
Expand Down Expand Up @@ -280,13 +280,13 @@ func (tx *Transfer) query(ch chan *QueryResult, src *backend.Backend, db, rp, mm
var rsp *backend.ChunkedResponse
var err error
q := fmt.Sprintf("select * from \"%s\".\"%s\"", util.EscapeIdentifier(rp), util.EscapeIdentifier(mm))
if tx.Tick > 0 {
q = fmt.Sprintf("%s where time >= %ds", q, tx.Tick)
if tx.Since > 0 {
q = fmt.Sprintf("%s where time >= %ds", q, tx.Since)
}
for i := 0; i <= RetryCount; i++ {
if i > 0 {
time.Sleep(time.Duration(RetryInterval) * time.Second)
tlog.Printf("transfer query retry: %d, err:%s src:%s db:%s rp:%s mm:%s batch:%d tick:%d", i, err, src.Url, db, rp, mm, tx.Batch, tx.Tick)
tlog.Printf("transfer query retry: %d, err:%s src:%s db:%s rp:%s mm:%s batch:%d since:%d", i, err, src.Url, db, rp, mm, tx.Batch, tx.Since)
}
rsp, err = src.QueryChunk("GET", db, q, "ns", tx.Batch)
if err == nil {
Expand Down Expand Up @@ -359,9 +359,9 @@ func (tx *Transfer) submitTransfer(cs *CircleState, src *backend.Backend, dsts [
defer cs.wg.Done()
err := tx.transfer(src, dsts, db, rp, mm)
if err == nil {
tlog.Printf("transfer done, src:%s dst:%v db:%s rp:%s mm:%s batch:%d tick:%d", src.Url, getBackendUrls(dsts), db, rp, mm, tx.Batch, tx.Tick)
tlog.Printf("transfer done, src:%s dst:%v db:%s rp:%s mm:%s batch:%d since:%d", src.Url, getBackendUrls(dsts), db, rp, mm, tx.Batch, tx.Since)
} else {
tlog.Printf("transfer error: %s, src:%s dst:%v db:%s rp:%s mm:%s batch:%d tick:%d", err, src.Url, getBackendUrls(dsts), db, rp, mm, tx.Batch, tx.Tick)
tlog.Printf("transfer error: %s, src:%s dst:%v db:%s rp:%s mm:%s batch:%d since:%d", err, src.Url, getBackendUrls(dsts), db, rp, mm, tx.Batch, tx.Since)
}
})
}
Expand Down

0 comments on commit 43617ba

Please sign in to comment.