Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update cluster endpoint #331

Merged
merged 8 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion capten/agent/internal/api/plugin_crossplane_project_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func (a *Agent) configureCrossplaneGitRepo(req *model.CrossplaneProject, provide
VaultCredIdentifier: req.GitProjectId, CrossplaneProviders: providers}
wd := workers.NewConfig(a.tc, a.log)

wkfId, err := wd.SendAsyncEvent(context.TODO(), &captenmodel.ConfigureParameters{Resource: crossplaneConfigUseCase}, ci)
wkfId, err := wd.SendAsyncEvent(context.TODO(),
&captenmodel.ConfigureParameters{Resource: crossplaneConfigUseCase, Action: model.CrossPlaneProjectSync}, ci)
if err != nil {
req.Status = string(model.CrossplaneProjectConfigurationFailed)
req.WorkflowId = "NA"
Expand Down
146 changes: 100 additions & 46 deletions capten/agent/internal/crossplane/cluster_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/google/uuid"
"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"
"github.com/kube-tarian/kad/capten/agent/internal/temporalclient"
"github.com/kube-tarian/kad/capten/agent/internal/workers"

"github.com/kube-tarian/kad/capten/agent/internal/pb/captenpluginspb"

Expand All @@ -19,7 +22,10 @@ import (
)

var (
readyStatusType = "Ready"
readyStatusType = "ready"
nodePoolStatusType = "nodepool"
controlPlaneStatusType = "controlplane"

clusterNotReadyStatus = "NotReady"
clusterReadyStatus = "Ready"
readyStatusValue = "True"
Expand All @@ -37,15 +43,25 @@ var (

type ClusterClaimSyncHandler struct {
log logging.Logger
tc *temporalclient.Client
dbStore *captenstore.Store
}

func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) *ClusterClaimSyncHandler {
return &ClusterClaimSyncHandler{log: log, dbStore: dbStore}
func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ClusterClaimSyncHandler, error) {
tc, err := temporalclient.NewClient(log)
if err != nil {
return nil, err
}

return &ClusterClaimSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil
}

func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
return k8s.RegisterDynamicInformers(NewClusterClaimSyncHandler(log, dbStore), dynamicClient, cgvk)
obj, err := NewClusterClaimSyncHandler(log, dbStore)
if err != nil {
return err
}
return k8s.RegisterDynamicInformers(obj, dynamicClient, cgvk)
}

func getClusterClaimObj(obj any) (*model.ClusterClaim, error) {
Expand Down Expand Up @@ -151,55 +167,67 @@ func (h *ClusterClaimSyncHandler) updateManagedClusters(clusterCliams []model.Cl

for _, clusterCliam := range clusterCliams {
h.log.Infof("processing cluster claim %s", clusterCliam.Metadata.Name)
for _, status := range clusterCliam.Status.Conditions {
if status.Type != readyStatusType {
continue
}
nodePoolStatus, controlPlaneStatus, readyStatus := getClusterClaimStatus(clusterCliam.Status.Conditions)
h.log.Infof("cluster claim %s status: %s-%s-%s", clusterCliam.Metadata.Name, nodePoolStatus, controlPlaneStatus, readyStatus)

managedCluster := &captenpluginspb.ManagedCluster{}
managedCluster.ClusterName = clusterCliam.Metadata.Name
if !(strings.EqualFold(nodePoolStatus, "active") && strings.EqualFold(controlPlaneStatus, "active")) {
h.log.Infof("cluster %s is not created", clusterCliam.Metadata.Name)
return nil
}

clusterObj, ok := clusters[managedCluster.ClusterName]
if !ok {
managedCluster.Id = uuid.New().String()
} else {
h.log.Infof("found existing managed clusterId %s, updating", clusterObj.Id)
managedCluster.Id = clusterObj.Id
managedCluster.ClusterDeployStatus = clusterObj.ClusterDeployStatus
}
managedCluster := &captenpluginspb.ManagedCluster{}
managedCluster.ClusterName = clusterCliam.Metadata.Name

if status.Status == readyStatusValue {
secretName := fmt.Sprintf(clusterSecretName, clusterCliam.Spec.Id)
resp, err := k8sclient.GetSecretData(clusterCliam.Metadata.Namespace, secretName)
if err != nil {
h.log.Errorf("failed to get secret %s/%s, %v", clusterCliam.Metadata.Namespace, secretName, err)
continue
}

clusterEndpoint := resp.Data[k8sEndpoint]
managedCluster.ClusterEndpoint = clusterEndpoint
cred := map[string]string{}
cred[kubeConfig] = resp.Data[kubeConfig]
cred[k8sClusterCA] = resp.Data[k8sClusterCA]
cred[k8sEndpoint] = clusterEndpoint

err = credential.PutGenericCredential(context.TODO(), managedClusterEntityName, managedCluster.Id, cred)
if err != nil {
h.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
continue
}

managedCluster.ClusterDeployStatus = clusterReadyStatus
} else {
managedCluster.ClusterDeployStatus = clusterNotReadyStatus
}
clusterObj, ok := clusters[managedCluster.ClusterName]
if !ok {
managedCluster.Id = uuid.New().String()
} else {
h.log.Infof("found existing managed clusterId %s, updating", clusterObj.Id)
managedCluster.Id = clusterObj.Id
managedCluster.ClusterDeployStatus = clusterObj.ClusterDeployStatus
}

if strings.EqualFold(readyStatus, readyStatusValue) {
managedCluster.ClusterDeployStatus = clusterReadyStatus
} else {
managedCluster.ClusterDeployStatus = clusterNotReadyStatus
}

err = h.dbStore.UpsertManagedCluster(managedCluster)
secretName := fmt.Sprintf(clusterSecretName, clusterCliam.Spec.Id)
resp, err := k8sclient.GetSecretData(clusterCliam.Metadata.Namespace, secretName)
if err != nil {
h.log.Errorf("failed to get secret %s/%s, %v", clusterCliam.Metadata.Namespace, secretName, err)
continue
}

clusterEndpoint := resp.Data[k8sEndpoint]
managedCluster.ClusterEndpoint = clusterEndpoint
cred := map[string]string{}
cred[kubeConfig] = resp.Data[kubeConfig]
cred[k8sClusterCA] = resp.Data[k8sClusterCA]
cred[k8sEndpoint] = clusterEndpoint

err = credential.PutGenericCredential(context.TODO(), managedClusterEntityName, managedCluster.Id, cred)
if err != nil {
h.log.Errorf("failed to store credential for %s, %v", managedCluster.Id, err)
continue
}

err = h.dbStore.UpsertManagedCluster(managedCluster)
if err != nil {
h.log.Info("failed to update information to db, %v", err)
continue
}
h.log.Infof("updated the cluster claim %s with status %s", managedCluster.ClusterName, managedCluster.ClusterDeployStatus)

if managedCluster.ClusterDeployStatus == clusterReadyStatus {
// call config-worker.
err = h.triggerClusterUpdates(clusterCliam.Spec.Id, managedCluster.Id)
if err != nil {
h.log.Info("failed to update information to db, %v", err)
h.log.Info("failed to trigger cluster update workflow, %v", err)
continue
}
h.log.Infof("updated the cluster claim %s with status %s", managedCluster.ClusterName, managedCluster.ClusterDeployStatus)
h.log.Infof("triggered cluster update workflow for cluster %s", managedCluster.ClusterName)
}
}
return nil
Expand All @@ -217,3 +245,29 @@ func (h *ClusterClaimSyncHandler) getManagedClusters() (map[string]*captenplugin
}
return clusterEndpointMap, nil
}

func (h *ClusterClaimSyncHandler) triggerClusterUpdates(clusterName, managedClusterID string) error {
proj, err := h.dbStore.GetCrossplaneProject()
if err != nil {
return err
}

ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId, Name: clusterName, ManagedClusterId: managedClusterID}
wd := workers.NewConfig(h.tc, h.log)
_, err = wd.SendEvent(context.TODO(), &model.ConfigureParameters{Resource: model.CrossPlaneResource, Action: model.CrossPlaneClusterUpdate}, ci)
return err
}

func getClusterClaimStatus(conditions []model.ClusterClaimCondition) (nodePoolStatus, controlPlaneStatus, readyStatus string) {
for _, condition := range conditions {
switch strings.ToLower(condition.Type) {
case readyStatusType:
readyStatus = condition.Status
case nodePoolStatusType:
nodePoolStatus = condition.Status
case controlPlaneStatusType:
controlPlaneStatus = condition.Status
}
}
return
}
6 changes: 5 additions & 1 deletion capten/agent/internal/job/crossplane_resources_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ type CrossplaneResourcesSync struct {
}

func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *captenstore.Store) (*CrossplaneResourcesSync, error) {
ccObj, err := crossplane.NewClusterClaimSyncHandler(log, dbStore)
if err != nil {
return nil, err
}
return &CrossplaneResourcesSync{
log: log,
frequency: frequency,
dbStore: dbStore,
clusterHandler: crossplane.NewClusterClaimSyncHandler(log, dbStore),
clusterHandler: ccObj,
providerHandler: crossplane.NewProvidersSyncHandler(log, dbStore),
}, nil
}
Expand Down
9 changes: 9 additions & 0 deletions capten/common-pkg/k8s/dynamic_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ func ConvertYamlToJson(data []byte) ([]byte, error) {
return jsonData, nil
}

func ConvertJsonToYaml(data []byte) ([]byte, error) {
yamlData, err := yaml.JSONToYAML(data)
if err != nil {
return nil, err
}

return yamlData, nil
}

func (dc *DynamicClientSet) GetNameNamespace(jsonByte []byte) (string, string, error) {
var keyValue map[string]interface{}
if err := json.Unmarshal(jsonByte, &keyValue); err != nil {
Expand Down
42 changes: 37 additions & 5 deletions capten/config-worker/internal/app_config/app_git_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ const (
tmpGitProjectCloneStr = "clone*"
gitProjectAccessTokenAttribute = "accessToken"
gitUrlSuffix = ".git"
kubeConfig = "kubeconfig"
k8sEndpoint = "endpoint"
k8sClusterCA = "clusterCA"
)

type Config struct {
GitDefaultCommiterName string `envconfig:"GIT_COMMIT_NAME" default:"capten-bot"`
GitDefaultCommiterEmail string `envconfig:"GIT_COMMIT_EMAIL" default:"[email protected]"`
GitVaultEntityName string `envconfig:"GIT_VAULT_ENTITY_NAME" default:"git-project"`
GitCloneDir string `envconfig:"GIT_CLONE_DIR" default:"/gitCloneDir"`
GitBranchName string `envconfig:"GIT_BRANCH_NAME" default:"capten-template-bot"`
GitDefaultCommiterName string `envconfig:"GIT_COMMIT_NAME" default:"capten-bot"`
GitDefaultCommiterEmail string `envconfig:"GIT_COMMIT_EMAIL" default:"[email protected]"`
GitVaultEntityName string `envconfig:"GIT_VAULT_ENTITY_NAME" default:"git-project"`
GitCloneDir string `envconfig:"GIT_CLONE_DIR" default:"/gitCloneDir"`
GitBranchName string `envconfig:"GIT_BRANCH_NAME" default:"capten-template-bot"`
ManagedClusterEntityName string `envconfig:"MANAGED_CLUSER_VAULT_ENTITY_NAME" default:"managedcluster"`
}

var logger = logging.NewLogger()
Expand Down Expand Up @@ -125,6 +129,34 @@ func (ca *AppGitConfigHelper) SyncArgoCDApp(ctx context.Context, ns, resName str
return nil
}

func (ca *AppGitConfigHelper) CreateCluster(ctx context.Context, id, clusterName string) (string, error) {
credReader, err := credentials.NewCredentialReader(ctx)
if err != nil {
err = errors.WithMessage(err, "error in initializing credential reader")
return "", err
}

cred, err := credReader.GetCredential(ctx, credentials.GenericCredentialType,
ca.cfg.ManagedClusterEntityName, id)
if err != nil {
err = errors.WithMessagef(err, "error while reading credential %s/%s from the vault",
ca.cfg.GitVaultEntityName, id)
return "", err
}

client, err := argocd.NewClient(logger)
if err != nil {
return "", err
}

err = client.CreateOrUpdateCluster(ctx, clusterName, cred[kubeConfig])
if err != nil {
return "", err
}

return cred[k8sEndpoint], nil
}

func (ca *AppGitConfigHelper) WaitForArgoCDToSync(ctx context.Context, ns, resName string) error {
client, err := argocd.NewClient(logger)
if err != nil {
Expand Down
61 changes: 40 additions & 21 deletions capten/config-worker/internal/crossplane/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package crossplane
import (
"context"
"encoding/json"
"fmt"

"github.com/intelops/go-common/logging"
"github.com/kube-tarian/kad/capten/model"
Expand All @@ -14,33 +15,51 @@ var logger = logging.NewLogger()

func (c *CrossPlaneActivities) ConfigurationActivity(ctx context.Context, params model.ConfigureParameters, payload json.RawMessage) (model.ResponsePayload, error) {
logger.Infof("Activity: %s, %s", params.Resource, params.Action)

req := &model.CrossplaneUseCase{}
if err := json.Unmarshal(payload, req); err != nil {
status, err := processConfigurationActivity(ctx, params, payload)
if err != nil {
return model.ResponsePayload{
Status: string(model.WorkFlowStatusFailed),
Message: json.RawMessage("{\"error\": \"failed to read payload\"}"),
Status: status,
Message: json.RawMessage(
fmt.Sprintf("{\"error\": \"%s\"}", err.Error())),
}, err
}

config, err := NewCrossPlaneApp()
logger.Infof("crossplane plugin action %s configured", params.Action)
return model.ResponsePayload{Status: status}, err
}

func processConfigurationActivity(ctx context.Context, params model.ConfigureParameters, payload json.RawMessage) (string, error) {
cp, err := NewCrossPlaneApp()
if err != nil {
return model.ResponsePayload{
Status: string(model.WorkFlowStatusFailed),
Message: json.RawMessage("{\"error\": \"failed to initialize crossplane plugin\"}"),
}, err
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to initialize crossplane plugin")
}

status, err := config.Configure(ctx, req)
if err != nil {
logger.Errorf("crossplane plugin configure failed, %v", err)
return model.ResponsePayload{
Status: status,
Message: json.RawMessage("{\"error\": \"failed to configure crossplane plugin\"}"),
}, err
switch params.Action {
case model.CrossPlaneClusterUpdate:
reqLocal := &model.CrossplaneClusterUpdate{}
if err := json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane req for %s, %v", model.CrossPlaneClusterUpdate, err)
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane req for %s", model.CrossPlaneClusterUpdate)
}
status, err := cp.configureClusterUpdate(ctx, reqLocal)
if err != nil {
logger.Errorf("failed to configure crossplane project for %s, %v", model.CrossPlaneClusterUpdate, err)
return status, fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate)
}
return status, nil
case model.CrossPlaneProjectSync:
reqLocal := &model.CrossplaneUseCase{}
if err := json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane req, %v", err)
return string(model.WorkFlowStatusFailed), fmt.Errorf("failed to unmarshall the crossplane req")
}
status, err := cp.configureProjectAndApps(ctx, reqLocal)
if err != nil {
logger.Errorf("failed to configure crossplane project, %v", err)
err = fmt.Errorf("failed to configure crossplane project")
}
return status, nil
default:
return string(model.WorkFlowStatusFailed), fmt.Errorf("invalid crossplane action")
}
logger.Infof("crossplane plugin configured")
return model.ResponsePayload{
Status: status,
}, err
}
Loading
Loading