Skip to content

Commit

Permalink
rate-limit access to cluster on the front (part two)
Browse files Browse the repository at this point in the history
* prev. commit: a1111af
* support all three (GET, PUT, DELETE)
  - rate-limiter instance per bucket, per operation
* hash bucket structure (one-way)
  - use it to resolve specific rate-limiter
* report 429 quietly
* PASS: `aisloader --pctput=100 --randomproxy`

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Feb 27, 2025
1 parent f620517 commit b82fd1e
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 11 deletions.
8 changes: 4 additions & 4 deletions ais/prate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ func (rl *ratelim) init() {
hk.Reg(prateName+hk.NameSuffix, rl.housekeep, hk.PruneFrontendRL)
}

func (rl *ratelim) apply(bck *meta.Bck, smap *smapX) error {
func (rl *ratelim) apply(bck *meta.Bck, verb string, smap *smapX) error {
if !bck.Props.RateLimit.Frontend.Enabled {
return nil
}
var (
brl *cos.BurstRateLim
uname = bck.MakeUname("")
v, ok = rl.Load(uname)
uhash = bck.HashUname(verb)
v, ok = rl.Load(uhash)
)
if ok {
brl = v.(*cos.BurstRateLim)
} else {
// ignore sleep time - only relevant for clients
brl, _ = bck.NewFrontendRateLim(smap.CountActivePs())
rl.Store(uname, brl)
rl.Store(uhash, brl)
}
if !brl.TryAcquire() {
return errors.New(http.StatusText(http.StatusTooManyRequests))
Expand Down
21 changes: 17 additions & 4 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,14 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..

started := time.Now()

// 3. redirect
// 3. rate limit
smap := p.owner.smap.get()
if err := p.ratelim.apply(bck, http.MethodGet, smap); err != nil {
p.writeErr(w, r, err, http.StatusTooManyRequests, Silent)
return
}

// 4. redirect
tsi, netPub, err := smap.HrwMultiHome(bck.MakeUname(objName))
if err != nil {
p.statsT.IncBck(stats.ErrGetCount, bck.Bucket())
Expand All @@ -762,7 +768,7 @@ func (p *proxy) httpobjget(w http.ResponseWriter, r *http.Request, origURLBck ..
redirectURL := p.redirectURL(r, tsi, started, cmn.NetIntraData, netPub)
http.Redirect(w, r, redirectURL, http.StatusMovedPermanently)

// 4. stats
// 5. stats
p.statsT.IncBck(stats.GetCount, bck.Bucket())
}

Expand Down Expand Up @@ -816,8 +822,8 @@ func (p *proxy) httpobjput(w http.ResponseWriter, r *http.Request, apireq *apiRe

// 3. rate limit
smap := p.owner.smap.get()
if err := p.ratelim.apply(bck, smap); err != nil {
p.writeErr(w, r, err, http.StatusTooManyRequests)
if err := p.ratelim.apply(bck, http.MethodPut, smap); err != nil {
p.writeErr(w, r, err, http.StatusTooManyRequests, Silent)
return
}

Expand Down Expand Up @@ -884,7 +890,14 @@ func (p *proxy) httpobjdelete(w http.ResponseWriter, r *http.Request) {
p.writeErr(w, r, err)
return
}

// rate limit
smap := p.owner.smap.get()
if err := p.ratelim.apply(bck, http.MethodDelete, smap); err != nil {
p.writeErr(w, r, err, http.StatusTooManyRequests, Silent)
return
}

tsi, err := smap.HrwName2T(bck.MakeUname(objName))
if err != nil {
p.statsT.IncBck(stats.ErrDeleteCount, bck.Bucket())
Expand Down
19 changes: 17 additions & 2 deletions cmn/bck.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/OneOfOne/xxhash"
)

type (
Expand Down Expand Up @@ -343,10 +344,11 @@ func (b *Bck) LenUnameGlob(objName string) int {
return len(b.Provider) + 1 + len(NsGlobalUname) + 1 + len(b.Name) + 1 + len(objName) // compare with the below
}

// Bck => unique name (use ParseUname below to translate back)
// Bck => unique name
// - use ParseUname below to translate back
// - compare with HashUname
func (b *Bck) MakeUname(objName string) []byte {
var (
// TODO: non-global case can be optimized via b.Ns._copy(buf)
nsUname = b.Ns.Uname()
l = len(b.Provider) + 1 + len(nsUname) + 1 + len(b.Name) + 1 + len(objName) // compare with the above
buf = make([]byte, 0, l)
Expand All @@ -365,6 +367,19 @@ func (b *Bck) ubuf(buf []byte, nsUname, objName string) []byte {
return buf
}

// alternative (one-way) uniqueness
func (b *Bck) HashUname(s string /*verb*/) uint64 {
const sepa = "\x00"
h := xxhash.New64()
h.WriteString(s)
h.WriteString(sepa)
h.WriteString(b.Provider)
nsName := b.Ns.Uname()
h.WriteString(nsName)
h.WriteString(b.Name)
return h.Sum64()
}

//
// Is-Whats
//
Expand Down
2 changes: 1 addition & 1 deletion cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,7 @@ func (*RateLimitConf) verbs(tag, name, value string) error {
lst[i] = strings.TrimSpace(val)
}
for _, s := range lst {
kv := strings.Split(strings.ToLower(s), ":")
kv := strings.Split(strings.ToUpper(s), ":") // upper as in: http.MethodGet, et al.
if len(kv) != 2 {
return fmt.Errorf("%s: invalid format %s (number of items in '%v')", tag, name, kv)
}
Expand Down
1 change: 1 addition & 0 deletions core/meta/bck.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (b *Bck) IsQuery() bool { return (*cmn.Bck)(b).IsQuery() }
func (b *Bck) RemoteBck() *cmn.Bck { return (*cmn.Bck)(b).RemoteBck() }
func (b *Bck) Validate() error { return (*cmn.Bck)(b).Validate() }
func (b *Bck) MakeUname(name string) []byte { return (*cmn.Bck)(b).MakeUname(name) }
func (b *Bck) HashUname(s string) uint64 { return (*cmn.Bck)(b).HashUname(s) }
func (b *Bck) Cname(name string) string { return (*cmn.Bck)(b).Cname(name) }
func (b *Bck) IsEmpty() bool { return (*cmn.Bck)(b).IsEmpty() }
func (b *Bck) HasVersioningMD() bool { return (*cmn.Bck)(b).HasVersioningMD() }
Expand Down

0 comments on commit b82fd1e

Please sign in to comment.