Skip to content

Commit

Permalink
feat(qrm): support nic allocation reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Jan 17, 2025
1 parent 945a96d commit 71ecc19
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 3 deletions.
5 changes: 5 additions & 0 deletions cmd/katalyst-agent/app/options/qrm/network_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type NetworkOptions struct {
NetInterfaceNameResourceAllocationAnnotationKey string
NetClassIDResourceAllocationAnnotationKey string
NetBandwidthResourceAllocationAnnotationKey string
EnableNICAllocationReactor bool
}

type NetClassOptions struct {
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewNetworkOptions() *NetworkOptions {
NetInterfaceNameResourceAllocationAnnotationKey: "qrm.katalyst.kubewharf.io/nic_name",
NetClassIDResourceAllocationAnnotationKey: "qrm.katalyst.kubewharf.io/netcls_id",
NetBandwidthResourceAllocationAnnotationKey: "qrm.katalyst.kubewharf.io/net_bandwidth",
EnableNICAllocationReactor: true,
}
}

Expand Down Expand Up @@ -106,6 +108,8 @@ func (o *NetworkOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.NetClassIDResourceAllocationAnnotationKey, "The annotation key of allocated netcls id for the container, which is ready by runtime")
fs.StringVar(&o.NetBandwidthResourceAllocationAnnotationKey, "network-resource-plugin-bandwidth-allocation-anno-key",
o.NetBandwidthResourceAllocationAnnotationKey, "The annotation key of allocated bandwidth for the container, which is ready by runtime")
fs.BoolVar(&o.EnableNICAllocationReactor, "enable-network-resource-plugin-nic-allocation-reactor",
o.EnableNICAllocationReactor, "enable network allocation reactor")
}

func (o *NetworkOptions) ApplyTo(conf *qrmconfig.NetworkQRMPluginConfig) error {
Expand All @@ -126,6 +130,7 @@ func (o *NetworkOptions) ApplyTo(conf *qrmconfig.NetworkQRMPluginConfig) error {
conf.NetInterfaceNameResourceAllocationAnnotationKey = o.NetInterfaceNameResourceAllocationAnnotationKey
conf.NetClassIDResourceAllocationAnnotationKey = o.NetClassIDResourceAllocationAnnotationKey
conf.NetBandwidthResourceAllocationAnnotationKey = o.NetBandwidthResourceAllocationAnnotationKey
conf.EnableNICAllocationReactor = o.EnableNICAllocationReactor

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/luomingmeng/katalyst-api v0.0.0-20250116062932-2fa73ecf8625
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.2-0.20241210135216-5785b7552c05 h1:oV/CsCzr3T3WnWz91aZebSpbKiYFQSS564CgsCrD/QQ=
github.com/kubewharf/katalyst-api v0.5.2-0.20241210135216-5785b7552c05/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9 h1:jOTYZt7h/J7I8xQMKMUcJjKf5UFBv37jHWvNp5VRFGc=
github.com/kubewharf/kubelet v1.24.6-kubewharf.9/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand All @@ -583,6 +581,8 @@ github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0U
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lpabon/godbc v0.1.1/go.mod h1:Jo9QV0cf3U6jZABgiJ2skINAXb9j8m51r07g4KI92ZA=
github.com/luomingmeng/katalyst-api v0.0.0-20250116062932-2fa73ecf8625 h1:fdBSMRq7AeD1jVYa83dlh+NKmNLgCGT2uK6LosdnZE8=
github.com/luomingmeng/katalyst-api v0.0.0-20250116062932-2fa73ecf8625/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
22 changes: 21 additions & 1 deletion pkg/agent/qrm-plugins/network/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
networkreactor "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/staticpolicy/reactor"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor"
"github.com/kubewharf/katalyst-core/pkg/config"
agentconfig "github.com/kubewharf/katalyst-core/pkg/config/agent"
"github.com/kubewharf/katalyst-core/pkg/config/agent/qrm"
Expand Down Expand Up @@ -95,6 +97,8 @@ type StaticPolicy struct {
podAnnotationKeptKeys []string
podLabelKeptKeys []string

nicAllocationReactor reactor.AllocationReactor

// aliveCgroupID is used to record the alive cgroupIDs and their last alive time
aliveCgroupID map[uint64]time.Time
}
Expand Down Expand Up @@ -160,6 +164,15 @@ func NewStaticPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,

policyImplement.ApplyConfig(conf.StaticAgentConfiguration)

policyImplement.nicAllocationReactor = reactor.DummyAllocationReactor{}
if conf.EnableNICAllocationReactor {
policyImplement.nicAllocationReactor = networkreactor.NewNICPodAllocationReactor(
reactor.NewPodAllocationReactor(
agentCtx.MetaServer.PodFetcher,
agentCtx.Client.KubeClient,
))
}

pluginWrapper, err := skeleton.NewRegistrationPluginWrapper(policyImplement, conf.QRMPluginSocketDirs,
func(key string, value int64) {
_ = wrappedEmitter.StoreInt64(key, value, metrics.MetricTypeNameRaw)
Expand Down Expand Up @@ -496,7 +509,7 @@ func (p *StaticPolicy) GetResourcePluginOptions(context.Context,
// Allocate is called during pod admit so that the resource
// plugin can allocate corresponding resource for the container
// according to resource request
func (p *StaticPolicy) Allocate(_ context.Context,
func (p *StaticPolicy) Allocate(ctx context.Context,
req *pluginapi.ResourceRequest,
) (resp *pluginapi.ResourceAllocationResponse, err error) {
if req == nil {
Expand Down Expand Up @@ -674,6 +687,13 @@ func (p *StaticPolicy) Allocate(_ context.Context,
// update state cache
p.state.SetMachineState(machineState)

// update nic allocation
err = p.nicAllocationReactor.UpdateAllocation(ctx, newAllocation)
if err != nil {
general.Errorf("nicAllocationReactor UpdateAllocation failed with error: %v", err)
return nil, err
}

return packAllocationResponse(req, newAllocation, resourceAllocationAnnotations)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
metaserveragent "github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
Expand Down Expand Up @@ -187,6 +188,7 @@ func makeStaticPolicy(t *testing.T, hasNic bool) *StaticPolicy {
netInterfaceNameResourceAllocationAnnotationKey: testNetInterfaceNameResourceAllocationAnnotationKey,
netClassIDResourceAllocationAnnotationKey: testNetClassIDResourceAllocationAnnotationKey,
netBandwidthResourceAllocationAnnotationKey: testNetBandwidthResourceAllocationAnnotationKey,
nicAllocationReactor: reactor.DummyAllocationReactor{},
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reactor

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor"
)

type nicPodAllocationWrapper struct {
*state.AllocationInfo
}

func (p nicPodAllocationWrapper) UpdateAllocation(pod *v1.Pod) error {
if p.AllocationInfo == nil {
return nil
}

annotations := pod.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}

annotations[apiconsts.PodAnnotationNICSelectionResultKey] = p.AllocationInfo.IfName
pod.SetAnnotations(annotations)

return nil
}

func (p nicPodAllocationWrapper) NeedUpdateAllocation(pod *v1.Pod) bool {
if p.CheckSideCar() {
return false
}

if _, ok := pod.Annotations[apiconsts.PodAnnotationNICSelectionResultKey]; !ok {
return true
}

return false
}

type nicPodAllocationReactor struct {
reactor.AllocationReactor
}

func NewNICPodAllocationReactor(r reactor.AllocationReactor) reactor.AllocationReactor {
return &nicPodAllocationReactor{
AllocationReactor: r,
}
}

func (r *nicPodAllocationReactor) UpdateAllocation(ctx context.Context, allocation commonstate.Allocation) error {
if allocation == nil {
return fmt.Errorf("allocation info is nil")
}

allocationInfo, ok := allocation.(*state.AllocationInfo)
if !ok {
return fmt.Errorf("allocation info is not of type network.AllocationInfo")
}

return r.AllocationReactor.UpdateAllocation(ctx, nicPodAllocationWrapper{
AllocationInfo: allocationInfo,
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reactor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/commonstate"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util/reactor"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
)

func Test_podNICAllocationReactor_UpdateAllocation(t *testing.T) {
t.Parallel()

type fields struct {
podFetcher pod.PodFetcher
client kubernetes.Interface
}
type args struct {
allocation *state.AllocationInfo
}
tests := []struct {
name string
fields fields
args args
wantPod *v1.Pod
wantErr bool
}{
{
name: "actual_nic_selection_pod",
fields: fields{
podFetcher: &pod.PodFetcherStub{
PodList: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
Namespace: "test",
UID: "test-1-uid",
},
},
},
},
client: fake.NewSimpleClientset(
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
Namespace: "test",
UID: "test-1-uid",
},
},
),
},
args: args{
allocation: &state.AllocationInfo{
AllocationMeta: commonstate.AllocationMeta{
PodUid: "test-1-uid",
PodNamespace: "test",
PodName: "test-1",
ContainerName: "container-1",
ContainerType: pluginapi.ContainerType_MAIN.String(),
ContainerIndex: 0,
QoSLevel: consts.PodAnnotationQoSLevelSharedCores,
Annotations: map[string]string{
consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores,
consts.PodAnnotationMemoryEnhancementNumaBinding: consts.PodAnnotationMemoryEnhancementNumaBindingEnable,
cpuconsts.CPUStateAnnotationKeyNUMAHint: "0",
},
Labels: map[string]string{
consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores,
},
},
IfName: "eth0",
},
},
wantPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
Namespace: "test",
UID: types.UID("test-1-uid"),
Annotations: map[string]string{
consts.PodAnnotationNICSelectionResultKey: "eth0",
},
},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
r := NewNICPodAllocationReactor(reactor.NewPodAllocationReactor(tt.fields.podFetcher, tt.fields.client))
if err := r.UpdateAllocation(context.Background(), tt.args.allocation); (err != nil) != tt.wantErr {
t.Errorf("UpdateAllocation() error = %v, wantErr %v", err, tt.wantErr)
}

getPod, err := tt.fields.client.CoreV1().Pods(tt.args.allocation.PodNamespace).Get(context.Background(), tt.args.allocation.PodName, metav1.GetOptions{})
if err != nil {
t.Errorf("GetPod() error = %v, wantErr %v", err, tt.wantErr)
} else {
assert.Equal(t, tt.wantPod, getPod)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package reactor
import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/agent/qrm/network_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type NetworkQRMPluginConfig struct {
NetInterfaceNameResourceAllocationAnnotationKey string
NetClassIDResourceAllocationAnnotationKey string
NetBandwidthResourceAllocationAnnotationKey string

// EnableNICAllocationReactor: enable the nic allocation reactor for pods already have nic allocated by runtime
EnableNICAllocationReactor bool
}

type NetClassConfig struct {
Expand Down

0 comments on commit 71ecc19

Please sign in to comment.