From a86b86dfa1120a3ea3991294b8e3c2d229c539d8 Mon Sep 17 00:00:00 2001 From: venkat k Date: Fri, 31 May 2024 21:59:08 +0530 Subject: [PATCH] fix plugin deploy issue with workflow --- .../agent/internal/api/cluster_plugin_apis.go | 1 + capten/common-pkg/workers/deployment.go | 16 +++++++++- .../internal/activities/plugin_activity.go | 2 -- .../internal/workflows/plugin_workflow.go | 31 +++++++++++++++++-- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/capten/agent/internal/api/cluster_plugin_apis.go b/capten/agent/internal/api/cluster_plugin_apis.go index b9f96b87..f69b793d 100644 --- a/capten/agent/internal/api/cluster_plugin_apis.go +++ b/capten/agent/internal/api/cluster_plugin_apis.go @@ -16,6 +16,7 @@ func (a *Agent) GetClusterPlugins(ctx context.Context, request *clusterpluginspb }, nil } + a.log.Infof("pluginConfigList: %+v", pluginConfigList) clusterPlugins := []*clusterpluginspb.ClusterPlugin{} for idx, pluginConfig := range pluginConfigList { clusterPlugins[idx] = &clusterpluginspb.ClusterPlugin{ diff --git a/capten/common-pkg/workers/deployment.go b/capten/common-pkg/workers/deployment.go index 81902094..0e401ad6 100644 --- a/capten/common-pkg/workers/deployment.go +++ b/capten/common-pkg/workers/deployment.go @@ -19,6 +19,10 @@ const ( DeploymentWorkerTaskQueue = "Deployment" ) +type WorkflowHeader struct { + Action string `json:"action"` +} + type Deployment struct { client *temporalclient.Client log logging.Logger @@ -95,8 +99,18 @@ func (d *Deployment) SendEventV2( return nil, err } + header := WorkflowHeader{ + Action: action, + } + + headerJSON, err := json.Marshal(header) + if err != nil { + return nil, err + } + d.log.Infof("Sending event to temporal: workflow: %s, action: %s", workflowName, action) - run, err := d.client.ExecuteWorkflow(ctx, options, workflowName, action, json.RawMessage(deployPayloadJSON)) + d.log.Infof("deployPayload: %+v", string(headerJSON)) + run, err := d.client.ExecuteWorkflow(ctx, options, workflowName, json.RawMessage(headerJSON), json.RawMessage(deployPayloadJSON)) if err != nil { d.log.Errorf("failed to send event to workflow for plugin %s, %v", deployPayload.String(), err) return nil, err diff --git a/capten/deployment-worker/internal/activities/plugin_activity.go b/capten/deployment-worker/internal/activities/plugin_activity.go index b6408e8e..69f8d1de 100644 --- a/capten/deployment-worker/internal/activities/plugin_activity.go +++ b/capten/deployment-worker/internal/activities/plugin_activity.go @@ -63,13 +63,11 @@ func NewPluginActivities() (*PluginActivities, error) { as, err := captenstore.NewStore(logger) if err != nil { - logger.Errorf("failed to initialize plugin app store, %v", err) return nil, err } k8sclient, err := k8s.NewK8SClient(logger) if err != nil { - logger.Errorf("failed to get k8s client, %v", err) return nil, err } diff --git a/capten/deployment-worker/internal/workflows/plugin_workflow.go b/capten/deployment-worker/internal/workflows/plugin_workflow.go index 68233701..597bcc5f 100644 --- a/capten/deployment-worker/internal/workflows/plugin_workflow.go +++ b/capten/deployment-worker/internal/workflows/plugin_workflow.go @@ -20,9 +20,35 @@ import ( "gopkg.in/yaml.v2" ) -func PluginWorkflow(ctx workflow.Context, action string, payload json.RawMessage) (model.ResponsePayload, error) { - result := &model.ResponsePayload{} +type WorkflowHeader struct { + Action string `json:"action"` +} + +func PluginWorkflow(ctx workflow.Context, payload json.RawMessage) (model.ResponsePayload, error) { logger := logging.NewLogger() + logger.Infof("plugin deployment workflow started. header payload: %s", string(payload)) + + workflowData := []interface{}{} + err := json.Unmarshal(payload, &workflowData) + if err != nil { + return model.ResponsePayload{ + Status: "error", + Message: []byte(err.Error()), + }, err + } + logger.Infof("plugin deployment workflow started. header payload: %+v", workflowData) + + req := &WorkflowHeader{} + err = json.Unmarshal(workflowData[0].(json.RawMessage), req) + if err != nil { + return model.ResponsePayload{ + Status: "error", + Message: []byte(err.Error()), + }, err + } + + action := req.Action + result := &model.ResponsePayload{} as, err := captenstore.NewStore(logger) if err != nil { @@ -60,6 +86,7 @@ func setContext(ctx workflow.Context, timeInSeconds int, log logging.Logger) wor } func hanldeDeployWorkflow(ctx workflow.Context, payload json.RawMessage, log logging.Logger, pas *captenstore.Store) (*model.ResponsePayload, error) { + log.Info("Starting deploy plugin workflow") var a *activities.PluginActivities result := &model.ResponsePayload{} ctx = setContext(ctx, 600, log)