feat: allocate dedicated without exclusive and reclaim on a numa node #618

Open · wants to merge 1 commit into main
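In short: the new code paths target one pod shape — dedicated_cores pods that request numa_binding but not numa_exclusive. Below is a minimal sketch of that gate, not part of the diff, composed only from helpers visible in it; the hardcoded "dedicated_cores" literal stands in for consts.PodAnnotationQoSLevelDedicatedCores, and isDedicatedNUMABindingNonExclusive is a hypothetical name.

package sketch

import (
    v1 "k8s.io/api/core/v1"

    "github.com/kubewharf/katalyst-core/pkg/config/generic"
    "github.com/kubewharf/katalyst-core/pkg/util/qos"
)

// isDedicatedNUMABindingNonExclusive mirrors the filtering the new branch
// performs. qosLevel is assumed to come from the pod's state entry, as the
// diff reads it via GetMainContainerEntry().QoSLevel.
func isDedicatedNUMABindingNonExclusive(qosConf *generic.QoSConfiguration, pod *v1.Pod, qosLevel string) bool {
    if qosLevel != "dedicated_cores" { // consts.PodAnnotationQoSLevelDedicatedCores
        return false
    }
    // numa_binding requested, numa_exclusive not requested
    return qos.IsPodNumaBinding(qosConf, pod) && !qos.IsPodNumaExclusive(qosConf, pod)
}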
@@ -26,6 +26,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
maputil "k8s.io/kubernetes/pkg/util/maps"

@@ -36,10 +37,12 @@ import (
advisorapi "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/cpuadvisor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/process"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
)

const (
@@ -284,7 +287,7 @@ func (p *DynamicPolicy) allocateByCPUAdvisor(resp *advisorapi.ListAndWatchRespon

// generateBlockCPUSet generates BlockCPUSet from cpu-advisor response
// and the logic contains three main steps
// 1. handle blocks for static pools
// 1. handle blocks for static pools and pods (dedicated_cores with numa_binding but without numa_exclusive)
// 2. handle blocks with specified NUMA ids (probably blocks for
// numa_binding dedicated_cores containers and reclaimed_cores containers colocated with them)
// 3. handle blocks without specified NUMA id (probably blocks for
@@ -322,6 +325,44 @@ func (p *DynamicPolicy) generateBlockCPUSet(resp *advisorapi.ListAndWatchRespons
availableCPUs = availableCPUs.Difference(blockCPUSet[blockID])
}
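// Aside, not part of this PR: a toy model of the invariant every step here
// preserves — once a block is pinned into blockCPUSet, the same cores must
// leave availableCPUs so no later step can hand them out twice.
// machine.NewCPUSet is assumed to be the usual variadic cpuset constructor.
func sketchBlockBookkeeping() {
    availableCPUs := machine.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7)
    blockCPUSet := map[string]machine.CPUSet{}

    reused := machine.NewCPUSet(0, 1, 2, 3) // cpuset a pod already owns on some NUMA node
    blockCPUSet["block-0"] = reused.Clone()
    availableCPUs = availableCPUs.Difference(blockCPUSet["block-0"])

    fmt.Println(availableCPUs.String()) // "4-7": later steps may only allocate these
}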

// walk through static pods to reuse their existing cpusets if present.
qosConf := generic.NewQoSConfiguration()
for podUID, containerEntries := range p.state.GetPodEntries() {
if containerEntries.IsPoolEntry() {
continue
}
mainEntry := containerEntries.GetMainContainerEntry()
if mainEntry == nil || mainEntry.QoSLevel != consts.PodAnnotationQoSLevelDedicatedCores {
continue
}
pod, err := p.metaServer.GetPod(context.Background(), podUID)
if err != nil {
err = fmt.Errorf("getPod %s fail: %v", podUID, err)
return nil, err
}
if !qos.IsPodNumaBinding(qosConf, pod) {
continue
}
if qos.IsPodNumaExclusive(qosConf, pod) {
continue
}

for containerName, allocationInfo := range containerEntries {
for numaID, cpuset := range allocationInfo.TopologyAwareAssignments {
blocks, ok := resp.GeEntryNUMABlocks(podUID, containerName, int64(numaID))
if !ok || len(blocks) != 1 {
err := fmt.Errorf("blocks of pod %v container %v numaID %v are invalid", pod.Name, containerName, numaID)
klog.Error(err)
return nil, err
}

blockID := blocks[0].BlockId
blockCPUSet[blockID] = cpuset.Clone()
availableCPUs = availableCPUs.Difference(blockCPUSet[blockID])

klog.V(6).Infof("pod %v container %v numaId %v reuse cpuset %v", pod.Name, containerName, numaID, blockCPUSet[blockID])
}
}
}
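// Aside, not part of this PR: the shape of the map the loop above replays
// into blockCPUSet. map[int]machine.CPUSet is inferred from how the code
// ranges over allocationInfo.TopologyAwareAssignments; treat it as an assumption.
func sketchTopologyAwareReuse() {
    topologyAwareAssignments := map[int]machine.CPUSet{
        0: machine.NewCPUSet(0, 1, 2), // cores already pinned on NUMA 0
        1: machine.NewCPUSet(8, 9),    // cores already pinned on NUMA 1
    }
    for numaID, cpuset := range topologyAwareAssignments {
        fmt.Printf("numa %d reuses cpuset %s\n", numaID, cpuset.String())
    }
}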

// walk through all blocks with specified NUMA ids
// for each block, add them into blockCPUSet (if not exist) and renew availableCPUs
for numaID, blocks := range numaToBlocks {
@@ -24,6 +24,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
@@ -597,7 +598,11 @@ func (p *DynamicPolicy) adjustPoolsAndIsolatedEntries(poolsQuantityMap map[strin
) error {
availableCPUs := machineState.GetFilteredAvailableCPUSet(p.reservedCPUs, nil, state.CheckDedicatedNUMABinding)

poolsCPUSet, isolatedCPUSet, err := p.generatePoolsAndIsolation(poolsQuantityMap, isolatedQuantityMap, availableCPUs)
// reclaimedAvailableCPUs feeds the reclaimed pool only: all unused CPUs on NUMA nodes bound by dedicated_cores pods without numa_exclusive
reclaimedAvailableCPUs := machineState.GetMatchedAvailableCPUSet(p.reservedCPUs, state.CheckDedicatedNUMABindingWithoutNUMAExclusive, state.CheckDedicatedNUMABindingWithoutNUMAExclusive)
klog.V(6).Infof("adjustPoolsAndIsolatedEntries availableCPUs: %v, reclaimedAvailableCPUs: %v", availableCPUs.String(), reclaimedAvailableCPUs.String())

poolsCPUSet, isolatedCPUSet, err := p.generatePoolsAndIsolation(poolsQuantityMap, isolatedQuantityMap, availableCPUs, reclaimedAvailableCPUs)
if err != nil {
return fmt.Errorf("generatePoolsAndIsolation failed with error: %v", err)
}
@@ -642,7 +647,7 @@ func (p *DynamicPolicy) reclaimOverlapNUMABinding(poolsCPUSet map[string]machine
}

for _, allocationInfo := range containerEntries {
if !(allocationInfo != nil && state.CheckDedicatedNUMABinding(allocationInfo) && allocationInfo.CheckMainContainer()) {
if !(allocationInfo != nil && state.CheckDedicatedNUMABindingWithNUMAExclusive(allocationInfo) && allocationInfo.CheckMainContainer()) {
continue
} else if allocationInfo.RampUp {
general.Infof("dedicated numa_binding pod: %s/%s container: %s is in ramp up, not to overlap reclaim pool with it",
@@ -862,7 +867,7 @@ func (p *DynamicPolicy) applyPoolsAndIsolatedInfo(poolsCPUSet map[string]machine
// 2. use the remaining cores to allocate among different pools
// 3. apportion to other pools if reclaimed is disabled
func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]int,
isolatedQuantityMap map[string]map[string]int, availableCPUs machine.CPUSet) (poolsCPUSet map[string]machine.CPUSet,
isolatedQuantityMap map[string]map[string]int, availableCPUs machine.CPUSet, reclaimedAvailableCPUs machine.CPUSet) (poolsCPUSet map[string]machine.CPUSet,
isolatedCPUSet map[string]map[string]machine.CPUSet, err error,
) {
// clear pool map with zero quantity
@@ -995,6 +1000,9 @@ func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]in
}

poolsCPUSet[state.PoolNameReclaim] = poolsCPUSet[state.PoolNameReclaim].Union(availableCPUs)
// union the reclaim pool with all unused CPUs on NUMA nodes bound by dedicated_cores pods without numa_exclusive;
// if the cpu advisor is enabled, it will soon recalculate the final cpuset (including reservedResourceForAllocated)
poolsCPUSet[state.PoolNameReclaim] = poolsCPUSet[state.PoolNameReclaim].Union(reclaimedAvailableCPUs)
if poolsCPUSet[state.PoolNameReclaim].IsEmpty() {
// for the reclaimed pool, we must make it exist when the node isn't in hybrid mode, even if that causes overlap
allAvailableCPUs := p.machineInfo.CPUDetails.CPUs().Difference(p.reservedCPUs)
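A closing sketch of the ordering that matters here: both leftover sources are unioned into the reclaim pool before the emptiness check, so the all-CPUs overlap fallback fires only when there is truly nothing to reclaim. buildReclaim is a hypothetical condensation of the method's tail, not the real code, and machine.NewCPUSet is assumed to be the usual variadic cpuset constructor.

package main

import (
    "fmt"

    "github.com/kubewharf/katalyst-core/pkg/util/machine"
)

// buildReclaim condenses the tail of generatePoolsAndIsolation after this PR.
func buildReclaim(current, availableCPUs, reclaimedAvailableCPUs, allCPUs, reserved machine.CPUSet) machine.CPUSet {
    reclaim := current.Union(availableCPUs).Union(reclaimedAvailableCPUs)
    if reclaim.IsEmpty() {
        // mirrors the existing fallback: overlap with every un-reserved core
        reclaim = allCPUs.Difference(reserved)
    }
    return reclaim
}

func main() {
    got := buildReclaim(
        machine.NewCPUSet(),     // reclaim pool so far: empty
        machine.NewCPUSet(),     // no shared leftovers
        machine.NewCPUSet(6, 7), // leftovers on a non-exclusive dedicated NUMA node
        machine.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7),
        machine.NewCPUSet(0),
    )
    fmt.Println(got.String()) // "6-7": the fallback was not needed
}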