Skip to content

Commit

Permalink
fix plugin deploy issue with workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
vramk23 committed May 31, 2024
1 parent 0f8e275 commit a86b86d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
1 change: 1 addition & 0 deletions capten/agent/internal/api/cluster_plugin_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (a *Agent) GetClusterPlugins(ctx context.Context, request *clusterpluginspb
}, nil
}

a.log.Infof("pluginConfigList: %+v", pluginConfigList)
clusterPlugins := []*clusterpluginspb.ClusterPlugin{}
for idx, pluginConfig := range pluginConfigList {
clusterPlugins[idx] = &clusterpluginspb.ClusterPlugin{
Expand Down
16 changes: 15 additions & 1 deletion capten/common-pkg/workers/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const (
DeploymentWorkerTaskQueue = "Deployment"
)

type WorkflowHeader struct {
Action string `json:"action"`
}

type Deployment struct {
client *temporalclient.Client
log logging.Logger
Expand Down Expand Up @@ -95,8 +99,18 @@ func (d *Deployment) SendEventV2(
return nil, err
}

header := WorkflowHeader{
Action: action,
}

headerJSON, err := json.Marshal(header)
if err != nil {
return nil, err
}

d.log.Infof("Sending event to temporal: workflow: %s, action: %s", workflowName, action)
run, err := d.client.ExecuteWorkflow(ctx, options, workflowName, action, json.RawMessage(deployPayloadJSON))
d.log.Infof("deployPayload: %+v", string(headerJSON))
run, err := d.client.ExecuteWorkflow(ctx, options, workflowName, json.RawMessage(headerJSON), json.RawMessage(deployPayloadJSON))
if err != nil {
d.log.Errorf("failed to send event to workflow for plugin %s, %v", deployPayload.String(), err)
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ func NewPluginActivities() (*PluginActivities, error) {

as, err := captenstore.NewStore(logger)
if err != nil {
logger.Errorf("failed to initialize plugin app store, %v", err)
return nil, err
}

k8sclient, err := k8s.NewK8SClient(logger)
if err != nil {
logger.Errorf("failed to get k8s client, %v", err)
return nil, err
}

Expand Down
31 changes: 29 additions & 2 deletions capten/deployment-worker/internal/workflows/plugin_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,35 @@ import (
"gopkg.in/yaml.v2"
)

func PluginWorkflow(ctx workflow.Context, action string, payload json.RawMessage) (model.ResponsePayload, error) {
result := &model.ResponsePayload{}
type WorkflowHeader struct {
Action string `json:"action"`
}

func PluginWorkflow(ctx workflow.Context, payload json.RawMessage) (model.ResponsePayload, error) {
logger := logging.NewLogger()
logger.Infof("plugin deployment workflow started. header payload: %s", string(payload))

workflowData := []interface{}{}
err := json.Unmarshal(payload, &workflowData)
if err != nil {
return model.ResponsePayload{
Status: "error",
Message: []byte(err.Error()),
}, err
}
logger.Infof("plugin deployment workflow started. header payload: %+v", workflowData)

req := &WorkflowHeader{}
err = json.Unmarshal(workflowData[0].(json.RawMessage), req)
if err != nil {
return model.ResponsePayload{
Status: "error",
Message: []byte(err.Error()),
}, err
}

action := req.Action
result := &model.ResponsePayload{}

as, err := captenstore.NewStore(logger)
if err != nil {
Expand Down Expand Up @@ -60,6 +86,7 @@ func setContext(ctx workflow.Context, timeInSeconds int, log logging.Logger) wor
}

func hanldeDeployWorkflow(ctx workflow.Context, payload json.RawMessage, log logging.Logger, pas *captenstore.Store) (*model.ResponsePayload, error) {
log.Info("Starting deploy plugin workflow")
var a *activities.PluginActivities
result := &model.ResponsePayload{}
ctx = setContext(ctx, 600, log)
Expand Down

0 comments on commit a86b86d

Please sign in to comment.