Skip to content

Commit

Permalink
Follow-up tests for multi-tenant work (julienschmidt#279)
Browse files Browse the repository at this point in the history
* add test for addEvt in deployment cache

Signed-off-by: Aaron Schlesinger <[email protected]>

* Adding TestRunProxyServerCountMiddleware

Currently failing :)

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding resize timeout to the queue fake

Signed-off-by: Aaron Schlesinger <[email protected]>

* StartTestServer => NewTestServer

Signed-off-by: Aaron Schlesinger <[email protected]>

* StartTestServer => NewTestServer

Signed-off-by: Aaron Schlesinger <[email protected]>

* Going back to StartTestServer

I forgot that it actually does start the test server in the background

Signed-off-by: Aaron Schlesinger <[email protected]>

* using targetFromURL

Signed-off-by: Aaron Schlesinger <[email protected]>

* more StartTestServer

Signed-off-by: Aaron Schlesinger <[email protected]>

* Keeping track of the running queue count in the fake

Also protecting that map with a mutex

Signed-off-by: Aaron Schlesinger <[email protected]>

* finishing the interceptor test

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding test comment

Signed-off-by: Aaron Schlesinger <[email protected]>

* more movement over to targetFromURL

Signed-off-by: Aaron Schlesinger <[email protected]>

* additional checks

Signed-off-by: Aaron Schlesinger <[email protected]>
  • Loading branch information
arschles authored Oct 25, 2021
1 parent 3df5d09 commit 41e86c1
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 42 deletions.
128 changes: 119 additions & 9 deletions interceptor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/interceptor/config"
"github.com/kedacore/http-add-on/pkg/k8s"
kedanet "github.com/kedacore/http-add-on/pkg/net"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/kedacore/http-add-on/pkg/test"
Expand All @@ -23,17 +25,126 @@ import (
)

func TestRunProxyServerCountMiddleware(t *testing.T) {
// r := require.New(t)
// ctx, done := context.WithCancel(
// context.Background(),
// )
// defer done()
// r.NoError(runProxyServer(ctx, logr.Discard(), q, waitFunc, routingTable, timeouts, port))

// see https://github.com/kedacore/http-add-on/issues/245
const (
port = 8080
host = "samplehost"
)
r := require.New(t)
ctx, done := context.WithCancel(
context.Background(),
)
defer done()

originHdl := kedanet.NewTestHTTPHandlerWrapper(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}),
)
originSrv, originURL, err := kedanet.StartTestServer(originHdl)
r.NoError(err)
defer originSrv.Close()
originPort, err := strconv.Atoi(originURL.Port())
r.NoError(err)
g, ctx := errgroup.WithContext(ctx)
q := queue.NewFakeCounter()
routingTable := routing.NewTable()
// set up a fake host that we can spoof
// when we later send request to the proxy,
// so that the proxy calculates a URL for that
// host that points to the (above) fake origin
// server.
routingTable.AddTarget(
host,
targetFromURL(
originURL,
originPort,
"testdepl",
123,
),
)
timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
waitFunc := func(ctx context.Context, name string) error {
<-waiterCh
return nil
}
g.Go(func() error {
return runProxyServer(
ctx,
logr.Discard(),
q,
waitFunc,
routingTable,
timeouts,
port,
)
})
// wait for server to start
time.Sleep(500 * time.Millisecond)

// make an HTTP request in the background
g.Go(func() error {
req, err := http.NewRequest(
"GET",
fmt.Sprintf(
"http://0.0.0.0:%d", port,
), nil,
)
if err != nil {
return err
}
req.Host = host
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf(
"unexpected status code: %d",
resp.StatusCode,
)
}
return nil
})
time.Sleep(100 * time.Millisecond)
select {
case hostAndCount := <-q.ResizedCh:
r.Equal(host, hostAndCount.Host)
r.Equal(+1, hostAndCount.Count)
case <-time.After(500 * time.Millisecond):
r.Fail("timeout waiting for +1 queue resize")
}

// tell the wait func to proceed
waiterCh <- struct{}{}

select {
case hostAndCount := <-q.ResizedCh:
r.Equal(host, hostAndCount.Host)
r.Equal(-1, hostAndCount.Count)
case <-time.After(500 * time.Millisecond):
r.Fail("timeout waiting for -1 queue resize")
}

// check the queue to make sure all counts are at 0
countsPtr, err := q.Current()
r.NoError(err)
counts := countsPtr.Counts
r.Equal(1, len(counts))
_, foundHost := counts[host]
r.True(
foundHost,
"couldn't find host %s in the queue",
host,
)
r.Equal(0, counts[host])

done()
r.Error(g.Wait())
}

func TestRunAdminServerDeploymentsEndpoint(t *testing.T) {

ctx := context.Background()
ctx, done := context.WithCancel(ctx)
defer done()
Expand Down Expand Up @@ -148,5 +259,4 @@ func TestRunAdminServerConfig(t *testing.T) {

done()
r.Error(errgrp.Wait())

}
42 changes: 24 additions & 18 deletions interceptor/proxy_handlers_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ func TestIntegrationHappyPath(t *testing.T) {
},
})

originHost, originPort, err := splitHostPort(h.originURL.Host)
originPort, err := strconv.Atoi(h.originURL.Port())
r.NoError(err)
h.routingTable.AddTarget(hostForTest(t), routing.Target{
Service: originHost,
Port: originPort,
Deployment: deplName,
})
h.routingTable.AddTarget(hostForTest(t), targetFromURL(
h.originURL,
originPort,
deplName,
123,
))

// happy path
res, err := doRequest(
Expand Down Expand Up @@ -115,13 +116,14 @@ func TestIntegrationNoReplicas(t *testing.T) {
h, err := newHarness(deployTimeout, time.Second)
r.NoError(err)

originHost, originPort, err := splitHostPort(h.originURL.Host)
originPort, err := strconv.Atoi(h.originURL.Port())
r.NoError(err)
h.routingTable.AddTarget(hostForTest(t), routing.Target{
Service: originHost,
Port: originPort,
Deployment: deployName,
})
h.routingTable.AddTarget(hostForTest(t), targetFromURL(
h.originURL,
originPort,
deployName,
123,
))

// 0 replicas
h.deplCache.Set(deployName, appsv1.Deployment{
Expand Down Expand Up @@ -162,13 +164,17 @@ func TestIntegrationWaitReplicas(t *testing.T) {
r.NoError(err)

// add host to routing table
originHost, originPort, err := splitHostPort(h.originURL.Host)
originPort, err := strconv.Atoi(h.originURL.Port())
r.NoError(err)
h.routingTable.AddTarget(hostForTest(t), routing.Target{
Service: originHost,
Port: originPort,
Deployment: deployName,
})
h.routingTable.AddTarget(
hostForTest(t),
targetFromURL(
h.originURL,
originPort,
deployName,
123,
),
)

// set up a deployment with zero replicas and create
// a watcher we can use later to fake-send a deployment
Expand Down
28 changes: 22 additions & 6 deletions interceptor/proxy_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"testing"
Expand All @@ -30,13 +31,14 @@ func TestImmediatelySuccessfulProxy(t *testing.T) {
r.NoError(err)
defer srv.Close()
routingTable := routing.NewTable()
portInt, err := strconv.Atoi(originURL.Port())
originPort, err := strconv.Atoi(originURL.Port())
r.NoError(err)
target := routing.Target{
Service: strings.Split(originURL.Host, ":")[0],
Port: portInt,
Deployment: "testdepl",
}
target := targetFromURL(
originURL,
originPort,
"testdepl",
123,
)
routingTable.AddTarget(host, target)

timeouts := defaultTimeouts()
Expand Down Expand Up @@ -332,3 +334,17 @@ func notifyingFunc() (func(context.Context, string) error, <-chan struct{}, func
}
}, calledCh, finishFunc
}

func targetFromURL(
u *url.URL,
port int,
deployment string,
targetPendingReqs int32,
) routing.Target {
return routing.Target{
Service: strings.Split(u.Host, ":")[0],
Port: port,
Deployment: deployment,
TargetPendingRequests: targetPendingReqs,
}
}
53 changes: 52 additions & 1 deletion pkg/k8s/deployment_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,58 @@ func callMergeAndBcast(
}

func TestK8sDeploymentCacheAddEvt(t *testing.T) {
// see https://github.com/kedacore/http-add-on/issues/245
r := require.New(t)
ctx, done := context.WithCancel(
context.Background(),
)
defer done()
cache, err := NewK8sDeploymentCache(ctx, logr.Discard(), newFakeDeploymentListerWatcher())
r.NoError(err)
checkDeploymentsInCache := func(depls ...appsv1.Deployment) {
t.Helper()
r := require.New(t)
for _, depl := range depls {
ret, ok := cache.latest[depl.GetName()]
r.True(ok)
r.Equal(depl, ret)
}
}

// add a first deployment and make sure it exists in
// the latest cache
depl1 := newDeployment("testns", "testdepl1", "testing", nil, nil, nil, core.PullAlways)
evt1 := watch.Event{
Type: watch.Added, // doesn't matter, addEvt doesn't look at this
Object: depl1,
}
r.NoError(cache.addEvt(evt1))
r.Equal(1, len(cache.latest))
checkDeploymentsInCache(*depl1)

// add a second (different name) and make sure both exist
// in the cache
depl2 := *depl1
depl2.Name = "testdepl2"
evt2 := watch.Event{
Type: watch.Modified,
Object: &depl2,
}
r.NoError(cache.addEvt(evt2))
r.Equal(2, len(cache.latest))
checkDeploymentsInCache(*depl1, depl2)

// try to add a pod, make sure addEvt fails, and afterward, make sure
// the original 2 deployments are still in the cache
otherObj := &core.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "somepod"},
}
evt3 := watch.Event{
Type: watch.Added,
Object: otherObj,
}
r.Error(cache.addEvt(evt3))
r.Equal(2, len(cache.latest))
checkDeploymentsInCache(*depl1, depl2)
}

// test to make sure that, even when no events come through, the
Expand Down
10 changes: 8 additions & 2 deletions pkg/net/mock_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ func NewTestHTTPHandlerWrapper(hdl http.Handler) *TestHTTPHandlerWrapper {
}
}

// StartTestServer starts an *httptest.Server, parses its URL, and returns both values.
// The caller is responsible for closing the returned server
// StartTestServer creates and starts an *httptest.Server
// in the background, then parses its URL and returns both
// values.
//
// If this function returns a nil error, the caller is
// responsible for closing the returned server. If it
// returns a non-nil error, the server and URL
// will be nil.
func StartTestServer(hdl http.Handler) (*httptest.Server, *url.URL, error) {
srv := httptest.NewServer(hdl)
u, err := url.Parse(srv.URL)
Expand Down
Loading

0 comments on commit 41e86c1

Please sign in to comment.