Skip to content

Commit

Permalink
add some important events for ephemeral job (openkruise#1454)
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <[email protected]>
Co-authored-by: mingzhou.swx <[email protected]>
  • Loading branch information
veophi and mingzhou.swx authored Nov 14, 2023
1 parent 4e80be5 commit 01717ff
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 30 deletions.
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- apps.kruise.io
resources:
- ephemeraljobs/finalizers
verbs:
- get
- patch
- update
- apiGroups:
- apps.kruise.io
resources:
Expand Down
11 changes: 9 additions & 2 deletions pkg/controller/ephemeraljob/econtainer/api.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package econtainer

import (
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
v1 "k8s.io/api/core/v1"
)

type EphemeralContainerInterface interface {
// GetEphemeralContainers will return all ephemeral container status.
// GetEphemeralContainersStatus will return all ephemeral container status.
// Maybe they are not created by current ephemeral jobs.
GetEphemeralContainersStatus(target *v1.Pod) []v1.ContainerStatus
// GetEphemeralContainers return all ephemeral containers which have been created in target pods.
// Maybe they are not created by current ephemeral jobs.
GetEphemeralContainers(target *v1.Pod) []v1.EphemeralContainer
// ContainsEphemeralContainer return if target pod contains(1st return value) and owns(2nd return value)
// the ephemeral containers having the same name with the ones ephemeralJob want to inject.
// Owning an ephemeral containers to a ephemeralJob means KRUISE_EJOB_ID env of the ephemeral container
// equals to this ephemeralJob's uid.
ContainsEphemeralContainer(target *v1.Pod) (bool, bool)

UpdateEphemeralContainer(target *v1.Pod) error
CreateEphemeralContainer(target *v1.Pod) error
RemoveEphemeralContainer(target *v1.Pod) error
RemoveEphemeralContainer(target *v1.Pod) (*time.Duration, error)
}

func New(job *appsv1alpha1.EphemeralJob) EphemeralContainerInterface {
Expand Down
24 changes: 22 additions & 2 deletions pkg/controller/ephemeraljob/econtainer/econtainer_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kubeclient "github.com/openkruise/kruise/pkg/client"
Expand Down Expand Up @@ -211,13 +212,32 @@ func (k *k8sControl) createEphemeralContainerLegacy(targetPod *v1.Pod, eContaine
}

// RemoveEphemeralContainer is not support before kubernetes v1.23
func (k *k8sControl) RemoveEphemeralContainer(target *v1.Pod) error {
func (k *k8sControl) RemoveEphemeralContainer(target *v1.Pod) (*time.Duration, error) {
klog.Warning("RemoveEphemeralContainer is not support before kubernetes v1.23")
return nil
return nil, nil
}

// UpdateEphemeralContainer is not support before kubernetes v1.23
func (k *k8sControl) UpdateEphemeralContainer(target *v1.Pod) error {
klog.Warning("UpdateEphemeralContainer is not support before kubernetes v1.23")
return nil
}

func (k *k8sControl) ContainsEphemeralContainer(target *v1.Pod) (exists, owned bool) {
ephemeralContainersMaps, _ := getEphemeralContainersMaps(k.GetEphemeralContainers(target))
for _, e := range k.Spec.Template.EphemeralContainers {
if targetEC, ok := ephemeralContainersMaps[e.Name]; ok {
return true, isCreatedByEJob(string(k.UID), targetEC)
}
}
return false, false
}

func isCreatedByEJob(jobUid string, container v1.EphemeralContainer) bool {
for _, env := range container.Env {
if env.Name == appsv1alpha1.EphemeralContainerEnvKey && env.Value == jobUid {
return true
}
}
return false
}
45 changes: 30 additions & 15 deletions pkg/controller/ephemeraljob/ephemeraljob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -65,8 +66,9 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) *ReconcileEphemeralJob {
return &ReconcileEphemeralJob{
Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"),
scheme: mgr.GetScheme(),
Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("ephemeraljob-controller"),
}
}

Expand Down Expand Up @@ -98,11 +100,13 @@ var _ reconcile.Reconciler = &ReconcileEphemeralJob{}
// ReconcileEphemeralJob reconciles a ImagePullJob object
type ReconcileEphemeralJob struct {
client.Client
scheme *runtime.Scheme
scheme *runtime.Scheme
recorder record.EventRecorder
}

// +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs,verbs=get;list;watch;update;patch;delete
// +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/ephemeralcontainers,verbs=get;update;patch

Expand All @@ -123,7 +127,6 @@ func (r *ReconcileEphemeralJob) Reconcile(context context.Context, request recon

job := &appsv1alpha1.EphemeralJob{}
err = r.Get(context, request.NamespacedName, job)

if err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
Expand All @@ -137,9 +140,14 @@ func (r *ReconcileEphemeralJob) Reconcile(context context.Context, request recon
}

if job.DeletionTimestamp != nil {
if err := r.removeEphemeralContainers(job); err != nil {
retryAfter, err := r.removeEphemeralContainers(job)
if err != nil {
return reconcile.Result{}, err
}
if retryAfter != nil {
return reconcile.Result{RequeueAfter: *retryAfter}, nil
}

job.Finalizers = deleteEphemeralContainerFinalizer(job.Finalizers, EphemeralContainerFinalizer)
return reconcile.Result{}, r.Update(context, job)
}
Expand Down Expand Up @@ -281,14 +289,15 @@ func (r *ReconcileEphemeralJob) filterInjectedPods(job *appsv1alpha1.EphemeralJo
return nil, err
}

control := econtainer.New(job)
// Ignore inactive pods
var targetPods []*v1.Pod
for i := range podList.Items {
pod := &podList.Items[i]
if !kubecontroller.IsPodActive(pod) {
continue
}
if exists, owned := existEphemeralContainer(job, pod); exists {
if exists, owned := control.ContainsEphemeralContainer(pod); exists {
if owned {
targetPods = append(targetPods, pod)
} else {
Expand Down Expand Up @@ -335,7 +344,7 @@ func (r *ReconcileEphemeralJob) syncTargetPods(job *appsv1alpha1.EphemeralJob, t
_, err := clonesetutils.DoItSlowly(len(toCreatePods), kubecontroller.SlowStartInitialBatchSize, func() error {
pod := <-podsCreationChan

if exists, _ := existEphemeralContainer(job, pod); exists {
if exists, _ := control.ContainsEphemeralContainer(pod); exists {
return nil
}

Expand All @@ -348,11 +357,14 @@ func (r *ReconcileEphemeralJob) syncTargetPods(job *appsv1alpha1.EphemeralJob, t
for _, podEphemeralContainerName := range getPodEphemeralContainers(pod, job) {
scaleExpectations.ObserveScale(key, expectations.Create, podEphemeralContainerName)
}
return err
return fmt.Errorf("failed to create ephemeral container in pod %s/%s: %v", pod.Namespace, pod.Name, err)
}

return nil
})
if err != nil {
r.recorder.Eventf(job, v1.EventTypeWarning, "CreateFailed", err.Error())
}

return err
}
Expand Down Expand Up @@ -433,19 +445,22 @@ func (r *ReconcileEphemeralJob) updateJobStatus(job *appsv1alpha1.EphemeralJob)
return r.Status().Update(context.TODO(), job)
}

func (r *ReconcileEphemeralJob) removeEphemeralContainers(job *appsv1alpha1.EphemeralJob) error {
func (r *ReconcileEphemeralJob) removeEphemeralContainers(job *appsv1alpha1.EphemeralJob) (*time.Duration, error) {
targetPods, err := r.filterInjectedPods(job)
if err != nil {
klog.Errorf("Failed to get ephemeral job %s/%s related target pods: %v", job.Namespace, job.Name, err)
return err
return nil, err
}

var errors error
control := econtainer.New(job)
var retryAfter *time.Duration
for _, pod := range targetPods {
if e := econtainer.New(job).RemoveEphemeralContainer(pod); e != nil {
errors = e
if duration, removeErr := control.RemoveEphemeralContainer(pod); removeErr != nil {
err = fmt.Errorf("failed to remove ephemeral containers for pod %s/%s: %s", pod.Namespace, pod.Name, removeErr.Error())
r.recorder.Eventf(job, v1.EventTypeWarning, "RemoveFailed", removeErr.Error())
} else if duration != nil && (retryAfter == nil || *retryAfter > *duration) {
retryAfter = duration
}
}

return errors
return retryAfter, err
}
11 changes: 0 additions & 11 deletions pkg/controller/ephemeraljob/ephemeraljob_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,6 @@ func getPodEphemeralContainers(pod *v1.Pod, ejob *appsv1alpha1.EphemeralJob) []s
return podEphemeralNames
}

func existEphemeralContainer(job *appsv1alpha1.EphemeralJob, targetPod *v1.Pod) (exists, owned bool) {
ephemeralContainersMaps, _ := getEphemeralContainersMaps(econtainer.New(job).GetEphemeralContainers(targetPod))
for _, e := range job.Spec.Template.EphemeralContainers {
if targetEC, ok := ephemeralContainersMaps[e.Name]; ok {
return true, isCreatedByEJob(string(job.UID), targetEC)
}
}

return false, false
}

func existDuplicatedEphemeralContainer(job *appsv1alpha1.EphemeralJob, targetPod *v1.Pod) bool {
ephemeralContainersMaps, _ := getEphemeralContainersMaps(econtainer.New(job).GetEphemeralContainers(targetPod))
for _, e := range job.Spec.Template.EphemeralContainers {
Expand Down

0 comments on commit 01717ff

Please sign in to comment.