Skip to content

Commit

Permalink
Optimised restore delete with cancel job flow
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekdwivedi3060 committed Jan 9, 2025
1 parent d218d1f commit 2afc9dd
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 55 deletions.
4 changes: 2 additions & 2 deletions api/v1beta1/aerospikebackup_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"sigs.k8s.io/yaml"

"github.com/aerospike/aerospike-backup-service/v2/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v2/pkg/validation"
"github.com/aerospike/aerospike-backup-service/v3/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v3/pkg/validation"
)

func (r *AerospikeBackup) SetupWebhookWithManager(mgr ctrl.Manager) error {
Expand Down
4 changes: 2 additions & 2 deletions api/v1beta1/aerospikebackupservice_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"sigs.k8s.io/yaml"

"github.com/aerospike/aerospike-backup-service/v2/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v2/pkg/validation"
"github.com/aerospike/aerospike-backup-service/v3/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v3/pkg/validation"
)

const minSupportedVersion = "3.0.0"
Expand Down
4 changes: 2 additions & 2 deletions api/v1beta1/aerospikerestore_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
"sigs.k8s.io/yaml"

"github.com/aerospike/aerospike-backup-service/v2/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v2/pkg/validation"
"github.com/aerospike/aerospike-backup-service/v3/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v3/pkg/validation"
)

const defaultPollingPeriod time.Duration = 60 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion api/v1beta1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

"github.com/aerospike/aerospike-backup-service/v2/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v3/pkg/dto"
asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
lib "github.com/aerospike/aerospike-management-lib"
)
Expand Down
2 changes: 0 additions & 2 deletions config/samples/aerospikebackupservice.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ spec:
backup-policies:
test-policy:
parallel: 3
remove-files: KeepAll
test-policy1:
parallel: 3
remove-files: KeepAll
storage:
local:
local-storage:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/aerospike/aerospike-kubernetes-operator
go 1.22

require (
github.com/aerospike/aerospike-backup-service/v2 v2.0.1-0.20241205081925-ebbb018935bd
github.com/aerospike/aerospike-backup-service/v3 v3.0.0-20250106155823-c865c44e0dc1
github.com/aerospike/aerospike-client-go/v7 v7.8.0
github.com/aerospike/aerospike-management-lib v1.5.1-0.20250106091653-f0c86baa6cd7
github.com/aerospike/aerospike-management-lib v1.6.0
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/deckarep/golang-set/v2 v2.3.1
github.com/evanphx/json-patch v4.12.0+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
github.com/aerospike/aerospike-backup-service/v2 v2.0.1-0.20241205081925-ebbb018935bd h1:nnFUezmeTYrVn5tREDXISRc3hpRQPDT0n9FE1ZShn8o=
github.com/aerospike/aerospike-backup-service/v2 v2.0.1-0.20241205081925-ebbb018935bd/go.mod h1:g3oSLu/Vj1NSZyzE34Ed4KBaE+S0v4GeLn2frZDMJ5U=
github.com/aerospike/aerospike-backup-service/v3 v3.0.0-20250106155823-c865c44e0dc1 h1:ovj5ibbOvy1wXBljjy7OiaGjTZFQkx8XcgJ8SC9kGYc=
github.com/aerospike/aerospike-backup-service/v3 v3.0.0-20250106155823-c865c44e0dc1/go.mod h1:c4rfQ5WpaJEflNeHFP5/iQuEWH+WAnqZdcweYRgWbfg=
github.com/aerospike/aerospike-client-go/v7 v7.8.0 h1:mKWTf/8sWQkWSYlIR3ZWXZMr9FQQPnIihrA+ujGD+n8=
github.com/aerospike/aerospike-client-go/v7 v7.8.0/go.mod h1:STlBtOkKT8nmp7iD+sEkr/JGEOu+4e2jGlNN0Jiu2a4=
github.com/aerospike/aerospike-management-lib v1.5.1-0.20250106091653-f0c86baa6cd7 h1:lhccxfqnvqdSNnpxdM4xjUnp8AzHsUKKqJPXRKIgfsw=
github.com/aerospike/aerospike-management-lib v1.5.1-0.20250106091653-f0c86baa6cd7/go.mod h1:hsEptY/AmTmHoJnItJNmfJ4yCMG8LIB8YPnIpIyvGXI=
github.com/aerospike/aerospike-management-lib v1.6.0 h1:+bV9bjO9APvF4EOH7+l6608B4rj2Ns8KJVQJ1qqCJsg=
github.com/aerospike/aerospike-management-lib v1.6.0/go.mod h1:hsEptY/AmTmHoJnItJNmfJ4yCMG8LIB8YPnIpIyvGXI=
github.com/aerospike/backup-go v0.3.1 h1:mKEyGl2WrN7UKbDi6ivAEfEdlvh+WS7N0oTYTURxiWs=
github.com/aerospike/backup-go v0.3.1/go.mod h1:JwrUCJEtsUD0iHAs5yZY8+iF56nfw2m/DKvlBey77XE=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/backup-service/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"

"github.com/aerospike/aerospike-backup-service/v2/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v2/pkg/validation"
"github.com/aerospike/aerospike-backup-service/v3/pkg/dto"
"github.com/aerospike/aerospike-backup-service/v3/pkg/validation"
asdbv1beta1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1beta1"
"github.com/aerospike/aerospike-kubernetes-operator/internal/controller/common"
"github.com/aerospike/aerospike-kubernetes-operator/pkg/utils"
Expand Down
16 changes: 11 additions & 5 deletions internal/controller/backup/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ func (r *SingleBackupReconciler) addFinalizer(finalizerName string) error {
r.aeroBackup.ObjectMeta.Finalizers, finalizerName,
)

if err := r.Client.Update(context.TODO(), r.aeroBackup); err != nil {
return err
}
return r.Client.Update(context.TODO(), r.aeroBackup)
}

return nil
Expand Down Expand Up @@ -267,7 +265,11 @@ func (r *SingleBackupReconciler) removeBackupInfoFromConfigMap() error {
delete(clusterMap, name)
}

backupSvcConfig[asdbv1beta1.AerospikeClustersKey] = clusterMap
if len(clusterMap) == 0 {
delete(backupSvcConfig, asdbv1beta1.AerospikeClustersKey)
} else {
backupSvcConfig[asdbv1beta1.AerospikeClustersKey] = clusterMap
}
}
}

Expand All @@ -279,7 +281,11 @@ func (r *SingleBackupReconciler) removeBackupInfoFromConfigMap() error {
delete(routineMap, routinesToBeDelete[idx])
}

backupSvcConfig[asdbv1beta1.BackupRoutinesKey] = routineMap
if len(routineMap) == 0 {
delete(backupSvcConfig, asdbv1beta1.BackupRoutinesKey)
} else {
backupSvcConfig[asdbv1beta1.BackupRoutinesKey] = routineMap
}
}
}

Expand Down
9 changes: 3 additions & 6 deletions internal/controller/common/backup_config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,9 @@ func ReloadBackupServiceConfigInPods(
return err
}

// TODO:// uncomment this when backup service removes default fields from the GET config API response
// return validateBackupSvcConfigReload(k8sClient, backupServiceClient, log, backupSvc)
log.Info("Reloaded backup service config")

return nil
return validateBackupSvcConfigReload(k8sClient, backupServiceClient, log, backupSvc)
}

//nolint:unused // for future use
func validateBackupSvcConfigReload(k8sClient client.Client,
backupServiceClient *backup_service.Client,
log logr.Logger,
Expand Down Expand Up @@ -143,5 +138,7 @@ func validateBackupSvcConfigReload(k8sClient client.Client,
return fmt.Errorf("backup service config not yet updated in pods")
}

log.Info("Reloaded backup service config")

return nil
}
13 changes: 2 additions & 11 deletions internal/controller/restore/aerospikerestore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
Expand All @@ -34,6 +33,8 @@ import (
"github.com/aerospike/aerospike-kubernetes-operator/internal/controller/common"
)

const finalizerName = "asdb.aerospike.com/restore-finalizer"

// AerospikeRestoreReconciler reconciles a AerospikeRestore object
type AerospikeRestoreReconciler struct {
client.Client
Expand All @@ -58,16 +59,6 @@ func (r *AerospikeRestoreReconciler) Reconcile(_ context.Context, request ctrl.R
aeroRestore := &asdbv1beta1.AerospikeRestore{}
if err := r.Client.Get(context.TODO(), request.NamespacedName, aeroRestore); err != nil {
if errors.IsNotFound(err) {
log.Info("Deleted AerospikeRestore")

aeroRestore.Namespace = request.Namespace
aeroRestore.Name = request.Name
r.Recorder.Eventf(
aeroRestore, corev1.EventTypeNormal, "Deleted",
"Deleted AerospikeRestore %s/%s", aeroRestore.Namespace,
aeroRestore.Name,
)

// Request object not found, could have been deleted after Reconcile request.
return reconcile.Result{}, nil
}
Expand Down
76 changes: 75 additions & 1 deletion internal/controller/restore/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
asdbv1beta1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1beta1"
"github.com/aerospike/aerospike-kubernetes-operator/internal/controller/common"
backup_service "github.com/aerospike/aerospike-kubernetes-operator/pkg/backup-service"
"github.com/aerospike/aerospike-kubernetes-operator/pkg/utils"
)

// SingleRestoreReconciler reconciles a single AerospikeRestore
Expand All @@ -32,7 +33,13 @@ type SingleRestoreReconciler struct {

func (r *SingleRestoreReconciler) Reconcile() (result ctrl.Result, recErr error) {
if !r.aeroRestore.ObjectMeta.DeletionTimestamp.IsZero() {
r.Log.Info("Deleted AerospikeRestore")
r.Log.Info("Deleting AerospikeRestore")

if err := r.cleanUpAndRemoveFinalizer(finalizerName); err != nil {
r.Log.Error(err, "Failed to remove finalizer")
return reconcile.Result{}, err
}

r.Recorder.Eventf(
r.aeroRestore, corev1.EventTypeNormal, "Deleted",
"Deleted AerospikeRestore %s/%s", r.aeroRestore.Namespace,
Expand All @@ -53,6 +60,12 @@ func (r *SingleRestoreReconciler) Reconcile() (result ctrl.Result, recErr error)
return ctrl.Result{}, err
}

// The restore is not being deleted, add finalizer if not added already
if err := r.addFinalizer(finalizerName); err != nil {
r.Log.Error(err, "Failed to add finalizer")
return reconcile.Result{}, err
}

if res := r.reconcileRestore(); !res.IsSuccess {
if res.Err != nil {
r.Log.Error(res.Err, "Failed to reconcile restore")
Expand Down Expand Up @@ -192,6 +205,67 @@ func (r *SingleRestoreReconciler) setStatusPhase(phase asdbv1beta1.AerospikeRest
return nil
}

func (r *SingleRestoreReconciler) addFinalizer(finalizerName string) error {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !utils.ContainsString(
r.aeroRestore.ObjectMeta.Finalizers, finalizerName,
) {
r.aeroRestore.ObjectMeta.Finalizers = append(
r.aeroRestore.ObjectMeta.Finalizers, finalizerName,
)

return r.Client.Update(context.TODO(), r.aeroRestore)
}

return nil
}

func (r *SingleRestoreReconciler) cleanUpAndRemoveFinalizer(finalizerName string) error {
if utils.ContainsString(r.aeroRestore.ObjectMeta.Finalizers, finalizerName) {
r.Log.Info("Removing finalizer")

if r.aeroRestore.Status.JobID != nil {
if err := r.cancelRestoreJob(); err != nil {
return err
}
}

// Remove finalizer from the list
r.aeroRestore.ObjectMeta.Finalizers = utils.RemoveString(
r.aeroRestore.ObjectMeta.Finalizers, finalizerName,
)

if err := r.Client.Update(context.TODO(), r.aeroRestore); err != nil {
return err
}

r.Log.Info("Removed finalizer")
}

return nil
}

func (r *SingleRestoreReconciler) cancelRestoreJob() error {
serviceClient, err := backup_service.GetBackupServiceClient(r.Client, &r.aeroRestore.Spec.BackupService)
if err != nil {
return err
}

if statusCode, err := serviceClient.CancelRestoreJob(r.aeroRestore.Status.JobID); err != nil {
if statusCode == http.StatusNotFound {
r.Log.Info("Restore job not found, skipping cancel")
return nil
}

return err
}

r.Log.Info("Restore job cancelled successfully")

return nil
}

func statusToPhase(status string) asdbv1beta1.AerospikeRestorePhase {
switch status {
case "Done":
Expand Down
37 changes: 30 additions & 7 deletions pkg/backup-service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

const restAPIVersion = "v1"
const defaultContextPath = "/"
const contentTypeJSON = "application/json"

type Client struct {
// The address to listen on.
Expand Down Expand Up @@ -117,7 +118,7 @@ func (c *Client) GetBackupServiceConfig() (map[string]interface{}, error) {
func (c *Client) ApplyConfig() error {
url := c.API("/config/apply")

resp, err := http.Post(url, "application/json", nil)
resp, err := http.Post(url, contentTypeJSON, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -239,7 +240,7 @@ func (c *Client) AddCluster(name, cluster interface{}) error {

bodyReader := bytes.NewReader(jsonBody)

resp, err := http.Post(url, "application/json", bodyReader)
resp, err := http.Post(url, contentTypeJSON, bodyReader)
if err != nil {
return err
}
Expand Down Expand Up @@ -332,7 +333,7 @@ func (c *Client) AddBackupPolicy(name string, policy interface{}) error {

bodyReader := bytes.NewReader(jsonBody)

resp, err := http.Post(url, "application/json", bodyReader)
resp, err := http.Post(url, contentTypeJSON, bodyReader)
if err != nil {
return err
}
Expand Down Expand Up @@ -399,7 +400,7 @@ func (c *Client) AddBackupRoutine(name string, routine interface{}) error {

bodyReader := bytes.NewReader(jsonBody)

resp, err := http.Post(url, "application/json", bodyReader)
resp, err := http.Post(url, contentTypeJSON, bodyReader)
if err != nil {
return err
}
Expand Down Expand Up @@ -521,7 +522,7 @@ func (c *Client) AddStorage(name string, storage interface{}) error {

bodyReader := bytes.NewReader(jsonBody)

resp, err := http.Post(url, "application/json", bodyReader)
resp, err := http.Post(url, contentTypeJSON, bodyReader)
if err != nil {
return err
}
Expand Down Expand Up @@ -608,7 +609,7 @@ func (c *Client) ScheduleBackup(routineName string, delay metav1.Duration) error
url.RawQuery = query.Encode()
}

resp, err := http.Post(url.String(), "application/json", nil)
resp, err := http.Post(url.String(), contentTypeJSON, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -649,7 +650,7 @@ func (c *Client) TriggerRestoreWithType(log logr.Logger, restoreType string,

bodyReader := bytes.NewReader(jsonBody)

resp, err := http.Post(url, "application/json", bodyReader)
resp, err := http.Post(url, contentTypeJSON, bodyReader)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -712,6 +713,28 @@ func (c *Client) CheckRestoreStatus(jobID *int64) (map[string]interface{}, error
return restoreStatus, nil
}

func (c *Client) CancelRestoreJob(jobID *int64) (int, error) {
url := c.API(fmt.Sprintf("/restore/cancel/%d", *jobID))

resp, err := http.Post(url, contentTypeJSON, nil)
if err != nil {
return 0, err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, err
}

return resp.StatusCode, fmt.Errorf("failed to delete restore job, error: %s", string(body))
}

return resp.StatusCode, nil
}

func (c *Client) API(pattern string) string {
contextPath := c.getContextPath()

Expand Down
Loading

0 comments on commit 2afc9dd

Please sign in to comment.