Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Nov 13, 2024
1 parent b764745 commit 0cd4a4a
Show file tree
Hide file tree
Showing 19 changed files with 321 additions and 186 deletions.
6 changes: 2 additions & 4 deletions cmd/kbagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ import (
)

const (
defaultHTTPPort = 3501
defaultStreamingPort = 3502
defaultMaxConcurrency = 8
)

Expand All @@ -54,8 +52,8 @@ func init() {
pflag.BoolVar(&serverConfig.Server, "server", true, "Run as a server.")
pflag.StringVar(&serverConfig.Address, "address", "0.0.0.0", "The HTTP Server listen address for kb-agent service.")
pflag.StringVar(&serverConfig.UnixDomainSocket, "unix-socket", "", "The path of the Unix Domain Socket for kb-agent service.")
pflag.IntVar(&serverConfig.Port, "port", defaultHTTPPort, "The HTTP Server listen port for kb-agent service.")
pflag.IntVar(&serverConfig.StreamingPort, "streaming-port", defaultStreamingPort, "The listen port used by kb-agent to stream data.")
pflag.IntVar(&serverConfig.Port, "port", kbagent.DefaultHTTPPort, "The HTTP Server listen port for kb-agent service.")
pflag.IntVar(&serverConfig.StreamingPort, "streaming-port", kbagent.DefaultStreamingPort, "The listen port used by kb-agent to stream data.")
pflag.IntVar(&serverConfig.Concurrency, "max-concurrency", defaultMaxConcurrency,
fmt.Sprintf("The maximum number of concurrent connections the Server may serve, use the default value %d if <=0.", defaultMaxConcurrency))
pflag.BoolVar(&serverConfig.Logging, "api-logging", true, "Enable api logging for kb-agent request.")
Expand Down
41 changes: 18 additions & 23 deletions controllers/apps/transformer_component_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor
hasFailedPod, messages := t.hasFailedPod()

// check if the component scale out failed
isScaleOutFailed, err := t.isScaleOutFailed(transCtx)
hasRunningScaleOut, hasFailedScaleOut, err := t.hasScaleOutRunning(transCtx)
if err != nil {
return err
}
Expand All @@ -160,7 +160,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor

// calculate if the component has failure
hasFailure := func() bool {
return hasFailedPod || isScaleOutFailed || hasFailedVolumeExpansion
return hasFailedPod || hasFailedScaleOut || hasFailedVolumeExpansion
}()

// check if the component is in creating phase
Expand All @@ -171,7 +171,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor

transCtx.Logger.Info(
fmt.Sprintf("status conditions, creating: %v, its running: %v, has failure: %v, updating: %v, config synced: %v",
isInCreatingPhase, isITSUpdatedNRunning, hasFailure, hasRunningVolumeExpansion, isAllConfigSynced))
isInCreatingPhase, isITSUpdatedNRunning, hasFailure, hasRunningScaleOut || hasRunningVolumeExpansion, isAllConfigSynced))

switch {
case isDeleting:
Expand All @@ -180,7 +180,7 @@ func (t *componentStatusTransformer) reconcileStatus(transCtx *componentTransfor
t.setComponentStatusPhase(transCtx, appsv1.StoppingComponentPhase, nil, "component is Stopping")
case stopped:
t.setComponentStatusPhase(transCtx, appsv1.StoppedComponentPhase, nil, "component is Stopped")
case isITSUpdatedNRunning && isAllConfigSynced && !hasRunningVolumeExpansion:
case isITSUpdatedNRunning && isAllConfigSynced && !hasRunningScaleOut && !hasRunningVolumeExpansion:
t.setComponentStatusPhase(transCtx, appsv1.RunningComponentPhase, nil, "component is Running")
case !hasFailure && isInCreatingPhase:
t.setComponentStatusPhase(transCtx, appsv1.CreatingComponentPhase, nil, "component is Creating")
Expand Down Expand Up @@ -252,29 +252,27 @@ func (t *componentStatusTransformer) isAllConfigSynced(transCtx *componentTransf
return true, nil
}

// isScaleOutFailed checks if the component scale out failed.
func (t *componentStatusTransformer) isScaleOutFailed(transCtx *componentTransformContext) (bool, error) {
if t.runningITS == nil {
return false, nil
// hasScaleOutRunning checks if the scale out is running.
func (t *componentStatusTransformer) hasScaleOutRunning(transCtx *componentTransformContext) (running bool, failed bool, err error) {
if t.runningITS == nil || t.runningITS.Spec.Replicas == nil {
return false, false, nil
}
if t.runningITS.Spec.Replicas == nil {
return false, nil

running, err = component.HasReplicasInCreating(transCtx.Component)
if err != nil {
return false, false, err
}
if t.synthesizeComp.Replicas <= *t.runningITS.Spec.Replicas {
return false, nil
if !running {
return false, false, nil
}

// TODO: impl
// TODO: scale-out failed

return false, nil
return true, false, nil
}

// hasVolumeExpansionRunning checks if the volume expansion is running.
func (t *componentStatusTransformer) hasVolumeExpansionRunning(transCtx *componentTransformContext) (bool, bool, error) {
var (
running bool
failed bool
)
func (t *componentStatusTransformer) hasVolumeExpansionRunning(transCtx *componentTransformContext) (running bool, failed bool, err error) {
for _, vct := range t.runningITS.Spec.VolumeClaimTemplates {
volumes, err := getRunningVolumes(transCtx.Context, t.Client, t.synthesizeComp, t.runningITS, vct.Name)
if err != nil {
Expand Down Expand Up @@ -405,8 +403,5 @@ func (t *componentStatusTransformer) reconcileAvailableCondition(transCtx *compo
}

func (t *componentStatusTransformer) reconcileReplicasStatus(transCtx *componentTransformContext) error {
var (
// comp = transCtx.Component
)
return nil
return component.StatusReplicas(transCtx.Context, t.Client, t.synthesizeComp, t.comp)
}
26 changes: 17 additions & 9 deletions controllers/apps/transformer_component_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (t *componentVarsTransformer) Transform(ctx graph.TransformContext, dag *gr
envVars2, envData := buildEnvVarsNData(envVars)
setTemplateNEnvVars(synthesizedComp, templateVars, envVars2)

if err := createOrUpdateEnvConfigMap(ctx, dag, envData, nil); err != nil {
if err := createOrUpdateEnvConfigMap(ctx, dag, nil, nil, envData); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -113,6 +113,7 @@ func envConfigMapSource(clusterName, compName string) corev1.EnvFromSource {
}
}

// TODO: remove the deleted env vars from the ConfigMap
func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
data map[string]string, parent *model.ObjectVertex, patches ...map[string]string) error {
var (
Expand All @@ -125,23 +126,23 @@ func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
graphCli, _ = transCtx.Client.(model.GraphClient)
)

envObj, err := func() (*corev1.ConfigMap, error) {
envObj, envObjVertex, err := func() (*corev1.ConfigMap, graph.Vertex, error) {
// look up in graph first
if v := graphCli.FindMatchedVertex(dag, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: envKey.Namespace,
Name: envKey.Name,
},
}); v != nil {
return v.(*model.ObjectVertex).Obj.(*corev1.ConfigMap), nil
return v.(*model.ObjectVertex).Obj.(*corev1.ConfigMap), v, nil
}

obj := &corev1.ConfigMap{}
err := transCtx.Client.Get(transCtx.Context, envKey, obj, inDataContext4C())
if err != nil {
return nil, client.IgnoreNotFound(err)
return nil, nil, client.IgnoreNotFound(err)
}
return obj, nil
return obj, nil, nil
}()
if err != nil {
return err
Expand All @@ -161,7 +162,7 @@ func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
return mergeWith(data)
}
if envObj != nil {
return mergeWith(envObj.Data)
return mergeWith(maps.Clone(envObj.Data))
}
return mergeWith(nil)
}()
Expand All @@ -181,9 +182,16 @@ func createOrUpdateEnvConfigMap(ctx graph.TransformContext, dag *graph.DAG,
}

if !reflect.DeepEqual(envObj.Data, newData) {
envObjCopy := envObj.DeepCopy()
envObjCopy.Data = newData
graphCli.Do(dag, envObj, envObjCopy, model.ActionUpdatePtr(), parent, inDataContext4G())
if envObjVertex != nil {
envObj.Data = newData // in-place update
if parent != nil {
dag.Connect(parent, envObjVertex)
}
} else {
envObjCopy := envObj.DeepCopy()
envObjCopy.Data = newData
graphCli.Do(dag, envObj, envObjCopy, model.ActionUpdatePtr(), parent, inDataContext4G())
}
return nil
}
return nil
Expand Down
43 changes: 14 additions & 29 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,25 +615,19 @@ func (r *componentWorkloadOps) scaleOut() error {

newReplicas := r.desiredCompPodNameSet.Difference(r.runningItsPodNameSet).UnsortedList()
if err := func() error {
// TODO: select the source replicas to dump data
source, err := r.sourceReplica()
source, err := r.sourceReplica(dataDump)
if err != nil {
return err
}

task, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, newReplicas)
parameters, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, newReplicas)
if err != nil {
return err
}
parameters, err := component.BuildKBAgentTaskEnv(task)
if err != nil {
return err
}

// apply the updated env to the env CM
transCtx := &componentTransformContext{
Context: r.reqCtx.Ctx,
Client: r.cli,
Client: model.NewGraphClient(r.cli),
SynthesizeComponent: r.synthesizeComp,
Component: r.component,
}
Expand All @@ -649,30 +643,21 @@ func (r *componentWorkloadOps) scaleOut() error {
return component.NewReplicas(r.component, newReplicas)
}

func (r *componentWorkloadOps) sourceReplica() (*corev1.Pod, error) {
isLeader := func(pod *corev1.Pod) bool {
if pod == nil || len(pod.Labels) == 0 {
return false
}
roleName, ok := pod.Labels[constant.RoleLabelKey]
if !ok {
return false
}

for _, replicaRole := range r.runningITS.Spec.Roles {
if roleName == replicaRole.Name && replicaRole.IsLeader {
return true
}
}
return false
}
func (r *componentWorkloadOps) sourceReplica(dataDump *appsv1.Action) (*corev1.Pod, error) {
pods, err := component.ListOwnedPods(r.reqCtx.Ctx, r.cli, r.cluster.Namespace, r.cluster.Name, r.synthesizeComp.Name)
if err != nil {
return nil, err
}
for _, pod := range pods {
if isLeader(pod) {
return pod, nil
if len(pods) > 0 {
if len(dataDump.Exec.TargetPodSelector) == 0 {
dataDump.Exec.TargetPodSelector = appsv1.AnyReplica
}
pods, err = lifecycle.SelectTargetPods(pods, nil, dataDump)
if err != nil {
return nil, err
}
if len(pods) > 0 {
return pods[0], nil
}
}
return nil, fmt.Errorf("no available pod to dump data")
Expand Down
6 changes: 5 additions & 1 deletion pkg/constant/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func GenerateDefaultComponentHeadlessServiceName(clusterName, compName string) s

// GenerateClusterComponentEnvPattern generates cluster and component pattern
func GenerateClusterComponentEnvPattern(clusterName, compName string) string {
return fmt.Sprintf("%s-%s-env", clusterName, compName)
return GetCompEnvCMName(fmt.Sprintf("%s-%s", clusterName, compName))
}

func GetCompEnvCMName(compObjName string) string {
return fmt.Sprintf("%s-env", compObjName)
}

// GenerateDefaultServiceAccountName generates default service account name for a cluster.
Expand Down
46 changes: 27 additions & 19 deletions pkg/controller/component/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ const (
kbAgentSharedMountPath = "/kubeblocks"
kbAgentCommandOnSharedMount = "/kubeblocks/kbagent"

minAvailablePort = 1025
maxAvailablePort = 65535
kbAgentDefaultHTTPPort = 3501
kbAgentDefaultStreamingPort = 3502
minAvailablePort = 1025
maxAvailablePort = 65535

defaultProbeReportPeriodSeconds = 60
minProbeReportPeriodSeconds = 15
Expand All @@ -56,7 +54,7 @@ var (
)

func IsKBAgentContainer(c *corev1.Container) bool {
return c.Name == kbagent.ContainerName || c.Name == kbagent.InitContainerName || c.Name == kbagent.InitContainerName4Worker
return c.Name == kbagent.ContainerName || c.Name == kbagent.ContainerName4Worker || c.Name == kbagent.InitContainerName
}

func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) {
Expand Down Expand Up @@ -97,17 +95,27 @@ func UpdateKBAgentContainer4HostNetwork(synthesizedComp *SynthesizedComponent) {
synthesizedComp.PodSpec.Containers[idx] = *c
}

func BuildKBAgentTaskEnv(task proto.Task) (map[string]string, error) {
envVars, err := kbagent.BuildEnv4Worker([]proto.Task{task})
func buildKBAgentTaskEnv(task proto.Task) (map[string]string, error) {
envVar, err := kbagent.BuildEnv4Worker([]proto.Task{task})
if err != nil {
return nil, err
}
return map[string]string{
envVar.Name: envVar.Value,
}, nil
}

m := make(map[string]string)
for _, v := range envVars {
m[v.Name] = v.Value
func updateKBAgentTaskEnv(envVars map[string]string, f func(proto.Task) *proto.Task) (map[string]string, error) {
envVar, err := kbagent.UpdateEnv4Worker(envVars, f)
if err != nil {
return nil, err
}
return m, nil
if envVar == nil {
return nil, nil
}
return map[string]string{
envVar.Name: envVar.Value,
}, nil
}

func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
Expand Down Expand Up @@ -140,7 +148,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {

container, err := newContainer(kbagent.ContainerName, func(b *builder.ContainerBuilder) error {
ports, err1 := getAvailablePorts(synthesizedComp.PodSpec.Containers,
[]int32{int32(kbAgentDefaultHTTPPort), int32(kbAgentDefaultStreamingPort)})
[]int32{int32(kbagent.DefaultHTTPPort), int32(kbagent.DefaultStreamingPort)})
if err1 != nil {
return err1
}
Expand All @@ -151,32 +159,32 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
corev1.ContainerPort{
ContainerPort: int32(httpPort),
Name: kbagent.DefaultHTTPPortName,
Protocol: "TCP",
Protocol: corev1.ProtocolTCP,
},
corev1.ContainerPort{
ContainerPort: int32(streamingPort),
Name: kbagent.DefaultStreamingPortName,
Protocol: "TCP",
Protocol: corev1.ProtocolTCP,
}).
SetStartupProbe(corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(httpPort)},
}})
return nil
})
if err == nil {
if err != nil {
return err
}

initContainer, err := newContainer(kbagent.InitContainerName4Worker, func(b *builder.ContainerBuilder) error {
b.AddArgs("--server", "false") // run as a worker
workerContainer, err := newContainer(kbagent.ContainerName4Worker, func(b *builder.ContainerBuilder) error {
b.AddArgs("--server=false") // run as a worker
return nil
})
if err != nil {
return err
}

if err = handleCustomImageNContainerDefined(synthesizedComp, container, initContainer); err != nil {
if err = handleCustomImageNContainerDefined(synthesizedComp, container, workerContainer); err != nil {
return err
}

Expand All @@ -200,7 +208,7 @@ func buildKBAgentContainer(synthesizedComp *SynthesizedComponent) error {
}

synthesizedComp.PodSpec.Containers = append(synthesizedComp.PodSpec.Containers, *container)
synthesizedComp.PodSpec.InitContainers = append(synthesizedComp.PodSpec.InitContainers, *initContainer)
synthesizedComp.PodSpec.InitContainers = append(synthesizedComp.PodSpec.InitContainers, *workerContainer)

return nil
}
Expand Down
Loading

0 comments on commit 0cd4a4a

Please sign in to comment.