Skip to content

Commit

Permalink
cluster claim update workflow in config-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
indresh-28 authored and vramk23 committed Nov 25, 2023
1 parent af3ab74 commit a0ceefa
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 119 deletions.
3 changes: 2 additions & 1 deletion capten/agent/internal/api/plugin_crossplane_project_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func (a *Agent) configureCrossplaneGitRepo(req *model.CrossplaneProject, provide
VaultCredIdentifier: req.GitProjectId, CrossplaneProviders: providers}
wd := workers.NewConfig(a.tc, a.log)

wkfId, err := wd.SendAsyncEvent(context.TODO(), &captenmodel.ConfigureParameters{Resource: crossplaneConfigUseCase}, ci)
wkfId, err := wd.SendAsyncEvent(context.TODO(),
&captenmodel.ConfigureParameters{Resource: crossplaneConfigUseCase, Action: model.CrossPlaneProjectSync}, ci)
if err != nil {
req.Status = string(model.CrossplaneProjectConfigurationFailed)
req.WorkflowId = "NA"
Expand Down
134 changes: 74 additions & 60 deletions capten/agent/internal/crossplane/cluster_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/google/uuid"
"github.com/intelops/go-common/logging"
Expand All @@ -21,7 +22,10 @@ import (
)

var (
readyStatusType = "Ready"
readyStatusType = "ready"
nodePoolStatusType = "nodepool"
controlPlaneStatusType = "controlplane"

clusterNotReadyStatus = "NotReady"
clusterReadyStatus = "Ready"
readyStatusValue = "True"
Expand Down Expand Up @@ -163,66 +167,67 @@ func (h *ClusterClaimSyncHandler) updateManagedClusters(clusterCliams []model.Cl

for _, clusterCliam := range clusterCliams {
h.log.Infof("processing cluster claim %s", clusterCliam.Metadata.Name)
for _, status := range clusterCliam.Status.Conditions {
if status.Type != readyStatusType {
continue
}
nodePoolStatus, controlPlaneStatus, readyStatus := getClusterClaimStatus(clusterCliam.Status.Conditions)
h.log.Infof("cluster claim %s status: %s-%s-%s", clusterCliam.Metadata.Name, nodePoolStatus, controlPlaneStatus, readyStatus)

managedCluster := &captenpluginspb.ManagedCluster{}
managedCluster.ClusterName = clusterCliam.Metadata.Name
if !(strings.EqualFold(nodePoolStatus, "active") && strings.EqualFold(controlPlaneStatus, "active")) {
h.log.Infof("cluster %s is not created", clusterCliam.Metadata.Name)
return nil
}

clusterObj, ok := clusters[managedCluster.ClusterName]
if !ok {
managedCluster.Id = uuid.New().String()
} else {
h.log.Infof("found existing managed clusterId %s, updating", clusterObj.Id)
managedCluster.Id = clusterObj.Id
managedCluster.ClusterDeployStatus = clusterObj.ClusterDeployStatus
}
managedCluster := &captenpluginspb.ManagedCluster{}
managedCluster.ClusterName = clusterCliam.Metadata.Name

if status.Status == readyStatusValue {
secretName := fmt.Sprintf(clusterSecretName, clusterCliam.Spec.Id)
resp, err := k8sclient.GetSecretData(clusterCliam.Metadata.Namespace, secretName)
if err != nil {
h.log.Errorf("failed to get secret %s/%s, %v", clusterCliam.Metadata.Namespace, secretName, err)
continue
}

clusterEndpoint := resp.Data[k8sEndpoint]
managedCluster.ClusterEndpoint = clusterEndpoint
cred := map[string]string{}
cred[kubeConfig] = resp.Data[kubeConfig]
cred[k8sClusterCA] = resp.Data[k8sClusterCA]
cred[k8sEndpoint] = clusterEndpoint

err = credential.PutGenericCredential(context.TODO(), managedClusterEntityName, managedCluster.Id, cred)
if err != nil {
h.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
continue
}

managedCluster.ClusterDeployStatus = clusterReadyStatus
} else {
managedCluster.ClusterDeployStatus = clusterNotReadyStatus
}
clusterObj, ok := clusters[managedCluster.ClusterName]
if !ok {
managedCluster.Id = uuid.New().String()
} else {
h.log.Infof("found existing managed clusterId %s, updating", clusterObj.Id)
managedCluster.Id = clusterObj.Id
managedCluster.ClusterDeployStatus = clusterObj.ClusterDeployStatus
}

err = h.dbStore.UpsertManagedCluster(managedCluster)
if err != nil {
h.log.Info("failed to update information to db, %v", err)
continue
}
if strings.EqualFold(readyStatus, readyStatusValue) {
managedCluster.ClusterDeployStatus = clusterReadyStatus
} else {
managedCluster.ClusterDeployStatus = clusterNotReadyStatus
}

if managedCluster.ClusterDeployStatus == clusterReadyStatus {
// call config-worker.
err = h.triggerClusterUpdates(clusterCliam.Spec.Id, managedCluster.Id)
if err != nil {
h.log.Info("failed to update cluster endpoint information %v", err)
continue
}
secretName := fmt.Sprintf(clusterSecretName, clusterCliam.Spec.Id)
resp, err := k8sclient.GetSecretData(clusterCliam.Metadata.Namespace, secretName)
if err != nil {
h.log.Errorf("failed to get secret %s/%s, %v", clusterCliam.Metadata.Namespace, secretName, err)
continue
}

}
clusterEndpoint := resp.Data[k8sEndpoint]
managedCluster.ClusterEndpoint = clusterEndpoint
cred := map[string]string{}
cred[kubeConfig] = resp.Data[kubeConfig]
cred[k8sClusterCA] = resp.Data[k8sClusterCA]
cred[k8sEndpoint] = clusterEndpoint

err = credential.PutGenericCredential(context.TODO(), managedClusterEntityName, managedCluster.Id, cred)
if err != nil {
h.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
continue
}

h.log.Infof("updated the cluster claim %s with status %s", managedCluster.ClusterName, managedCluster.ClusterDeployStatus)
err = h.dbStore.UpsertManagedCluster(managedCluster)
if err != nil {
h.log.Info("failed to update information to db, %v", err)
continue
}
h.log.Infof("updated the cluster claim %s with status %s", managedCluster.ClusterName, managedCluster.ClusterDeployStatus)

if managedCluster.ClusterDeployStatus == clusterReadyStatus {
// call config-worker.
err = h.triggerClusterUpdates(clusterCliam.Spec.Id, managedCluster.Id)
if err != nil {
h.log.Info("failed to trigger cluster update workflow, %v", err)
continue
}
h.log.Infof("triggered cluster update workflow for cluster %s", managedCluster.ClusterName)
}
}
return nil
Expand All @@ -246,14 +251,23 @@ func (h *ClusterClaimSyncHandler) triggerClusterUpdates(clusterName, managedClus
if err != nil {
return err
}
ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId, Name: clusterName, ManagedK8SId: managedClusterID}

ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId, Name: clusterName, ManagedClusterId: managedClusterID}
wd := workers.NewConfig(h.tc, h.log)

_, err = wd.SendEvent(context.TODO(), &model.ConfigureParameters{Resource: model.CrossPlaneResource, Action: model.CrossPlaneClusterUpdate}, ci)
if err != nil {
return err
}
return err
}

return nil
func getClusterClaimStatus(conditions []model.ClusterClaimCondition) (nodePoolStatus, controlPlaneStatus, readyStatus string) {
for _, condition := range conditions {
switch strings.ToLower(condition.Type) {
case readyStatusType:
readyStatus = condition.Status
case nodePoolStatusType:
nodePoolStatus = condition.Status
case controlPlaneStatusType:
controlPlaneStatus = condition.Status
}
}
return
}
39 changes: 25 additions & 14 deletions capten/config-worker/internal/crossplane/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,51 @@ var logger = logging.NewLogger()

func (c *CrossPlaneActivities) ConfigurationActivity(ctx context.Context, params model.ConfigureParameters, payload json.RawMessage) (model.ResponsePayload, error) {
logger.Infof("Activity: %s, %s", params.Resource, params.Action)
config, err := NewCrossPlaneApp()
status, err := processConfigurationActivity(ctx, params, payload)
if err != nil {
return model.ResponsePayload{
Status: string(model.WorkFlowStatusFailed),
Message: json.RawMessage("{\"error\": \"failed to initialize crossplane plugin\"}"),
Status: status,
Message: json.RawMessage(
fmt.Sprintf("{\"error\": \"%s\"}", err.Error())),
}, err
}

status := ""
logger.Infof("crossplane plugin action %s configured", params.Action)
return model.ResponsePayload{Status: status}, err
}

func processConfigurationActivity(ctx context.Context, params model.ConfigureParameters, payload json.RawMessage) (string, error) {
cp, err := NewCrossPlaneApp()
if err != nil {
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to initialize crossplane plugin")
}

switch params.Action {
case model.CrossPlaneClusterUpdate:
reqLocal := &model.CrossplaneClusterUpdate{}
if err = json.Unmarshal(payload, reqLocal); err != nil {
if err := json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane req for %s, %v", model.CrossPlaneClusterUpdate, err)
err = fmt.Errorf("failed to unmarshall the crossplane req for %s", model.CrossPlaneClusterUpdate)
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane req for %s", model.CrossPlaneClusterUpdate)
}
status, err = config.configureClusterUpdate(ctx, reqLocal)
status, err := cp.configureClusterUpdate(ctx, reqLocal)
if err != nil {
logger.Errorf("failed to configure crossplane project for %s, %v", model.CrossPlaneClusterUpdate, err)
err = fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate)
return status, fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate)
}
default:
return status, nil
case model.CrossPlaneProjectSync:
reqLocal := &model.CrossplaneUseCase{}
if err = json.Unmarshal(payload, reqLocal); err != nil {
if err := json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane req, %v", err)
err = fmt.Errorf("failed to unmarshall the crossplane req")
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane req")
}
status, err = config.configureProjectAndApps(ctx, reqLocal)
status, err := cp.configureProjectAndApps(ctx, reqLocal)
if err != nil {
logger.Errorf("failed to configure crossplane project, %v", err)
err = fmt.Errorf("failed to configure crossplane project")
}
return status, nil
default:
return string(model.WorkFlowStatusFailed), fmt.Errorf("invalid crossplane action")
}
logger.Infof("crossplane plugin configured")
return model.ResponsePayload{Status: status}, err
}
40 changes: 40 additions & 0 deletions capten/config-worker/internal/crossplane/argocd_app_values.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package crossplane

type Source struct {
RepoURL string `json:"repoURL,omitempty"`
TargetRevision string `json:"targetRevision,omitempty"`
}

type Dest struct {
Server string `json:"server,omitempty"`
Namespace string `json:"namespace,omitempty"`
}

type GlobalValues struct {
ClusterConfigPath string `json:"clusterConfigPath,omitempty"`
}

type DefaultApps struct {
Name string `json:"name,omitempty"`
ValuesPath string `json:"valuesPath,omitempty"`
RepoURL string `json:"repoURL,omitempty"`
Namespace string `json:"namespace,omitempty"`
Chart string `json:"chart,omitempty"`
TargetRevision string `json:"targetRevision,omitempty"`
}

type Cluster struct {
Name string `json:"name,omitempty"`
Server string `json:"server,omitempty"`
DefApps []DefaultApps `json:"defaultApps,omitempty"`
}

type ArgoCDAppValue struct {
Project string `json:"project,omitempty"`
Global GlobalValues `json:"global,omitempty"`
Src Source `json:"source,omitempty"`
Destination Dest `json:"destination,omitempty"`
SyncPolicy interface{} `json:"syncPolicy,omitempty"`
Compositions interface{} `json:"compositions,omitempty"`
Clusters *[]Cluster `json:"clusters,omitempty"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func getAppNameNamespace(ctx context.Context, fileName string) (string, string,

func (cp *CrossPlaneApp) configureClusterUpdate(ctx context.Context, req *model.CrossplaneClusterUpdate) (status string, err error) {
logger.Infof("configuring the cluster endpoint for %s", req.RepoURL)
endpoint, err := cp.helper.CreateCluster(ctx, req.Name, req.ManagedK8SId)
endpoint, err := cp.helper.CreateCluster(ctx, req.ManagedClusterId, req.Name)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to CreateCluster in argocd app")
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func updateClusterEndpointDetials(filename, clusterName, clusterEndpoint, defaul
return err
}

var argoCDAppValue model.ArgoCDAppValue
var argoCDAppValue ArgoCDAppValue

err = json.Unmarshal(jsonData, &argoCDAppValue)
if err != nil {
Expand All @@ -125,7 +125,7 @@ func updateClusterEndpointDetials(filename, clusterName, clusterEndpoint, defaul

for index := range defaultApps {
localObj := &defaultApps[index]
strings.ReplaceAll(localObj.AppConfigPath, clusterNameSub, clusterName)
strings.ReplaceAll(localObj.ValuesPath, clusterNameSub, clusterName)
}

logger.Infof("udpated the req endpoint details to %s for name %s ", clusterEndpoint, clusterName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func readCrossPlanePluginConfig(pluginFile string) (*crossplanePluginConfig, err
return &pluginData, nil
}

func readClusterDefaultApps(clusterDefaultApp string) ([]model.DefaultApps, error) {
func readClusterDefaultApps(clusterDefaultApp string) ([]DefaultApps, error) {
data, err := os.ReadFile(filepath.Clean(clusterDefaultApp))
if err != nil {
return nil, fmt.Errorf("failed to read clusterDefaultApp File: %s, err: %w", clusterDefaultApp, err)
}

var defaultApps []model.DefaultApps
var defaultApps []DefaultApps
err = json.Unmarshal(data, &defaultApps)
if err != nil {
return nil, fmt.Errorf("%w", err)
Expand Down
42 changes: 4 additions & 38 deletions capten/model/config_workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,42 +31,8 @@ type CrossplaneUseCase struct {
}

type CrossplaneClusterUpdate struct {
Name string `json:"name,omitempty"`
GitProjectId string `json:"gitProjectId,omitempty"`
ManagedK8SId string `json:"managedK8SId,omitempty"`
RepoURL string `json:"repoURL,omitempty"`
}

type Source struct {
RepoURL string `json:"repoURL,omitempty"`
TargetRevision string `json:"targetRevision,omitempty"`
}

type Dest struct {
Server string `json:"server,omitempty"`
Namespace string `json:"namespace,omitempty"`
}

type DefaultApps struct {
Name string `json:"name,omitempty"`
AppConfigPath string `json:"appConfigPath,omitempty"`
RepoURL string `json:"repoURL,omitempty"`
Namespace string `json:"namespace,omitempty"`
Chart string `json:"chart,omitempty"`
TargetRevision string `json:"targetRevision,omitempty"`
}
type Cluster struct {
Name string `json:"name,omitempty"`
ConfigPath string `json:"configPath,omitempty"`
Server string `json:"server,omitempty"`
DefApps []DefaultApps `json:"defaultApps,omitempty"`
}

type ArgoCDAppValue struct {
Project string `json:"project,omitempty"`
Src Source `json:"source,omitempty"`
Destination Dest `json:"destination,omitempty"`
SyncPolicy interface{} `json:"syncPolicy,omitempty"`
Compositions interface{} `json:"compositions,omitempty"`
Clusters *[]Cluster `json:"clusters,omitempty"`
Name string `json:"name,omitempty"`
GitProjectId string `json:"gitProjectId,omitempty"`
ManagedClusterId string `json:"managedClusterId,omitempty"`
RepoURL string `json:"repoURL,omitempty"`
}
3 changes: 2 additions & 1 deletion capten/model/crossplane_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
const (
providerNamePrefix = "provider"
CrossPlaneResource = "crossplane"
CrossPlaneClusterUpdate = "crossplaneClusterUpdates"
CrossPlaneClusterUpdate = "crossplane-cluster-update"
CrossPlaneProjectSync = "crossplane-project-sync"
)

type CrossplaneProviderStatus string
Expand Down

0 comments on commit a0ceefa

Please sign in to comment.