diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index 19c90c5bdc..3a5c4a95c8 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -49,7 +49,6 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/piped/apistore/eventstore" "github.com/pipe-cd/pipecd/pkg/app/piped/appconfigreporter" "github.com/pipe-cd/pipecd/pkg/app/piped/chartrepo" - "github.com/pipe-cd/pipecd/pkg/app/piped/controller" "github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector" "github.com/pipe-cd/pipecd/pkg/app/piped/eventwatcher" @@ -63,6 +62,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/piped/statsreporter" "github.com/pipe-cd/pipecd/pkg/app/piped/toolregistry" "github.com/pipe-cd/pipecd/pkg/app/piped/trigger" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/cache/memorycache" "github.com/pipe-cd/pipecd/pkg/cli" @@ -392,6 +392,12 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { }) } + cfgData, err := p.loadConfigByte(ctx) + if err != nil { + input.Logger.Error("failed to load piped configuration", zap.Error(err)) + return err + } + // Start running deployment controller. { c := controller.NewController( @@ -405,6 +411,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { notifier, decrypter, cfg, + cfgData, appManifestsCache, p.gracePeriod, input.Logger, @@ -641,6 +648,44 @@ func (p *piped) loadConfig(ctx context.Context) (*config.PipedSpec, error) { return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set") } +// loadConfig reads the Piped configuration data from the specified source. +func (p *piped) loadConfigByte(ctx context.Context) ([]byte, error) { + // HACK: When the version of cobra is updated to >=v1.8.0, this should be replaced with https://pkg.go.dev/github.com/spf13/cobra#Command.MarkFlagsMutuallyExclusive. + if err := p.hasTooManyConfigFlags(); err != nil { + return nil, err + } + + if p.configFile != "" { + return os.ReadFile(p.configFile) + } + + if p.configData != "" { + data, err := base64.StdEncoding.DecodeString(p.configData) + if err != nil { + return nil, fmt.Errorf("the given config-data isn't base64 encoded: %w", err) + } + return data, nil + } + + if p.configGCPSecret != "" { + data, err := p.getConfigDataFromSecretManager(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load config from SecretManager (%w)", err) + } + return data, nil + } + + if p.configAWSSecret != "" { + data, err := p.getConfigDataFromAWSSecretsManager(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load config from AWS Secrets Manager (%w)", err) + } + return data, nil + } + + return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set") +} + func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) { sm := cfg.SecretManagement if sm == nil { diff --git a/pkg/app/pipedv1/controller/controller.go b/pkg/app/pipedv1/controller/controller.go index 7924eae0da..6cf8fd5a57 100644 --- a/pkg/app/pipedv1/controller/controller.go +++ b/pkg/app/pipedv1/controller/controller.go @@ -106,6 +106,7 @@ var ( type controller struct { apiClient apiClient + pluginRegistry PluginRegistry gitClient gitClient deploymentLister deploymentLister commandLister commandLister @@ -113,8 +114,9 @@ type controller struct { liveResourceLister liveResourceLister analysisResultStore analysisResultStore notifier notifier - secretDecrypter secretDecrypter - pipedConfig *config.PipedSpec + pipedConfig []byte + secretDecrypter secretDecrypter // TODO: Remove this + pipedCfg *config.PipedSpec // TODO: Remove this, use pipedConfig instead appManifestsCache cache.Cache logPersister logpersister.Persister @@ -155,7 +157,8 @@ func NewController( analysisResultStore analysisResultStore, notifier notifier, sd secretDecrypter, - pipedConfig *config.PipedSpec, + pipedCfg *config.PipedSpec, + pipedConfig []byte, appManifestsCache cache.Cache, gracePeriod time.Duration, logger *zap.Logger, @@ -167,6 +170,7 @@ func NewController( ) return &controller{ apiClient: apiClient, + pluginRegistry: DefaultPluginRegistry(), gitClient: gitClient, deploymentLister: deploymentLister, commandLister: commandLister, @@ -176,6 +180,7 @@ func NewController( notifier: notifier, secretDecrypter: sd, appManifestsCache: appManifestsCache, + pipedCfg: pipedCfg, pipedConfig: pipedConfig, logPersister: lp, @@ -468,17 +473,21 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) ( } } + pluginClient, ok := c.pluginRegistry.Plugin(d.Kind) + if !ok { + logger.Error("no plugin client for the application kind", zap.String("kind", d.Kind.String())) + return nil, fmt.Errorf("no plugin client for the application kind %s", d.Kind.String()) + } + planner := newPlanner( d, commitHash, configFilename, workingDir, + pluginClient, c.apiClient, - c.gitClient, c.notifier, - c.secretDecrypter, c.pipedConfig, - c.appManifestsCache, c.logger, ) @@ -621,7 +630,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment) c.logPersister, c.notifier, c.secretDecrypter, - c.pipedConfig, + c.pipedCfg, c.appManifestsCache, c.logger, ) diff --git a/pkg/app/pipedv1/controller/planner.go b/pkg/app/pipedv1/controller/planner.go index 76ef9a3692..3e4cdcc50d 100644 --- a/pkg/app/pipedv1/controller/planner.go +++ b/pkg/app/pipedv1/controller/planner.go @@ -18,45 +18,41 @@ import ( "context" "encoding/json" "fmt" - "path/filepath" "time" "go.uber.org/atomic" "go.uber.org/zap" "github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics" - "github.com/pipe-cd/pipecd/pkg/app/piped/deploysource" "github.com/pipe-cd/pipecd/pkg/app/piped/metadatastore" - pln "github.com/pipe-cd/pipecd/pkg/app/piped/planner" - "github.com/pipe-cd/pipecd/pkg/app/piped/planner/registry" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" - "github.com/pipe-cd/pipecd/pkg/cache" "github.com/pipe-cd/pipecd/pkg/config" "github.com/pipe-cd/pipecd/pkg/model" - "github.com/pipe-cd/pipecd/pkg/regexpool" + "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform" ) -// What planner does: -// - Wait until there is no PLANNED or RUNNING deployment -// - Pick the oldest PENDING deployment to plan its pipeline -// - Compare with the last successful commit -// - Decide the pipeline should be executed (scale, progressive, rollback) -// - Update the pipeline stages and change the deployment status to PLANNED type planner struct { // Readonly deployment model. deployment *model.Deployment lastSuccessfulCommitHash string lastSuccessfulConfigFilename string workingDir string - apiClient apiClient - gitClient gitClient - metadataStore metadatastore.MetadataStore - notifier notifier - secretDecrypter secretDecrypter - plannerRegistry registry.Registry - pipedConfig *config.PipedSpec - appManifestsCache cache.Cache - logger *zap.Logger + pipedConfig []byte + + // The pluginClient is used to call pluggin that actually + // performs planning deployment. + pluginClient platform.PlatformPluginClient + + // The apiClient is used to report the deployment status. + apiClient apiClient + + // The notifier and metadataStore are used for + // notification features. + notifier notifier + metadataStore metadatastore.MetadataStore + + // TODO: Find a way to show log from pluggin's planner + logger *zap.Logger done atomic.Bool doneTimestamp time.Time @@ -72,12 +68,10 @@ func newPlanner( lastSuccessfulCommitHash string, lastSuccessfulConfigFilename string, workingDir string, + pluginClient platform.PlatformPluginClient, apiClient apiClient, - gitClient gitClient, notifier notifier, - sd secretDecrypter, - pipedConfig *config.PipedSpec, - appManifestsCache cache.Cache, + pipedConfig []byte, logger *zap.Logger, ) *planner { @@ -94,14 +88,11 @@ func newPlanner( lastSuccessfulCommitHash: lastSuccessfulCommitHash, lastSuccessfulConfigFilename: lastSuccessfulConfigFilename, workingDir: workingDir, + pluginClient: pluginClient, apiClient: apiClient, - gitClient: gitClient, metadataStore: metadatastore.NewMetadataStore(apiClient, d), notifier: notifier, - secretDecrypter: sd, pipedConfig: pipedConfig, - plannerRegistry: registry.DefaultRegistry(), - appManifestsCache: appManifestsCache, doneDeploymentStatus: d.Status, cancelledCh: make(chan *model.ReportableCommand, 1), nowFunc: time.Now, @@ -142,6 +133,11 @@ func (p *planner) Cancel(cmd model.ReportableCommand) { close(p.cancelledCh) } +// What planner does: +// - Wait until there is no PLANNED or RUNNING deployment +// - Pick the oldest PENDING deployment to plan its pipeline +// - <*> Perform planning a deployment by calling the pluggin's planner +// - Update the deployment status to PLANNED or not based on the result func (p *planner) Run(ctx context.Context) error { p.logger.Info("start running planner") @@ -150,56 +146,19 @@ func (p *planner) Run(ctx context.Context) error { p.done.Store(true) }() - repoCfg := config.PipedRepository{ - RepoID: p.deployment.GitPath.Repo.Id, - Remote: p.deployment.GitPath.Repo.Remote, - Branch: p.deployment.GitPath.Repo.Branch, - } - - in := pln.Input{ - ApplicationID: p.deployment.ApplicationId, - ApplicationName: p.deployment.ApplicationName, - GitPath: *p.deployment.GitPath, - Trigger: *p.deployment.Trigger, - MostRecentSuccessfulCommitHash: p.lastSuccessfulCommitHash, - PipedConfig: p.pipedConfig, - AppManifestsCache: p.appManifestsCache, - RegexPool: regexpool.DefaultPool(), - GitClient: p.gitClient, - Logger: p.logger, - } - - in.TargetDSP = deploysource.NewProvider( - filepath.Join(p.workingDir, "target-deploysource"), - deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash), - *p.deployment.GitPath, - p.secretDecrypter, - ) - - if p.lastSuccessfulCommitHash != "" { - gp := *p.deployment.GitPath - gp.ConfigFilename = p.lastSuccessfulConfigFilename - - in.RunningDSP = deploysource.NewProvider( - filepath.Join(p.workingDir, "running-deploysource"), - deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash), - gp, - p.secretDecrypter, - ) - } - defer func() { controllermetrics.UpdateDeploymentStatus(p.deployment, p.doneDeploymentStatus) }() - planner, ok := p.plannerRegistry.Planner(p.deployment.Kind) - if !ok { - p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE - p.reportDeploymentFailed(ctx, "Unable to find the planner for this application kind") - return fmt.Errorf("unable to find the planner for application %v", p.deployment.Kind) + in := &platform.BuildPlanRequest{ + Deployment: p.deployment, + WorkingDir: p.workingDir, + LastSuccessfulCommitHash: p.lastSuccessfulCommitHash, + LastSuccessfulConfigFileName: p.lastSuccessfulConfigFilename, + PipedConfig: p.pipedConfig, } - out, err := planner.Plan(ctx, in) + out, err := p.pluginClient.BuildPlan(ctx, in) // If the deployment was already cancelled, we ignore the plan result. select { @@ -219,10 +178,10 @@ func (p *planner) Run(ctx context.Context) error { } p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_PLANNED - return p.reportDeploymentPlanned(ctx, out) + return p.reportDeploymentPlanned(ctx, out.Plan) } -func (p *planner) reportDeploymentPlanned(ctx context.Context, out pln.Output) error { +func (p *planner) reportDeploymentPlanned(ctx context.Context, out *platform.DeploymentPlan) error { var ( err error retry = pipedservice.NewRetry(10) @@ -232,7 +191,6 @@ func (p *planner) reportDeploymentPlanned(ctx context.Context, out pln.Output) e StatusReason: "The deployment has been planned", RunningCommitHash: p.lastSuccessfulCommitHash, RunningConfigFilename: p.lastSuccessfulConfigFilename, - Version: out.Version, Versions: out.Versions, Stages: out.Stages, DeploymentChainId: p.deployment.DeploymentChainId, diff --git a/pkg/app/pipedv1/controller/pluginregistry.go b/pkg/app/pipedv1/controller/pluginregistry.go new file mode 100644 index 0000000000..4a4d4eebee --- /dev/null +++ b/pkg/app/pipedv1/controller/pluginregistry.go @@ -0,0 +1,61 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package controller provides a piped component +// that handles all of the not completed deployments by managing a pool of planners and schedulers. +// Whenever a new PENDING deployment is detected, controller spawns a new planner for deciding +// the deployment pipeline and update the deployment status to PLANNED. +// Whenever a new PLANNED deployment is detected, controller spawns a new scheduler +// for scheduling and running its pipeline executors. +package controller + +import ( + "sync" + + "github.com/pipe-cd/pipecd/pkg/model" + "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform" +) + +type PluginRegistry interface { + Plugin(k model.ApplicationKind) (platform.PlatformPluginClient, bool) +} + +type pluginRegistry struct { + plugins map[model.ApplicationKind]platform.PlatformPluginClient + mu sync.RWMutex +} + +func (r *pluginRegistry) Plugin(k model.ApplicationKind) (platform.PlatformPluginClient, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + e, ok := r.plugins[k] + if !ok { + return nil, false + } + + return e, true +} + +var defaultPluginRegistry = &pluginRegistry{ + plugins: make(map[model.ApplicationKind]platform.PlatformPluginClient), +} + +func DefaultPluginRegistry() PluginRegistry { + return defaultPluginRegistry +} + +func init() { + // TODO: Register all available built-in plugins. +} diff --git a/pkg/plugin/api/v1alpha1/platform/client.go b/pkg/plugin/api/v1alpha1/platform/client.go new file mode 100644 index 0000000000..80e96bced9 --- /dev/null +++ b/pkg/plugin/api/v1alpha1/platform/client.go @@ -0,0 +1,57 @@ +// Copyright 2024 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package planner provides a piped component +// that decides the deployment pipeline of a deployment. +// The planner bases on the changes from git commits +// then builds the deployment manifests to know the behavior of the deployment. +// From that behavior the planner can decides which pipeline should be applied. +package platform + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/pipe-cd/pipecd/pkg/rpc/rpcclient" +) + +type PlatformPluginClient interface { + PlannerServiceClient + ExecutorServiceClient + Close() error +} + +type client struct { + PlannerServiceClient + ExecutorServiceClient + conn *grpc.ClientConn +} + +func NewClient(ctx context.Context, address string, opts ...rpcclient.DialOption) (PlatformPluginClient, error) { + conn, err := rpcclient.DialContext(ctx, address, opts...) + if err != nil { + return nil, err + } + + return &client{ + PlannerServiceClient: NewPlannerServiceClient(conn), + ExecutorServiceClient: NewExecutorServiceClient(conn), + conn: conn, + }, nil +} + +func (c *client) Close() error { + return c.conn.Close() +}