Skip to content

Commit

Permalink
[WAIT] Use metadataStore in the wait stage plugin (#5520)
Browse files Browse the repository at this point in the history
* Use metadataStore in wait

Signed-off-by: t-kikuc <[email protected]>

* pass metadataStoreClieint to constructor

Signed-off-by: t-kikuc <[email protected]>

---------

Signed-off-by: t-kikuc <[email protected]>
  • Loading branch information
t-kikuc authored Jan 31, 2025
1 parent bb47cbc commit e600dd9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 28 deletions.
19 changes: 14 additions & 5 deletions pkg/app/pipedv1/plugin/wait/deployment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedservice"
"github.com/pipe-cd/pipecd/pkg/plugin/signalhandler"
)

Expand All @@ -32,24 +33,32 @@ type deploymentServiceServer struct {

pluginConfig *config.PipedPlugin

logger *zap.Logger
logPersister logPersister
logger *zap.Logger
logPersister logPersister
metadataStore metadataStoreClient
}

type logPersister interface {
StageLogPersister(deploymentID, stageID string) logpersister.StageLogPersister
}

type metadataStoreClient interface {
GetStageMetadata(ctx context.Context, in *pipedservice.GetStageMetadataRequest, opts ...grpc.CallOption) (*pipedservice.GetStageMetadataResponse, error)
PutStageMetadata(ctx context.Context, in *pipedservice.PutStageMetadataRequest, opts ...grpc.CallOption) (*pipedservice.PutStageMetadataResponse, error)
}

// NewDeploymentService creates a new deploymentServiceServer of Wait Stage plugin.
func NewDeploymentService(
config *config.PipedPlugin,
logger *zap.Logger,
logPersister logPersister,
metadataStore metadataStoreClient,
) *deploymentServiceServer {
return &deploymentServiceServer{
pluginConfig: config,
logger: logger.Named("wait-stage-plugin"),
logPersister: logPersister,
pluginConfig: config,
logger: logger.Named("wait-stage-plugin"),
logPersister: logPersister,
metadataStore: metadataStore,
}
}

Expand Down
56 changes: 33 additions & 23 deletions pkg/app/pipedv1/plugin/wait/deployment/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ package deployment

import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/piped/logpersister"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/config"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedservice"
)

type Stage string
Expand All @@ -43,12 +48,12 @@ func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.Ex
duration := opts.Duration.Duration()

// Retrieve the saved initialStart from the previous run.
initialStart := s.retrieveStartTime(in.Stage.Id)
initialStart := s.retrieveStartTime(ctx, in.Deployment.Id, in.Stage.Id)
if initialStart.IsZero() {
// When this is the first run.
initialStart = time.Now()
}
s.saveStartTime(ctx, initialStart, in.Stage.Id)
s.saveStartTime(ctx, initialStart, in.Deployment.Id, in.Stage.Id)

return wait(ctx, duration, initialStart, slp)
}
Expand Down Expand Up @@ -84,27 +89,32 @@ func wait(ctx context.Context, duration time.Duration, initialStart time.Time, s
}
}

func (s *deploymentServiceServer) retrieveStartTime(stageID string) (t time.Time) {
// TODO: implement this func with metadataStore
return time.Time{}
// sec, ok := s.metadataStore.Stage(stageId).Get(startTimeKey)
// if !ok {
// return
// }
// ut, err := strconv.ParseInt(sec, 10, 64)
// if err != nil {
// return
// }
// return time.Unix(ut, 0)
func (s *deploymentServiceServer) retrieveStartTime(ctx context.Context, deploymentID, stageID string) time.Time {
sec, err := s.metadataStore.GetStageMetadata(ctx, &pipedservice.GetStageMetadataRequest{
DeploymentId: deploymentID,
StageId: stageID,
Key: startTimeKey,
})
if err != nil {
s.logger.Error(fmt.Sprintf("failed to get stage metadata %s", startTimeKey), zap.Error(err))
return time.Time{}
}
ut, err := strconv.ParseInt(sec.Value, 10, 64)
if err != nil {
s.logger.Error(fmt.Sprintf("failed to parse stage metadata %s", startTimeKey), zap.Error(err))
return time.Time{}
}
return time.Unix(ut, 0)
}

func (s *deploymentServiceServer) saveStartTime(ctx context.Context, t time.Time, stageID string) {
// TODO: implement this func with metadataStore

// metadata := map[string]string{
// startTimeKey: strconv.FormatInt(t.Unix(), 10),
// }
// if err := s.metadataStore.Stage(stageId).PutMulti(ctx, metadata); err != nil {
// s.logger.Error("failed to store metadata", zap.Error(err))
// }
func (s *deploymentServiceServer) saveStartTime(ctx context.Context, t time.Time, deploymentID, stageID string) {
req := &pipedservice.PutStageMetadataRequest{
DeploymentId: deploymentID,
StageId: stageID,
Key: startTimeKey,
Value: strconv.FormatInt(t.Unix(), 10),
}
if _, err := s.metadataStore.PutStageMetadata(ctx, req); err != nil {
s.logger.Error(fmt.Sprintf("failed to store %s as stage metadata %s", req.Value, startTimeKey), zap.Error(err))
}
}
1 change: 1 addition & 0 deletions pkg/app/pipedv1/plugin/wait/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *plugin) run(ctx context.Context, input cli.Input) (runErr error) {
cfg,
input.Logger,
persister,
pipedapiClient,
)
opts = []rpc.Option{
rpc.WithPort(cfg.Port),
Expand Down

0 comments on commit e600dd9

Please sign in to comment.