Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add benchmark workflows #1276

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
name: Benchmark

on:
pull_request:
paths:
- pkg/**/*
- cmd/**/*
- test/**/*
- hack/**/*
- kustomize/**/*
- go.mod
- .github/workflows/benchmark.yaml
- '!hack/releases-helm-chart.sh'
push:
paths:
- pkg/**/*
- cmd/**/*
- test/**/*
- hack/**/*
- kustomize/**/*
- go.mod
- .github/workflows/benchmark.yaml
- '!hack/releases-helm-chart.sh'

env:
CGO_ENABLED: "0"
GO_VERSION: "1.23.0"

jobs:
benchmark:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}

- name: Test Benchmark
shell: bash
run: |
./hack/e2e-test.sh e2e/kwokctl/benchmark

- name: Test Benchmark Hack
shell: bash
run: |
./hack/e2e-test.sh e2e/kwokctl/benchmark-hack

- name: Upload logs
uses: actions/upload-artifact@v4
if: failure()
with:
name: kwok-logs-benchmark
path: ${{ github.workspace }}/logs
6 changes: 0 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,6 @@ jobs:
fi
./hack/e2e-test.sh kwokctl/kwokctl_${{ matrix.kwokctl-runtime }}

- name: Test Benchmark
if: ${{ matrix.os == 'ubuntu-latest' && matrix.kwokctl-runtime == 'binary' }}
shell: bash
run: |
./hack/e2e-test.sh e2e/kwokctl/benchmark

- name: Test Auto Detect
if: ${{ matrix.kwokctl-runtime == 'binary' }}
shell: bash
Expand Down
9 changes: 7 additions & 2 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Controller struct {

podOnNodeManageQueue queue.Queue[string]
nodeManageQueue queue.Queue[string]

podOnNodeManageQueueParallelism *queue.AdaptiveQueue[string]
nodeManageQueueParallelism *queue.AdaptiveQueue[string]
}

// Config is the configuration for the controller
Expand Down Expand Up @@ -295,6 +298,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
c.nodeLeases.ReleaseHold(nodeName)
}

c.nodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.nodeManageQueue, c.nodeLeaseSyncWorker)
go c.nodeLeaseSyncWorker(ctx)

err = c.nodeLeases.Start(ctx)
Expand All @@ -307,7 +311,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
func (c *Controller) nodeLeaseSyncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.nodeManageQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.nodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down Expand Up @@ -339,6 +343,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi
return fmt.Errorf("failed to init pod controller: %w", err)
}

c.podOnNodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.podOnNodeManageQueue, c.podsOnNodeSyncWorker)
go c.podsOnNodeSyncWorker(ctx)

case nodeRef:
Expand Down Expand Up @@ -559,7 +564,7 @@ func (c *Controller) Start(ctx context.Context) error {
func (c *Controller) podsOnNodeSyncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.podOnNodeManageQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.podOnNodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type NodeController struct {
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Node]]
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Node]]
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Node]]
backoff wait.Backoff
recorder record.EventRecorder
readOnlyFunc func(nodeName string) bool
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
// if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed
func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error {
go c.preprocessWorker(ctx)
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
Expand Down Expand Up @@ -323,7 +325,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
node, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
node, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ type NodeLeaseController struct {
// mutateLeaseFunc allows customizing a lease object
mutateLeaseFunc func(*coordinationv1.Lease) error

delayQueue queue.WeightDelayingQueue[string]
holdLeaseSet maps.SyncMap[string, bool]
delayQueue queue.WeightDelayingQueue[string]
holdLeaseSet maps.SyncMap[string, bool]
delayQueueParallelism *queue.AdaptiveQueue[string]

holderIdentity string
onNodeManagedFunc func(nodeName string)
Expand Down Expand Up @@ -99,6 +100,7 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle

// Start starts the NodeLeaseController
func (c *NodeLeaseController) Start(ctx context.Context) error {
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.syncWorker)
for i := uint(0); i < c.leaseParallelism; i++ {
go c.syncWorker(ctx)
}
Expand All @@ -108,7 +110,7 @@ func (c *NodeLeaseController) Start(ctx context.Context) error {
func (c *NodeLeaseController) syncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type PodController struct {
playStageParallelism uint
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Pod]]
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Pod]]
backoff wait.Backoff
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Pod]]
recorder record.EventRecorder
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
// It will modify the pods status to we want
func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error {
go c.preprocessWorker(ctx)
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
Expand Down Expand Up @@ -258,7 +260,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
pod, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
pod, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/controllers/stage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type StageController struct {
playStageParallelism uint
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*unstructured.Unstructured]]
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*unstructured.Unstructured]]
backoff wait.Backoff
delayQueueMapping maps.SyncMap[string, resourceStageJob[*unstructured.Unstructured]]
recorder record.EventRecorder
Expand Down Expand Up @@ -123,6 +124,7 @@ func NewStageController(conf StageControllerConfig) (*StageController, error) {
// It will modify the resources status to we want
func (c *StageController) Start(ctx context.Context, events <-chan informer.Event[*unstructured.Unstructured]) error {
go c.preprocessWorker(ctx)
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
Expand Down Expand Up @@ -236,7 +238,7 @@ func (c *StageController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
resource, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
resource, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/kwok/server/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableC

// Setup pprof handlers.
s.restfulCont.Handle(pprofBasePath, http.HandlerFunc(pprof.Index))
s.restfulCont.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
s.restfulCont.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
s.restfulCont.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
if enableContentionProfiling {
runtime.SetBlockProfileRate(1)
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/utils/queue/adaptive_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"context"
"sync"
"time"
)

type AdaptiveQueue[T any] struct {
ctx context.Context
startFunc func(ctx context.Context)
latestStart time.Time
mut sync.Mutex
queue Queue[T]
}

func NewAdaptiveQueue[T any](ctx context.Context, q Queue[T], startFunc func(ctx context.Context)) *AdaptiveQueue[T] {
return &AdaptiveQueue[T]{
ctx: ctx,
startFunc: startFunc,
latestStart: time.Now(),
queue: q,
}
}

func (p *AdaptiveQueue[T]) GetOrWaitWithDone(done <-chan struct{}) (T, bool) {
t, ok := p.queue.GetOrWaitWithDone(done)
if !ok {
return t, false
}

length := p.queue.Len()
if length > 3 {
p.mut.Lock()
defer p.mut.Unlock()
now := time.Now()
sub := now.Sub(p.latestStart)

if sub >= time.Second/10 {
go p.startFunc(p.ctx)
p.latestStart = now
}
}
return t, true
}
Loading
Loading