Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Nov 12, 2024
1 parent b764745 commit 74b814d
Show file tree
Hide file tree
Showing 14 changed files with 247 additions and 106 deletions.
41 changes: 18 additions & 23 deletions controllers/apps/transformer_component_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor
hasFailedPod, messages := t.hasFailedPod()

// check if the component scale out failed
isScaleOutFailed, err := t.isScaleOutFailed(transCtx)
hasRunningScaleOut, hasFailedScaleOut, err := t.hasScaleOutRunning(transCtx)
if err != nil {
return err
}
Expand All @@ -160,7 +160,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor

// calculate if the component has failure
hasFailure := func() bool {
return hasFailedPod || isScaleOutFailed || hasFailedVolumeExpansion
return hasFailedPod || hasFailedScaleOut || hasFailedVolumeExpansion
}()

// check if the component is in creating phase
Expand All @@ -171,7 +171,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor

transCtx.Logger.Info(
fmt.Sprintf("status conditions, creating: %v, its running: %v, has failure: %v, updating: %v, config synced: %v",
isInCreatingPhase, isITSUpdatedNRunning, hasFailure, hasRunningVolumeExpansion, isAllConfigSynced))
isInCreatingPhase, isITSUpdatedNRunning, hasFailure, hasRunningScaleOut || hasRunningVolumeExpansion, isAllConfigSynced))

switch {
case isDeleting:
Expand All @@ -180,7 +180,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor
t.setComponentStatusPhase(transCtx, appsv1.StoppingComponentPhase, nil, "component is Stopping")
case stopped:
t.setComponentStatusPhase(transCtx, appsv1.StoppedComponentPhase, nil, "component is Stopped")
case isITSUpdatedNRunning && isAllConfigSynced && !hasRunningVolumeExpansion:
case isITSUpdatedNRunning && isAllConfigSynced && !hasRunningScaleOut && !hasRunningVolumeExpansion:
t.setComponentStatusPhase(transCtx, appsv1.RunningComponentPhase, nil, "component is Running")
case !hasFailure && isInCreatingPhase:
t.setComponentStatusPhase(transCtx, appsv1.CreatingComponentPhase, nil, "component is Creating")
Expand Down Expand Up @@ -252,29 +252,27 @@ func (t *componentStatusTransformer) isAllConfigSynced(transCtx *componentTransf
return true, nil
}

// isScaleOutFailed checks if the component scale out failed.
func (t *componentStatusTransformer) isScaleOutFailed(transCtx *componentTransformContext) (bool, error) {
if t.runningITS == nil {
return false, nil
// hasScaleOutRunning checks if the scale out is running.
func (t *componentStatusTransformer) hasScaleOutRunning(transCtx *componentTransformContext) (running bool, failed bool, err error) {
if t.runningITS == nil || t.runningITS.Spec.Replicas == nil {
return false, false, nil
}
if t.runningITS.Spec.Replicas == nil {
return false, nil

running, err = component.HasReplicasInCreating(transCtx.Component)
if err != nil {
return false, false, err
}
if t.synthesizeComp.Replicas <= *t.runningITS.Spec.Replicas {
return false, nil
if !running {
return false, false, nil
}

// TODO: impl
// TODO: scale-out failed

return false, nil
return true, false, nil
}

// hasVolumeExpansionRunning checks if the volume expansion is running.
func (t *componentStatusTransformer) hasVolumeExpansionRunning(transCtx *componentTransformContext) (bool, bool, error) {
var (
running bool
failed bool
)
func (t *componentStatusTransformer) hasVolumeExpansionRunning(transCtx *componentTransformContext) (running bool, failed bool, err error) {
for _, vct := range t.runningITS.Spec.VolumeClaimTemplates {
volumes, err := getRunningVolumes(transCtx.Context, t.Client, t.synthesizeComp, t.runningITS, vct.Name)
if err != nil {
Expand Down Expand Up @@ -405,8 +403,5 @@ func (t *componentStatusTransformer) reconcileAvailableCondition(transCtx *compo
}

func (t *componentStatusTransformer) reconcileReplicasStatus(transCtx *componentTransformContext) error {
var (
// comp = transCtx.Component
)
return nil
return component.StatusReplicas(transCtx.Context, t.Client, t.synthesizeComp, t.comp)
}
25 changes: 16 additions & 9 deletions controllers/apps/transformer_component_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (t *componentVarsTransformer) Transform(ctx graph.TransformContext, dag *gr
envVars2, envData := buildEnvVarsNData(envVars)
setTemplateNEnvVars(synthesizedComp, templateVars, envVars2)

if err := createOrUpdateEnvConfigMap(ctx, dag, envData, nil); err != nil {
if err := createOrUpdateEnvConfigMap(ctx, dag, nil, nil, envData); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -125,23 +125,23 @@ func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
graphCli, _ = transCtx.Client.(model.GraphClient)
)

envObj, err := func() (*corev1.ConfigMap, error) {
envObj, envObjVertex, err := func() (*corev1.ConfigMap, graph.Vertex, error) {
// look up in graph first
if v := graphCli.FindMatchedVertex(dag, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: envKey.Namespace,
Name: envKey.Name,
},
}); v != nil {
return v.(*model.ObjectVertex).Obj.(*corev1.ConfigMap), nil
return v.(*model.ObjectVertex).Obj.(*corev1.ConfigMap), v, nil
}

obj := &corev1.ConfigMap{}
err := transCtx.Client.Get(transCtx.Context, envKey, obj, inDataContext4C())
if err != nil {
return nil, client.IgnoreNotFound(err)
return nil, nil, client.IgnoreNotFound(err)
}
return obj, nil
return obj, nil, nil
}()
if err != nil {
return err
Expand All @@ -161,7 +161,7 @@ func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
return mergeWith(data)
}
if envObj != nil {
return mergeWith(envObj.Data)
return mergeWith(maps.Clone(envObj.Data))
}
return mergeWith(nil)
}()
Expand All @@ -181,9 +181,16 @@ func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
}

if !reflect.DeepEqual(envObj.Data, newData) {
envObjCopy := envObj.DeepCopy()
envObjCopy.Data = newData
graphCli.Do(dag, envObj, envObjCopy, model.ActionUpdatePtr(), parent, inDataContext4G())
if envObjVertex != nil {
envObj.Data = newData // in-place update
if parent != nil {
dag.Connect(parent, envObjVertex)
}
} else {
envObjCopy := envObj.DeepCopy()
envObjCopy.Data = newData
graphCli.Do(dag, envObj, envObjCopy, model.ActionUpdatePtr(), parent, inDataContext4G())
}
return nil
}
return nil
Expand Down
9 changes: 2 additions & 7 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,19 +621,14 @@ func (r *componentWorkloadOps) scaleOut() error {
return err
}

task, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, newReplicas)
parameters, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, newReplicas)
if err != nil {
return err
}
parameters, err := component.BuildKBAgentTaskEnv(task)
if err != nil {
return err
}

// apply the updated env to the env CM
transCtx := &componentTransformContext{
Context: r.reqCtx.Ctx,
Client: r.cli,
Client: model.NewGraphClient(r.cli),
SynthesizeComponent: r.synthesizeComp,
Component: r.component,
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/constant/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func GenerateDefaultComponentHeadlessServiceName(clusterName, compName string) s

// GenerateClusterComponentEnvPattern generates cluster and component pattern
func GenerateClusterComponentEnvPattern(clusterName, compName string) string {
return fmt.Sprintf("%s-%s-env", clusterName, compName)
return GetCompEnvCMName(fmt.Sprintf("%s-%s", clusterName, compName))
}

func GetCompEnvCMName(compObjName string) string {
return fmt.Sprintf("%s-env", compObjName)
}

// GenerateDefaultServiceAccountName generates default service account name for a cluster.
Expand Down
30 changes: 20 additions & 10 deletions pkg/controller/component/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,27 @@ func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) {
synthesizedComp.PodSpec.Containers[idx] = *c
}

func BuildKBAgentTaskEnv(task proto.Task) (map[string]string, error) {
envVars, err := kbagent.BuildEnv4Worker([]proto.Task{task})
func buildKBAgentTaskEnv(task proto.Task) (map[string]string, error) {
envVar, err := kbagent.BuildEnv4Worker([]proto.Task{task})
if err != nil {
return nil, err
}
return map[string]string{
envVar.Name: envVar.Value,
}, nil
}

m := make(map[string]string)
for _, v := range envVars {
m[v.Name] = v.Value
func updateKBAgentTaskEnv(envVars map[string]string, f func(proto.Task) *proto.Task) (map[string]string, error) {
envVar, err := kbagent.UpdateEnv4Worker(envVars, f)
if err != nil {
return nil, err
}
return m, nil
if envVar == nil {
return nil, nil
}
return map[string]string{
envVar.Name: envVar.Value,
}, nil
}

func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
Expand Down Expand Up @@ -151,25 +161,25 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
corev1.ContainerPort{
ContainerPort: int32(httpPort),
Name: kbagent.DefaultHTTPPortName,
Protocol: "TCP",
Protocol: corev1.ProtocolTCP,
},
corev1.ContainerPort{
ContainerPort: int32(streamingPort),
Name: kbagent.DefaultStreamingPortName,
Protocol: "TCP",
Protocol: corev1.ProtocolTCP,
}).
SetStartupProbe(corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(httpPort)},
}})
return nil
})
if err == nil {
if err != nil {
return err
}

initContainer, err := newContainer(kbagent.InitContainerName4Worker, func(b *builder.ContainerBuilder) error {
b.AddArgs("--server", "false") // run as a worker
b.AddArgs("--server=false") // run as a worker
return nil
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/component/kbagent_task_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *KBAgentTaskEventHandler) Handle(cli client.Client, reqCtx intctrlutil.R
}
compCopy := comp.DeepCopy()

err := h.handleEvent(*taskEvent, comp)
err := h.handleEvent(reqCtx.Ctx, cli, *taskEvent, comp)
if err != nil {
return err
}
Expand All @@ -68,9 +68,9 @@ func (h *KBAgentTaskEventHandler) isTaskEvent(event *corev1.Event) bool {
event.Reason == "task" && event.InvolvedObject.FieldPath == proto.ProbeEventFieldPath
}

func (h *KBAgentTaskEventHandler) handleEvent(event proto.TaskEvent, comp *appsv1.Component) error {
func (h *KBAgentTaskEventHandler) handleEvent(ctx context.Context, cli client.Client, event proto.TaskEvent, comp *appsv1.Component) error {
if event.Task == newReplicaTask {
return handleNewReplicaTaskEvent(comp, event)
return handleNewReplicaTaskEvent(ctx, cli, comp, event)
}
return fmt.Errorf("unsupported kind of task event: %s", event.Task)
}
Expand Down
Loading

0 comments on commit 74b814d

Please sign in to comment.