From 48e13a745b8d17eff8a5aa7f79df278649f5d886 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Tue, 4 Feb 2025 00:16:42 +0900 Subject: [PATCH 1/3] implement ListStageCommands() Signed-off-by: t-kikuc --- .../pipedv1/apistore/commandstore/store.go | 96 +++++++----- .../apistore/commandstore/store_test.go | 138 ++++++++++++++++++ .../pipedv1/cmd/piped/grpcapi/plugin_api.go | 26 +++- pkg/app/pipedv1/cmd/piped/piped.go | 2 +- pkg/app/pipedv1/controller/controller.go | 1 - 5 files changed, 222 insertions(+), 41 deletions(-) create mode 100644 pkg/app/pipedv1/apistore/commandstore/store_test.go diff --git a/pkg/app/pipedv1/apistore/commandstore/store.go b/pkg/app/pipedv1/apistore/commandstore/store.go index 536def060a..69115c8273 100644 --- a/pkg/app/pipedv1/apistore/commandstore/store.go +++ b/pkg/app/pipedv1/apistore/commandstore/store.go @@ -16,6 +16,7 @@ package commandstore import ( "context" + "fmt" "sync" "time" @@ -34,6 +35,7 @@ type apiClient interface { type Store interface { Run(ctx context.Context) error Lister() Lister + StageCommandHandledReporter() StageCommandHandledReporter } // Lister helps list commands. @@ -41,25 +43,32 @@ type Store interface { type Lister interface { ListApplicationCommands() []model.ReportableCommand ListDeploymentCommands() []model.ReportableCommand - ListStageCommands(deploymentID, stageID string) []model.ReportableCommand ListBuildPlanPreviewCommands() []model.ReportableCommand ListPipedCommands() []model.ReportableCommand + + // ListStageCommands returns all stage commands of the given deployment and stage. + // If the command type is not supported, it returns an error. + ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error) } +// stageCommandMap is a map of stage commands. Keys are deploymentID and stageID. +type stageCommandMap map[string]map[string][]*model.Command + type store struct { apiClient apiClient syncInterval time.Duration // TODO: Using atomic for storing a map of all commands // instead of some separate lists + mutex as the current. - applicationCommands []model.ReportableCommand - deploymentCommands []model.ReportableCommand - stageCommands []model.ReportableCommand - planPreviewCommands []model.ReportableCommand - pipedCommands []model.ReportableCommand - handledCommands map[string]time.Time - mu sync.RWMutex - gracePeriod time.Duration - logger *zap.Logger + applicationCommands []model.ReportableCommand + deploymentCommands []model.ReportableCommand + planPreviewCommands []model.ReportableCommand + pipedCommands []model.ReportableCommand + stageApproveCommands stageCommandMap + stageSkipCommands stageCommandMap + handledCommands map[string]time.Time + mu sync.RWMutex + gracePeriod time.Duration + logger *zap.Logger } var ( @@ -117,11 +126,12 @@ func (s *store) sync(ctx context.Context) error { } var ( - applicationCommands = make([]model.ReportableCommand, 0) - deploymentCommands = make([]model.ReportableCommand, 0) - stageCommands = make([]model.ReportableCommand, 0) - planPreviewCommands = make([]model.ReportableCommand, 0) - pipedCommands = make([]model.ReportableCommand, 0) + applicationCommands = make([]model.ReportableCommand, 0) + deploymentCommands = make([]model.ReportableCommand, 0) + planPreviewCommands = make([]model.ReportableCommand, 0) + pipedCommands = make([]model.ReportableCommand, 0) + stageApproveCommands stageCommandMap + stageSkipCommands stageCommandMap ) for _, cmd := range resp.Commands { switch cmd.Type { @@ -129,21 +139,24 @@ func (s *store) sync(ctx context.Context) error { applicationCommands = append(applicationCommands, s.makeReportableCommand(cmd)) case model.Command_CANCEL_DEPLOYMENT: deploymentCommands = append(deploymentCommands, s.makeReportableCommand(cmd)) - case model.Command_APPROVE_STAGE, model.Command_SKIP_STAGE: - stageCommands = append(stageCommands, s.makeReportableCommand(cmd)) case model.Command_BUILD_PLAN_PREVIEW: planPreviewCommands = append(planPreviewCommands, s.makeReportableCommand(cmd)) case model.Command_RESTART_PIPED: pipedCommands = append(pipedCommands, s.makeReportableCommand(cmd)) + case model.Command_APPROVE_STAGE: + stageApproveCommands.append(cmd) + case model.Command_SKIP_STAGE: + stageSkipCommands.append(cmd) } } s.mu.Lock() s.applicationCommands = applicationCommands s.deploymentCommands = deploymentCommands - s.stageCommands = stageCommands s.planPreviewCommands = planPreviewCommands s.pipedCommands = pipedCommands + s.stageApproveCommands = stageApproveCommands + s.stageSkipCommands = stageSkipCommands s.mu.Unlock() return nil @@ -191,26 +204,6 @@ func (s *store) ListDeploymentCommands() []model.ReportableCommand { return commands } -func (s *store) ListStageCommands(deploymentID, stageID string) []model.ReportableCommand { - s.mu.RLock() - defer s.mu.RUnlock() - - commands := make([]model.ReportableCommand, 0, len(s.stageCommands)) - for _, cmd := range s.stageCommands { - if _, ok := s.handledCommands[cmd.Id]; ok { - continue - } - if cmd.DeploymentId != deploymentID { - continue - } - if cmd.StageId != stageID { - continue - } - commands = append(commands, cmd) - } - return commands -} - func (s *store) ListBuildPlanPreviewCommands() []model.ReportableCommand { s.mu.RLock() defer s.mu.RUnlock() @@ -264,3 +257,30 @@ func (s *store) reportCommandHandled(ctx context.Context, c *model.Command, stat }) return err } + +func (s *store) ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error) { + var list stageCommandMap + switch commandType { + case model.Command_APPROVE_STAGE: + list = s.stageApproveCommands + case model.Command_SKIP_STAGE: + list = s.stageSkipCommands + default: + s.logger.Error("invalid command type", zap.String("commandType", commandType.String())) + return nil, fmt.Errorf("invalid command type: %v", commandType.String()) + } + + s.mu.RLock() + defer s.mu.RUnlock() + + return list[deploymentID][stageID], nil +} + +func (m stageCommandMap) append(c *model.Command) { + deploymentID := c.DeploymentId + stageID := c.StageId + if _, ok := m[deploymentID]; !ok { + m[deploymentID] = make(map[string][]*model.Command) + } + m[deploymentID][stageID] = append(m[deploymentID][stageID], c) +} diff --git a/pkg/app/pipedv1/apistore/commandstore/store_test.go b/pkg/app/pipedv1/apistore/commandstore/store_test.go new file mode 100644 index 0000000000..674961635d --- /dev/null +++ b/pkg/app/pipedv1/apistore/commandstore/store_test.go @@ -0,0 +1,138 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package commandstore + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/pipe-cd/pipecd/pkg/model" +) + +func TestListStageCommands(t *testing.T) { + t.Parallel() + + store := store{ + stageApproveCommands: stageCommandMap{ + "deployment-1": { + "stage-1": []*model.Command{ + { + Id: "command-1", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + Commander: "commander-1", + }, + { + Id: "command-2", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + Commander: "commander-2", + }, + }, + }, + }, + stageSkipCommands: stageCommandMap{ + "deployment-11": { + "stage-11": []*model.Command{ + { + Id: "command-11", + DeploymentId: "deployment-11", + StageId: "stage-11", + Type: model.Command_SKIP_STAGE, + }, + }, + }, + }, + logger: zap.NewNop(), + } + + testcases := []struct { + name string + deploymentID string + stageID string + commandType model.Command_Type + want []*model.Command + wantErr error + }{ + { + name: "valid arguments of Approve", + deploymentID: "deployment-1", + stageID: "stage-1", + commandType: model.Command_APPROVE_STAGE, + want: []*model.Command{ + { + Id: "command-1", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + Commander: "commander-1", + }, + { + Id: "command-2", + DeploymentId: "deployment-1", + StageId: "stage-1", + Type: model.Command_APPROVE_STAGE, + Commander: "commander-2", + }, + }, + wantErr: nil, + }, + { + name: "valid arguments of Skip", + deploymentID: "deployment-11", + stageID: "stage-11", + commandType: model.Command_SKIP_STAGE, + want: []*model.Command{ + { + Id: "command-11", + DeploymentId: "deployment-11", + StageId: "stage-11", + Type: model.Command_SKIP_STAGE, + }, + }, + wantErr: nil, + }, + { + name: "stageID not exists", + deploymentID: "deployment-1", + stageID: "stage-999", + commandType: model.Command_APPROVE_STAGE, + want: nil, + wantErr: nil, + }, + { + name: "invalid commandType", + deploymentID: "deployment-1", + stageID: "stage-1", + commandType: model.Command_CANCEL_DEPLOYMENT, + want: nil, + wantErr: fmt.Errorf("invalid command type: CANCEL_DEPLOYMENT"), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := store.ListStageCommands(tc.deploymentID, tc.stageID, tc.commandType) + assert.Equal(t, tc.wantErr, err) + assert.Equal(t, tc.want, got) + }) + } +} diff --git a/pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go b/pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go index ecd4cbd9ae..842616bbd5 100644 --- a/pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go +++ b/pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go @@ -21,6 +21,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" + "github.com/pipe-cd/pipecd/pkg/model" service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice" "go.uber.org/zap" @@ -36,6 +37,7 @@ type PluginAPI struct { toolRegistry *toolRegistry Logger *zap.Logger metadataStoreRegistry *metadatastore.MetadataStoreRegistry + stageCommandLister stageCommandLister } type apiClient interface { @@ -43,12 +45,23 @@ type apiClient interface { ReportStageLogsFromLastCheckpoint(ctx context.Context, in *pipedservice.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsFromLastCheckpointResponse, error) } +type stageCommandLister interface { + ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error) +} + // Register registers all handling of this service into the specified gRPC server. func (a *PluginAPI) Register(server *grpc.Server) { service.RegisterPluginServiceServer(server, a) } -func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, toolsDir string, logger *zap.Logger, metadataStoreRegistry *metadatastore.MetadataStoreRegistry) (*PluginAPI, error) { +func NewPluginAPI( + cfg *config.PipedSpec, + apiClient apiClient, + toolsDir string, + logger *zap.Logger, + metadataStoreRegistry *metadatastore.MetadataStoreRegistry, + stageCommandLister stageCommandLister, +) (*PluginAPI, error) { toolRegistry, err := newToolRegistry(toolsDir) if err != nil { return nil, fmt.Errorf("failed to create tool registry: %w", err) @@ -60,6 +73,7 @@ func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, toolsDir string, l toolRegistry: toolRegistry, Logger: logger.Named("plugin-api"), metadataStoreRegistry: metadataStoreRegistry, + stageCommandLister: stageCommandLister, }, nil } @@ -143,3 +157,13 @@ func (a *PluginAPI) PutDeploymentPluginMetadataMulti(ctx context.Context, req *s func (a *PluginAPI) GetDeploymentSharedMetadata(ctx context.Context, req *service.GetDeploymentSharedMetadataRequest) (*service.GetDeploymentSharedMetadataResponse, error) { return a.metadataStoreRegistry.GetDeploymentSharedMetadata(ctx, req) } + +func (a *PluginAPI) ListStageCommands(ctx context.Context, req *service.ListStageCommandsRequest) (*service.ListStageCommandsResponse, error) { + commands, err := a.stageCommandLister.ListStageCommands(req.DeploymentId, req.StageId, req.Type) + if err != nil { + return nil, err + } + return &service.ListStageCommandsResponse{ + Commands: commands, + }, nil +} diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index 1989db9b1a..4c8b0e7df5 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -302,7 +302,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { // Start running plugin service server. { var ( - service, err = grpcapi.NewPluginAPI(cfg, apiClient, p.toolsDir, input.Logger, metadataStoreRegistry) + service, err = grpcapi.NewPluginAPI(cfg, apiClient, p.toolsDir, input.Logger, metadataStoreRegistry, commandLister) opts = []rpc.Option{ rpc.WithPort(p.pluginServicePort), rpc.WithGracePeriod(p.gracePeriod), diff --git a/pkg/app/pipedv1/controller/controller.go b/pkg/app/pipedv1/controller/controller.go index 11d554856a..e005f41aab 100644 --- a/pkg/app/pipedv1/controller/controller.go +++ b/pkg/app/pipedv1/controller/controller.go @@ -68,7 +68,6 @@ type deploymentLister interface { type commandLister interface { ListDeploymentCommands() []model.ReportableCommand - ListStageCommands(deploymentID, stageID string) []model.ReportableCommand } type notifier interface { From 92b878d4b601d27984de5669fb5b8933766f1fb5 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Tue, 4 Feb 2025 00:27:23 +0900 Subject: [PATCH 2/3] fix: remove remainded unintended Signed-off-by: t-kikuc --- pkg/app/pipedv1/apistore/commandstore/store.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/app/pipedv1/apistore/commandstore/store.go b/pkg/app/pipedv1/apistore/commandstore/store.go index 69115c8273..8809ab81b0 100644 --- a/pkg/app/pipedv1/apistore/commandstore/store.go +++ b/pkg/app/pipedv1/apistore/commandstore/store.go @@ -35,7 +35,6 @@ type apiClient interface { type Store interface { Run(ctx context.Context) error Lister() Lister - StageCommandHandledReporter() StageCommandHandledReporter } // Lister helps list commands. From 4107029e61453fd6ec190971110f45422383a550 Mon Sep 17 00:00:00 2001 From: t-kikuc Date: Tue, 4 Feb 2025 00:35:11 +0900 Subject: [PATCH 3/3] refactor: rename a map for clarity Signed-off-by: t-kikuc --- pkg/app/pipedv1/apistore/commandstore/store.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/app/pipedv1/apistore/commandstore/store.go b/pkg/app/pipedv1/apistore/commandstore/store.go index 8809ab81b0..08987ff46f 100644 --- a/pkg/app/pipedv1/apistore/commandstore/store.go +++ b/pkg/app/pipedv1/apistore/commandstore/store.go @@ -258,12 +258,12 @@ func (s *store) reportCommandHandled(ctx context.Context, c *model.Command, stat } func (s *store) ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error) { - var list stageCommandMap + var commands stageCommandMap switch commandType { case model.Command_APPROVE_STAGE: - list = s.stageApproveCommands + commands = s.stageApproveCommands case model.Command_SKIP_STAGE: - list = s.stageSkipCommands + commands = s.stageSkipCommands default: s.logger.Error("invalid command type", zap.String("commandType", commandType.String())) return nil, fmt.Errorf("invalid command type: %v", commandType.String()) @@ -272,7 +272,7 @@ func (s *store) ListStageCommands(deploymentID, stageID string, commandType mode s.mu.RLock() defer s.mu.RUnlock() - return list[deploymentID][stageID], nil + return commands[deploymentID][stageID], nil } func (m stageCommandMap) append(c *model.Command) {