Skip to content

Commit

Permalink
Merge pull request #49 from Clever/APPS-131-resiliency-when-redis-down
Browse files Browse the repository at this point in the history
Allow to become passive proxy on leakybucket errors
  • Loading branch information
Mike Graf committed Jun 19, 2015
2 parents 50d7988 + de49367 commit c359014
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 35 deletions.
7 changes: 4 additions & 3 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ Limit holds the yaml data for one of the limits in the config file

```go
type Proxy struct {
Handler string
Host string
Listen string
Handler string
Host string
Listen string
AllowOnError bool `yaml:"allow-on-error"`
}
```

Expand Down
10 changes: 6 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ type Config struct {

// Proxy holds the yaml data for the proxy option in the config file
type Proxy struct {
Handler string
Host string
Listen string
Handler string
Host string
Listen string
AllowOnError bool `yaml:"allow-on-error"`
}

// HealthCheck holds the yaml data for how to run the health check service.
Expand Down Expand Up @@ -56,6 +57,7 @@ func LoadYaml(data []byte) (Config, error) {
// ValidateConfig validates that a Config has all the required fields
// TODO (z): These should all be private, but right now tests depend on parsing bytes into yaml
func ValidateConfig(config Config) error {
// NOTE: tests depend on the order of these checks.
if config.Proxy.Handler == "" {
return fmt.Errorf("proxy.handler not set")
}
Expand All @@ -80,7 +82,7 @@ func ValidateConfig(config Config) error {
}

if len(config.Limits) < 1 {
return fmt.Errorf("no limits definied")
return fmt.Errorf("no limits defined")
}

for name, limit := range config.Limits {
Expand Down
13 changes: 10 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func TestConfigurationFileLoading(t *testing.T) {
t.Error("expected http for Proxy.Handler")
}

if config.Proxy.AllowOnError != true {
t.Error("expected true for proxy.allow-on-error: Yes")
}

if config.HealthCheck.Port != "60002" {
t.Error("expected 60002 for HealthCheck.Port")
}
Expand Down Expand Up @@ -65,6 +69,7 @@ forward
// Incorrect configuration file should return errors
func TestInvalidProxyConfig(t *testing.T) {

// proxy.handler not set
invalidConfig := []byte(`
proxy:
host: http://proxy.example.com
Expand All @@ -74,6 +79,7 @@ proxy:
t.Errorf("Expected proxy handler error. Got: %s", err.Error())
}

// proxy.listen not set
invalidConfig = []byte(`
proxy:
handler: http
Expand All @@ -91,7 +97,8 @@ proxy:
listen: :8000
`)
_, err = LoadAndValidateYaml(invalidConfig)
if err == nil || !strings.Contains(err.Error(), "proxy") {

if err == nil || !strings.Contains(err.Error(), "proxy.host") {
t.Errorf("Expected proxy host error. Got: %s", err.Error())
}
}
Expand All @@ -112,7 +119,7 @@ storage:
limits:
bearer/events:
keys:
headers:
headers:
- 'Authentication'
`)
_, err := LoadAndValidateYaml(configBuf.Bytes())
Expand All @@ -126,7 +133,7 @@ limits:
bearer/events:
interval: 10
keys:
headers:
headers:
- 'Authentication'
`)
_, err = LoadAndValidateYaml(configBuf.Bytes())
Expand Down
2 changes: 1 addition & 1 deletion daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *daemon) LoadConfig(newConfig config.Config) error {
// Set the proxy and handler daemon fields
switch d.proxy.Handler {
case "http":
d.handler = handlers.NewHTTPLimiter(rateLimiter, proxy)
d.handler = handlers.NewHTTPLimiter(rateLimiter, proxy, d.proxy.AllowOnError)
return nil
case "httplogger":
d.handler = handlers.NewHTTPLogger(rateLimiter, proxy)
Expand Down
2 changes: 1 addition & 1 deletion deb/sphinx/DEBIAN/control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: sphinx
Version: 0.3.1
Version: 0.4.1
Section: base
Priority: optional
Architecture: amd64
Expand Down
1 change: 1 addition & 0 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ proxy:
handler: http # can be {http,httplogger}
host: http://httpbin.org # URI for the http(s) backend we are proxying to
listen: :6634 # bind to host:port. default: height of the Great Sphinx of Giza
allow-on-error: Yes # become passive proxy if error detected? (optional, default=No)

storage:
type: memory # can be {redis,memory}
Expand Down
8 changes: 7 additions & 1 deletion handlers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

## Usage

```go
const (
StatusTooManyRequests = 429 // not in net/http package
)
```

#### func NewHTTPLimiter

```go
func NewHTTPLimiter(rateLimiter ratelimiter.RateLimiter, proxy http.Handler) http.Handler
func NewHTTPLimiter(rateLimiter ratelimiter.RateLimiter, proxy http.Handler, allowOnError bool) http.Handler
```
NewHTTPLimiter returns an http.Handler that rate limits and proxies requests.

Expand Down
36 changes: 23 additions & 13 deletions handlers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"strings"
)

const (
StatusTooManyRequests = 429 // not in net/http package
)

func stringifyHeaders(headers http.Header) *bytes.Buffer {
buf := &bytes.Buffer{}
for header, values := range headers {
Expand All @@ -35,26 +39,32 @@ func stringifyRequest(req common.Request) *bytes.Buffer {
}

type httpRateLimiter struct {
rateLimiter ratelimiter.RateLimiter
proxy http.Handler
rateLimiter ratelimiter.RateLimiter
proxy http.Handler
allowOnError bool // Do not limit on errors when true
}

func (hrl httpRateLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
guid := uuid.New()
request := common.HTTPToSphinxRequest(r)
log.Printf("[%s] REQUEST: %s", guid, stringifyRequest(request).String())
matches, err := hrl.rateLimiter.Add(request)
if err != nil && err != leakybucket.ErrorFull {
switch {
case err == leakybucket.ErrorFull:
addRateLimitHeaders(w, matches)
w.WriteHeader(StatusTooManyRequests)
case err != nil && hrl.allowOnError:
log.Printf("[%s] ERROR: %s", guid, err)
w.WriteHeader(500)
return
}
addRateLimitHeaders(w, matches)
if err == leakybucket.ErrorFull {
w.WriteHeader(429)
return
log.Printf("[%s] WARNING: bypassing rate limiter due to Error", guid)
hrl.proxy.ServeHTTP(w, r)
case err != nil:
log.Printf("[%s] ERROR: %s", guid, err)
w.WriteHeader(http.StatusInternalServerError)

default:
addRateLimitHeaders(w, matches)
hrl.proxy.ServeHTTP(w, r)
}
hrl.proxy.ServeHTTP(w, r)
}

type httpRateLogger httpRateLimiter
Expand Down Expand Up @@ -118,8 +128,8 @@ func addRateLimitHeaders(w http.ResponseWriter, statuses []ratelimiter.Status) {
}

// NewHTTPLimiter returns an http.Handler that rate limits and proxies requests.
func NewHTTPLimiter(rateLimiter ratelimiter.RateLimiter, proxy http.Handler) http.Handler {
return &httpRateLimiter{rateLimiter: rateLimiter, proxy: proxy}
func NewHTTPLimiter(rateLimiter ratelimiter.RateLimiter, proxy http.Handler, allowOnError bool) http.Handler {
return &httpRateLimiter{rateLimiter: rateLimiter, proxy: proxy, allowOnError: allowOnError}
}

// NewHTTPLogger returns an http.Handler that logs the results of rate limiting requests, but
Expand Down
52 changes: 46 additions & 6 deletions handlers/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func TestHandleWhenFull(t *testing.T) {
limiter.ServeHTTP(w, r)

compareStatusesToHeader(t, w.Header(), statuses)
if w.Code != 429 {
t.Fatalf("expected status 429, received %d", w.Code)
if w.Code != StatusTooManyRequests {
t.Fatalf("expected status %d, received %d", StatusTooManyRequests, w.Code)
}
limitMock.AssertExpectations(t)
}
Expand All @@ -205,8 +205,8 @@ func TestHandleWhenErrWithStatus(t *testing.T) {
limiter.ServeHTTP(w, r)
assertNoRateLimitHeaders(t, w.Header())

if w.Code != 500 {
t.Fatalf("expected status 500, received %d", w.Code)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected status %d, received %d", http.StatusInternalServerError, w.Code)
}
limitMock.AssertExpectations(t)
}
Expand All @@ -226,8 +226,48 @@ func TestHandleWhenErrWithoutStatus(t *testing.T) {
limiter.ServeHTTP(w, r)
assertNoRateLimitHeaders(t, w.Header())

if w.Code != 500 {
t.Fatalf("expected status 500, received %d", w.Code)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected status %d, received %d", http.StatusInternalServerError, w.Code)
}
limitMock.AssertExpectations(t)
}

// Test cases when an error occurs and either value of allowOnError
var allowOnErrorCases = []struct {
allowOnError bool
ExpectedCode int
}{
// It should still block if allowOnError == false
{false, http.StatusInternalServerError},
// If allowOnError == true and no headers, should still StatusOK
{true, http.StatusOK},
}

// Tests the allowOnError flag feature
func TestallowOnError(t *testing.T) {
for _, test := range allowOnErrorCases {
limiter := constructHTTPRateLimiter()
limiter.allowOnError = test.allowOnError
w := httptest.NewRecorder()
r, err := http.NewRequest("GET", "http://google.com", strings.NewReader("thebody"))
if err != nil {
t.Fatal(err)
}

// Setup an error case (simulate redis connection error)
limitMock := limiter.rateLimiter.(*MockRateLimiter).Mock
limitMock.On("Add", anyRequest).
Return([]ratelimiter.Status{sphinxStatus}, errors.New("Expected Testing error - redis.conn error")).
Once()

proxyMock := limiter.proxy.(*MockProxy).Mock
proxyMock.On("ServeHTTP", w, r).Return().Once()
limiter.ServeHTTP(w, r)

assertNoRateLimitHeaders(t, w.Header())

if w.Code != test.ExpectedCode {
t.Fatalf("expected status %d, received %d", test.ExpectedCode, w.Code)
}
}
}
2 changes: 1 addition & 1 deletion limitkeys/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (eke EmptyKeyError) Error() string

```go
type LimitKey interface {
Key(common.Request) (string, error)
Type() string
Key(common.Request) (string, error)
}
```

Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func sighupHandler(d daemon.Daemon) {
log.Println("Reloaded config file")
}

// setupSighupHandler craetes a channel to listen for HUP signals and process them.
// setupSighupHandler creates a channel to listen for HUP signals and process them.
func setupSighupHandler(d daemon.Daemon, handler func(daemon.Daemon)) {
sigc := make(chan os.Signal)
signal.Notify(sigc, syscall.SIGHUP)
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func setUpHTTPLimiter(b *testing.B) {
// ignore the url in the config and use localhost
target, _ := url.Parse(host)
proxy := httputil.NewSingleHostReverseProxy(target)
httpLimiter := handlers.NewHTTPLimiter(rateLimiter, proxy)
httpLimiter := handlers.NewHTTPLimiter(rateLimiter, proxy, false)

go http.ListenAndServe(":8082", httpLimiter)
}
Expand Down

0 comments on commit c359014

Please sign in to comment.