Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update controller to use new planner logic #4825

Merged
merged 6 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
"github.com/pipe-cd/pipecd/pkg/app/piped/apistore/eventstore"
"github.com/pipe-cd/pipecd/pkg/app/piped/appconfigreporter"
"github.com/pipe-cd/pipecd/pkg/app/piped/chartrepo"
"github.com/pipe-cd/pipecd/pkg/app/piped/controller"
"github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector"
"github.com/pipe-cd/pipecd/pkg/app/piped/eventwatcher"
Expand All @@ -63,6 +62,7 @@
"github.com/pipe-cd/pipecd/pkg/app/piped/statsreporter"
"github.com/pipe-cd/pipecd/pkg/app/piped/toolregistry"
"github.com/pipe-cd/pipecd/pkg/app/piped/trigger"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cache/memorycache"
"github.com/pipe-cd/pipecd/pkg/cli"
Expand Down Expand Up @@ -392,6 +392,12 @@
})
}

cfgData, err := p.loadConfigByte(ctx)
if err != nil {
input.Logger.Error("failed to load piped configuration", zap.Error(err))
return err
}

Check warning on line 399 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L395-L399

Added lines #L395 - L399 were not covered by tests

// Start running deployment controller.
{
c := controller.NewController(
Expand All @@ -405,6 +411,7 @@
notifier,
decrypter,
cfg,
cfgData,

Check warning on line 414 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L414

Added line #L414 was not covered by tests
appManifestsCache,
p.gracePeriod,
input.Logger,
Expand Down Expand Up @@ -641,6 +648,44 @@
return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set")
}

// loadConfig reads the Piped configuration data from the specified source.
func (p *piped) loadConfigByte(ctx context.Context) ([]byte, error) {
// HACK: When the version of cobra is updated to >=v1.8.0, this should be replaced with https://pkg.go.dev/github.com/spf13/cobra#Command.MarkFlagsMutuallyExclusive.
if err := p.hasTooManyConfigFlags(); err != nil {
return nil, err
}

Check warning on line 656 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L652-L656

Added lines #L652 - L656 were not covered by tests

if p.configFile != "" {
return os.ReadFile(p.configFile)
}

Check warning on line 660 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L658-L660

Added lines #L658 - L660 were not covered by tests

if p.configData != "" {
data, err := base64.StdEncoding.DecodeString(p.configData)
if err != nil {
return nil, fmt.Errorf("the given config-data isn't base64 encoded: %w", err)
}
return data, nil

Check warning on line 667 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L662-L667

Added lines #L662 - L667 were not covered by tests
}

if p.configGCPSecret != "" {
data, err := p.getConfigDataFromSecretManager(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load config from SecretManager (%w)", err)
}
return data, nil

Check warning on line 675 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L670-L675

Added lines #L670 - L675 were not covered by tests
}

if p.configAWSSecret != "" {
data, err := p.getConfigDataFromAWSSecretsManager(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load config from AWS Secrets Manager (%w)", err)
}
return data, nil

Check warning on line 683 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L678-L683

Added lines #L678 - L683 were not covered by tests
}

return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set")

Check warning on line 686 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L686

Added line #L686 was not covered by tests
}

func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) {
sm := cfg.SecretManagement
if sm == nil {
Expand Down
23 changes: 16 additions & 7 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,17 @@ var (

type controller struct {
apiClient apiClient
pluginRegistry PluginRegistry
gitClient gitClient
deploymentLister deploymentLister
commandLister commandLister
applicationLister applicationLister
liveResourceLister liveResourceLister
analysisResultStore analysisResultStore
notifier notifier
secretDecrypter secretDecrypter
pipedConfig *config.PipedSpec
pipedConfig []byte
secretDecrypter secretDecrypter // TODO: Remove this
pipedCfg *config.PipedSpec // TODO: Remove this, use pipedConfig instead
appManifestsCache cache.Cache
logPersister logpersister.Persister

Expand Down Expand Up @@ -155,7 +157,8 @@ func NewController(
analysisResultStore analysisResultStore,
notifier notifier,
sd secretDecrypter,
pipedConfig *config.PipedSpec,
pipedCfg *config.PipedSpec,
pipedConfig []byte,
appManifestsCache cache.Cache,
gracePeriod time.Duration,
logger *zap.Logger,
Expand All @@ -167,6 +170,7 @@ func NewController(
)
return &controller{
apiClient: apiClient,
pluginRegistry: DefaultPluginRegistry(),
gitClient: gitClient,
deploymentLister: deploymentLister,
commandLister: commandLister,
Expand All @@ -176,6 +180,7 @@ func NewController(
notifier: notifier,
secretDecrypter: sd,
appManifestsCache: appManifestsCache,
pipedCfg: pipedCfg,
pipedConfig: pipedConfig,
logPersister: lp,

Expand Down Expand Up @@ -468,17 +473,21 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
}
}

pluginClient, ok := c.pluginRegistry.Plugin(d.Kind)
if !ok {
logger.Error("no plugin client for the application kind", zap.String("kind", d.Kind.String()))
return nil, fmt.Errorf("no plugin client for the application kind %s", d.Kind.String())
}

planner := newPlanner(
d,
commitHash,
configFilename,
workingDir,
pluginClient,
c.apiClient,
c.gitClient,
c.notifier,
c.secretDecrypter,
c.pipedConfig,
c.appManifestsCache,
c.logger,
)

Expand Down Expand Up @@ -621,7 +630,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
c.logPersister,
c.notifier,
c.secretDecrypter,
c.pipedConfig,
c.pipedCfg,
c.appManifestsCache,
c.logger,
)
Expand Down
110 changes: 34 additions & 76 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,41 @@ import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/piped/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/piped/metadatastore"
pln "github.com/pipe-cd/pipecd/pkg/app/piped/planner"
"github.com/pipe-cd/pipecd/pkg/app/piped/planner/registry"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cache"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/regexpool"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform"
)

// What planner does:
// - Wait until there is no PLANNED or RUNNING deployment
// - Pick the oldest PENDING deployment to plan its pipeline
// - Compare with the last successful commit
// - Decide the pipeline should be executed (scale, progressive, rollback)
// - Update the pipeline stages and change the deployment status to PLANNED
type planner struct {
// Readonly deployment model.
deployment *model.Deployment
lastSuccessfulCommitHash string
lastSuccessfulConfigFilename string
workingDir string
apiClient apiClient
gitClient gitClient
metadataStore metadatastore.MetadataStore
notifier notifier
secretDecrypter secretDecrypter
plannerRegistry registry.Registry
pipedConfig *config.PipedSpec
appManifestsCache cache.Cache
logger *zap.Logger
pipedConfig []byte

// The pluginClient is used to call pluggin that actually
// performs planning deployment.
pluginClient platform.PlatformPluginClient

// The apiClient is used to report the deployment status.
apiClient apiClient

// The notifier and metadataStore are used for
// notification features.
notifier notifier
metadataStore metadatastore.MetadataStore

// TODO: Find a way to show log from pluggin's planner
logger *zap.Logger

done atomic.Bool
doneTimestamp time.Time
Expand All @@ -72,12 +68,10 @@ func newPlanner(
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClient platform.PlatformPluginClient,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
sd secretDecrypter,
pipedConfig *config.PipedSpec,
appManifestsCache cache.Cache,
pipedConfig []byte,
logger *zap.Logger,
) *planner {

Expand All @@ -94,14 +88,11 @@ func newPlanner(
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
pluginClient: pluginClient,
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
notifier: notifier,
secretDecrypter: sd,
pipedConfig: pipedConfig,
plannerRegistry: registry.DefaultRegistry(),
appManifestsCache: appManifestsCache,
doneDeploymentStatus: d.Status,
cancelledCh: make(chan *model.ReportableCommand, 1),
nowFunc: time.Now,
Expand Down Expand Up @@ -142,6 +133,11 @@ func (p *planner) Cancel(cmd model.ReportableCommand) {
close(p.cancelledCh)
}

// What planner does:
// - Wait until there is no PLANNED or RUNNING deployment
// - Pick the oldest PENDING deployment to plan its pipeline
// - <*> Perform planning a deployment by calling the pluggin's planner
// - Update the deployment status to PLANNED or not based on the result
func (p *planner) Run(ctx context.Context) error {
p.logger.Info("start running planner")

Expand All @@ -150,56 +146,19 @@ func (p *planner) Run(ctx context.Context) error {
p.done.Store(true)
}()

repoCfg := config.PipedRepository{
RepoID: p.deployment.GitPath.Repo.Id,
Remote: p.deployment.GitPath.Repo.Remote,
Branch: p.deployment.GitPath.Repo.Branch,
}

in := pln.Input{
ApplicationID: p.deployment.ApplicationId,
ApplicationName: p.deployment.ApplicationName,
GitPath: *p.deployment.GitPath,
Trigger: *p.deployment.Trigger,
MostRecentSuccessfulCommitHash: p.lastSuccessfulCommitHash,
PipedConfig: p.pipedConfig,
AppManifestsCache: p.appManifestsCache,
RegexPool: regexpool.DefaultPool(),
GitClient: p.gitClient,
Logger: p.logger,
}

in.TargetDSP = deploysource.NewProvider(
filepath.Join(p.workingDir, "target-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash),
*p.deployment.GitPath,
p.secretDecrypter,
)

if p.lastSuccessfulCommitHash != "" {
gp := *p.deployment.GitPath
gp.ConfigFilename = p.lastSuccessfulConfigFilename

in.RunningDSP = deploysource.NewProvider(
filepath.Join(p.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
gp,
p.secretDecrypter,
)
}

defer func() {
controllermetrics.UpdateDeploymentStatus(p.deployment, p.doneDeploymentStatus)
}()

planner, ok := p.plannerRegistry.Planner(p.deployment.Kind)
if !ok {
p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
p.reportDeploymentFailed(ctx, "Unable to find the planner for this application kind")
return fmt.Errorf("unable to find the planner for application %v", p.deployment.Kind)
in := &platform.BuildPlanRequest{
Deployment: p.deployment,
WorkingDir: p.workingDir,
LastSuccessfulCommitHash: p.lastSuccessfulCommitHash,
LastSuccessfulConfigFileName: p.lastSuccessfulConfigFilename,
PipedConfig: p.pipedConfig,
}

out, err := planner.Plan(ctx, in)
out, err := p.pluginClient.BuildPlan(ctx, in)

// If the deployment was already cancelled, we ignore the plan result.
select {
Expand All @@ -219,10 +178,10 @@ func (p *planner) Run(ctx context.Context) error {
}

p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_PLANNED
return p.reportDeploymentPlanned(ctx, out)
return p.reportDeploymentPlanned(ctx, out.Plan)
}

func (p *planner) reportDeploymentPlanned(ctx context.Context, out pln.Output) error {
func (p *planner) reportDeploymentPlanned(ctx context.Context, out *platform.DeploymentPlan) error {
var (
err error
retry = pipedservice.NewRetry(10)
Expand All @@ -232,7 +191,6 @@ func (p *planner) reportDeploymentPlanned(ctx context.Context, out pln.Output) e
StatusReason: "The deployment has been planned",
RunningCommitHash: p.lastSuccessfulCommitHash,
RunningConfigFilename: p.lastSuccessfulConfigFilename,
Version: out.Version,
Versions: out.Versions,
Stages: out.Stages,
DeploymentChainId: p.deployment.DeploymentChainId,
Expand Down
Loading
Loading