Skip to content

Commit

Permalink
[Wait] Implemented the core of executing Wait with WaitStageOptions (#…
Browse files Browse the repository at this point in the history
…5456)

* add WaitStageOptions

Signed-off-by: t-kikuc <[email protected]>

* create stageoptions interface

Signed-off-by: t-kikuc <[email protected]>

* Implement wait execution with WIP metadataStore

Signed-off-by: t-kikuc <[email protected]>

* Delete stageoptions

Signed-off-by: t-kikuc <[email protected]>

* Determine the position of WaitStageOptions

Signed-off-by: t-kikuc <[email protected]>

* refine waitStageOptions with tests

Signed-off-by: t-kikuc <[email protected]>

* Remove WaitStageOptions from configv1/application.go

Signed-off-by: t-kikuc <[email protected]>

* refactor wait execution

Signed-off-by: t-kikuc <[email protected]>

* add test of wait()

Signed-off-by: t-kikuc <[email protected]>

* remove unused const

Signed-off-by: t-kikuc <[email protected]>

* fix import order

Signed-off-by: t-kikuc <[email protected]>

* fixed stylechecks (Id->ID)

Signed-off-by: t-kikuc <[email protected]>

---------

Signed-off-by: t-kikuc <[email protected]>
  • Loading branch information
t-kikuc authored Jan 20, 2025
1 parent 5ecdec3 commit aa454ff
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 10 deletions.
48 changes: 48 additions & 0 deletions pkg/app/pipedv1/plugin/wait/config/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"encoding/json"
"fmt"

config "github.com/pipe-cd/pipecd/pkg/configv1"
)

// WaitStageOptions contains configurable values for a WAIT stage.
type WaitStageOptions struct {
Duration config.Duration `json:"duration,omitempty"`
// TODO: Handle SkipOn options.
// SkipOn config.SkipOptions `json:"skipOn,omitempty"`
}

func (o WaitStageOptions) validate() error {
if o.Duration <= 0 {
return fmt.Errorf("duration must be greater than 0")
}
return nil
}

// Decode decodes the raw JSON data and validates it.
func Decode(data json.RawMessage) (WaitStageOptions, error) {
var opts WaitStageOptions
if err := json.Unmarshal(data, &opts); err != nil {
return WaitStageOptions{}, fmt.Errorf("failed to unmarshal the config: %w", err)
}
if err := opts.validate(); err != nil {
return WaitStageOptions{}, fmt.Errorf("failed to validate the config: %w", err)
}
return opts, nil
}
81 changes: 81 additions & 0 deletions pkg/app/pipedv1/plugin/wait/config/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"

config "github.com/pipe-cd/pipecd/pkg/configv1"
)

func TestDecode(t *testing.T) {
t.Parallel()
testcases := []struct {
name string
data json.RawMessage
expected WaitStageOptions
wantErr bool
}{
{
name: "valid config",
data: json.RawMessage(`{"duration":"1m"}`),
expected: WaitStageOptions{
Duration: config.Duration(1 * time.Minute),
},
wantErr: false,
},
{
name: "invalid config",
data: json.RawMessage(`invalid`),
expected: WaitStageOptions{},
wantErr: true,
},
{
name: "empty config",
data: json.RawMessage(`{}`),
expected: WaitStageOptions{},
wantErr: true,
},
{
name: "negative duration",
data: json.RawMessage(`{
"duration":"-1m"
}`),
expected: WaitStageOptions{},
wantErr: true,
},
{
name: "zero duration",
data: json.RawMessage(`{
"duration":"0s"
}`),
expected: WaitStageOptions{},
wantErr: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got, err := Decode(tc.data)
assert.Equal(t, tc.wantErr, err != nil)
assert.Equal(t, tc.expected, got)
})
}
}
81 changes: 78 additions & 3 deletions pkg/app/pipedv1/plugin/wait/deployment/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,95 @@ package deployment

import (
"context"
"time"

"github.com/pipe-cd/pipecd/pkg/app/piped/logpersister"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/wait/config"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type Stage string

const (
stageWait Stage = "WAIT"
logInterval = 10 * time.Second
startTimeKey = "startTime"
stageWait Stage = "WAIT"
)

// Execute starts waiting for the specified duration.
func (s *deploymentServiceServer) execute(ctx context.Context, in *deployment.ExecutePluginInput, slp logpersister.StageLogPersister) model.StageStatus {
// TOD: implement the logic of waiting
return model.StageStatus_STAGE_FAILURE
opts, err := config.Decode(in.StageConfig)
if err != nil {
slp.Errorf("failed to decode the stage config: %v", err)
return model.StageStatus_STAGE_FAILURE
}

duration := opts.Duration.Duration()

// Retrieve the saved initialStart from the previous run.
initialStart := s.retrieveStartTime(in.Stage.Id)
if initialStart.IsZero() {
// When this is the first run.
initialStart = time.Now()
}
s.saveStartTime(ctx, initialStart, in.Stage.Id)

return wait(ctx, duration, initialStart, slp)
}

func wait(ctx context.Context, duration time.Duration, initialStart time.Time, slp logpersister.StageLogPersister) model.StageStatus {
remaining := duration - time.Since(initialStart)
if remaining <= 0 {
// When this stage restarted and the duration has already passed.
slp.Infof("Already waited for %v since %v", duration, initialStart.Local())
return model.StageStatus_STAGE_SUCCESS
}

timer := time.NewTimer(remaining)
defer timer.Stop()

ticker := time.NewTicker(logInterval)
defer ticker.Stop()

slp.Infof("Waiting for %v since %v...", duration, initialStart.Local())
for {
select {
case <-timer.C: // on completed
slp.Infof("Waited for %v", duration)
return model.StageStatus_STAGE_SUCCESS

case <-ticker.C: // on interval elapsed
slp.Infof("%v elapsed...", time.Since(initialStart))

case <-ctx.Done(): // on cancelled
slp.Info("Wait cancelled")
return model.StageStatus_STAGE_CANCELLED
}
}
}

func (s *deploymentServiceServer) retrieveStartTime(stageID string) (t time.Time) {
// TODO: implement this func with metadataStore
return time.Time{}
// sec, ok := s.metadataStore.Stage(stageId).Get(startTimeKey)
// if !ok {
// return
// }
// ut, err := strconv.ParseInt(sec, 10, 64)
// if err != nil {
// return
// }
// return time.Unix(ut, 0)
}

func (s *deploymentServiceServer) saveStartTime(ctx context.Context, t time.Time, stageID string) {
// TODO: implement this func with metadataStore

// metadata := map[string]string{
// startTimeKey: strconv.FormatInt(t.Unix(), 10),
// }
// if err := s.metadataStore.Stage(stageId).PutMulti(ctx, metadata); err != nil {
// s.logger.Error("failed to store metadata", zap.Error(err))
// }
}
118 changes: 118 additions & 0 deletions pkg/app/pipedv1/plugin/wait/deployment/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deployment

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/logpersister/logpersistertest"
)

func TestWait_Complete(t *testing.T) {
t.Parallel()

duration := 50 * time.Millisecond

resultCh := make(chan model.StageStatus)
go func() {
result := wait(context.Background(), duration, time.Now(), logpersistertest.NewTestLogPersister(t))
resultCh <- result
}()

// Assert that wait() didn't end before the specified duration has passed.
select {
case <-resultCh:
t.Error("wait() ended too early")
case <-time.After(duration / 10):
}

// Assert that wait() ends after the specified duration has passed.
select {
case result := <-resultCh:
assert.Equal(t, model.StageStatus_STAGE_SUCCESS, result)
case <-time.After(duration):
// Wait 1.1x duration in total to avoid flaky test.
t.Error("wait() did not end even after the specified duration has passed")
}
}

func TestWait_Cancel(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resultCh := make(chan model.StageStatus)
go func() {
result := wait(ctx, 1*time.Second, time.Now(), logpersistertest.NewTestLogPersister(t))
resultCh <- result
}()

cancel()

select {
case result := <-resultCh:
assert.Equal(t, model.StageStatus_STAGE_CANCELLED, result)
case <-time.After(1 * time.Second):
t.Error("wait() did not ended even after the context was canceled")
}
}

func TestWait_RestartAfterLongTime(t *testing.T) {
t.Parallel()
// Suppose this stage started 2 hours ago but it was interrupted.
previousStart := time.Now().Add(-2 * time.Hour)

result := wait(context.Background(), 1*time.Second, previousStart, logpersistertest.NewTestLogPersister(t))
// Immediately return success because the duration has already passed.
assert.Equal(t, model.StageStatus_STAGE_SUCCESS, result)
}

func TestWait_RestartAndContinue(t *testing.T) {
t.Parallel()
// Imagine this timeline:
// begin interrupted now (1)not end (2)end
// | <--------30ms--|--------> | <--10ms--> | <--15s--> |
// | <------------- 50ms ------------------------> |
duration := 50 * time.Millisecond
previousStart := time.Now().Add(-30 * time.Millisecond)

resultCh := make(chan model.StageStatus)
go func() {
result := wait(context.Background(), duration, previousStart, logpersistertest.NewTestLogPersister(t))
resultCh <- result
}()

// (1) Assert that wait() didn't end before the specified duration has passed.
select {
case <-resultCh:
t.Error("wait() ended too early")
case <-time.After(10 * time.Millisecond):
}

// (2) Assert that wait() ends after the specified duration has passed.
select {
case result := <-resultCh:
assert.Equal(t, model.StageStatus_STAGE_SUCCESS, result)
case <-time.After(15 * time.Millisecond): // Not 50ms
// Wait 55ms in total to avoid flaky test.
t.Error("wait() did not end even after the specified duration has passed")
}
}
8 changes: 1 addition & 7 deletions pkg/configv1/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,7 @@ type SkipOptions struct {
Paths []string `json:"paths,omitempty"`
}

// WaitStageOptions contains all configurable values for a WAIT stage.
type WaitStageOptions struct {
Duration Duration `json:"duration"`
SkipOn SkipOptions `json:"skipOn,omitempty"`
}

// WaitStageOptions contains all configurable values for a WAIT_APPROVAL stage.
// WaitApprovalStageOptions contains all configurable values for a WAIT_APPROVAL stage.
type WaitApprovalStageOptions struct {
// The maximum length of time to wait before giving up.
// Defaults to 6h.
Expand Down

0 comments on commit aa454ff

Please sign in to comment.