Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-service: add StopBackup&StopRestore api for http-service #5374

Merged
merged 5 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cmd/http-service/idl/api/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,13 @@ message StopBackupReq {
string backup_id = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The unique ID of the backup."}];
}

message StopBackupResp {
  bool success = 1 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
    description: "Whether the request is successful.",
    example: "true"
  }];
  optional string message = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The message of the response."}];
}

message StopRestoreReq {
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_schema) = {
Expand All @@ -1084,7 +1090,13 @@ message StopRestoreReq {
string restore_id = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The unique ID of the restore."}];
}

message StopRestoreResp {
  bool success = 1 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
    description: "Whether the request is successful.",
    example: "true"
  }];
  optional string message = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {description: "The message of the response."}];
}

message DeleteBackupReq {
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_schema) = {
Expand Down
429 changes: 242 additions & 187 deletions cmd/http-service/pbgen/api/service.pb.go

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions cmd/http-service/pbgen/oas/openapi-spec.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1239,10 +1239,32 @@
"title": "ResumeClusterResp"
},
"apiStopBackupResp": {
"type": "object"
"type": "object",
"properties": {
"success": {
"type": "boolean",
"example": true,
"description": "Whether the request is successful."
},
"message": {
"type": "string",
"description": "The message of the response."
}
}
},
"apiStopRestoreResp": {
"type": "object"
"type": "object",
"properties": {
"success": {
"type": "boolean",
"example": true,
"description": "Whether the request is successful."
},
"message": {
"type": "string",
"description": "The message of the response."
}
}
},
"apiTiDBMember": {
"type": "object",
Expand Down
151 changes: 149 additions & 2 deletions cmd/http-service/server/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,21 @@ func (s *ClusterServer) GetBackup(ctx context.Context, req *api.GetBackupReq) (*
}

info := convertToBackupInfo(backup)
// get job status
kubeCli := s.KubeClient.GetKubeClient(k8sID)
_, err = kubeCli.BatchV1().Jobs(backup.GetNamespace()).Get(ctx, backup.GetBackupJobName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
info.Status = "Stopped"
logger.Info("Backup Job not found, Backup Job is Stopped.", zap.Error(err))
} else {
logger.Error("Get backup job failed", zap.Error(err))
message := fmt.Sprintf("Get backup job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.GetBackupResp{Success: false, Message: &message}, nil
}
}

return &api.GetBackupResp{Success: true, Data: info}, nil
}

Expand Down Expand Up @@ -409,6 +424,21 @@ func (s *ClusterServer) GetRestore(ctx context.Context, req *api.GetRestoreReq)
}

info := convertToRestoreInfo(restore)
// get job status
kubeCli := s.KubeClient.GetKubeClient(k8sID)
_, err = kubeCli.BatchV1().Jobs(restore.GetNamespace()).Get(ctx, restore.GetRestoreJobName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
info.Status = "Stopped"
logger.Info("Restore Job not found, Restore Job is Stopped.", zap.Error(err))
} else {
logger.Error("Get Restore job failed", zap.Error(err))
message := fmt.Sprintf("Get restore job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.GetRestoreResp{Success: false, Message: &message}, nil
}
}

return &api.GetRestoreResp{Success: true, Data: info}, nil
}

Expand All @@ -431,11 +461,128 @@ func convertToRestoreInfo(restore *v1alpha1.Restore) *api.RestoreInfo {
}

func (s *ClusterServer) StopBackup(ctx context.Context, req *api.StopBackupReq) (*api.StopBackupResp, error) {
return nil, errors.New("StopBackup not implemented")
k8sID := getKubernetesID(ctx)
opCli := s.KubeClient.GetOperatorClient(k8sID)
kubeCli := s.KubeClient.GetKubeClient(k8sID)
logger := log.L().With(zap.String("request", "StopBackup"), zap.String("k8sID", k8sID),
zap.String("clusterID", req.ClusterId), zap.String("backupID", req.BackupId))
if opCli == nil || kubeCli == nil {
logger.Error("K8s client not found")
message := fmt.Sprintf("no %s is specified in the request header or the kubeconfig context not exists", HeaderKeyKubernetesID)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

// check whether the backup exists
backup, err := opCli.PingcapV1alpha1().Backups(req.ClusterId).Get(ctx, req.BackupId, metav1.GetOptions{})
if err != nil {
logger.Error("Backup not found", zap.Error(err))
message := fmt.Sprintf("Backup %s not found", req.BackupId)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

// stop backup
if backup.Spec.Mode == v1alpha1.BackupModeLog {
backup.Spec.LogStop = true
_, err = opCli.PingcapV1alpha1().Backups(req.ClusterId).Update(ctx, backup, metav1.UpdateOptions{})
if err != nil {
logger.Error("Stop log backup failed", zap.Error(err))
message := fmt.Sprintf("Stop log backup failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
} else {
_, err := kubeCli.BatchV1().Jobs(backup.GetNamespace()).Get(ctx, backup.GetBackupJobName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
logger.Warn("Backup is already Stopped", zap.Error(err))
message := fmt.Sprintf("Backup %s is already Stopped", req.BackupId)
setResponseStatusCodes(ctx, http.StatusNotFound)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
logger.Error("Get backup job failed", zap.Error(err))
message := fmt.Sprintf("Get backup job failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}

err = kubeCli.BatchV1().Jobs(backup.GetNamespace()).Delete(ctx, backup.GetBackupJobName(), metav1.DeleteOptions{})
if err != nil {
logger.Error("Stop backup failed", zap.Error(err))
message := fmt.Sprintf("Stop backup failed: %s", err.Error())
setResponseStatusCodes(ctx, http.StatusInternalServerError)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
}

// update backup status
_, err = opCli.PingcapV1alpha1().Backups(req.ClusterId).Update(ctx, backup, metav1.UpdateOptions{})
if err != nil {
logger.Error("Backup not found", zap.Error(err))
message := fmt.Sprintf("Backup %s not found", req.BackupId)
setResponseStatusCodes(ctx, http.StatusBadRequest)
return &api.StopBackupResp{Success: false, Message: &message}, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove these lines? and also for // update restore status

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


return &api.StopBackupResp{Success: true}, nil
}

// StopRestore handles a request to stop the restore named req.RestoreId, looked
// up under req.ClusterId.
//
// The restore is stopped by deleting its restore Job. A Job that no longer
// exists is reported as "already Stopped" with http.StatusNotFound, matching
// how StopBackup treats a missing backup Job.
//
// Failures are reported in the response (Success=false plus a message) together
// with a status code passed to setResponseStatusCodes; the returned error is
// always nil.
func (s *ClusterServer) StopRestore(ctx context.Context, req *api.StopRestoreReq) (*api.StopRestoreResp, error) {
	k8sID := getKubernetesID(ctx)
	opCli := s.KubeClient.GetOperatorClient(k8sID)
	kubeCli := s.KubeClient.GetKubeClient(k8sID)
	logger := log.L().With(zap.String("request", "StopRestore"), zap.String("k8sID", k8sID),
		zap.String("clusterID", req.ClusterId), zap.String("restoreID", req.RestoreId))

	// fail records the status code and builds the failure response.
	fail := func(code int, message string) (*api.StopRestoreResp, error) {
		setResponseStatusCodes(ctx, code)
		return &api.StopRestoreResp{Success: false, Message: &message}, nil
	}

	if opCli == nil || kubeCli == nil {
		logger.Error("K8s client not found")
		return fail(http.StatusBadRequest,
			fmt.Sprintf("no %s is specified in the request header or the kubeconfig context not exists", HeaderKeyKubernetesID))
	}

	// check whether the restore exists
	restore, err := opCli.PingcapV1alpha1().Restores(req.ClusterId).Get(ctx, req.RestoreId, metav1.GetOptions{})
	if err != nil {
		logger.Error("Restore not found", zap.Error(err))
		return fail(http.StatusBadRequest, fmt.Sprintf("Restore %s not found", req.RestoreId))
	}

	// stop restore: a missing Job means there is nothing left to stop.
	jobs := kubeCli.BatchV1().Jobs(restore.GetNamespace())
	jobName := restore.GetRestoreJobName()
	if _, err := jobs.Get(ctx, jobName, metav1.GetOptions{}); err != nil {
		if apierrors.IsNotFound(err) {
			logger.Warn("Restore is already Stopped", zap.Error(err))
			return fail(http.StatusNotFound, fmt.Sprintf("Restore %s is already Stopped", req.RestoreId))
		}
		logger.Error("Get restore job failed", zap.Error(err))
		return fail(http.StatusInternalServerError, fmt.Sprintf("Get restore job failed: %s", err.Error()))
	}

	if err := jobs.Delete(ctx, jobName, metav1.DeleteOptions{}); err != nil {
		logger.Error("Stop restore failed", zap.Error(err))
		return fail(http.StatusInternalServerError, fmt.Sprintf("Stop restore failed: %s", err.Error()))
	}

	// The Restore object is not written back: it is never modified here, so an
	// Update would only re-send the copy fetched above.
	return &api.StopRestoreResp{Success: true}, nil
}

func (s *ClusterServer) DeleteBackup(ctx context.Context, req *api.DeleteBackupReq) (*api.DeleteBackupResp, error) {
Expand Down
16 changes: 8 additions & 8 deletions cmd/http-service/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ import (
type ClusterStatus string

const (
// the cluster is still being created
// ClusterStatusCreating the cluster is still being created
ClusterStatusCreating ClusterStatus = "creating"
// all components are running
// ClusterStatusRunning all components are running
ClusterStatusRunning ClusterStatus = "running"
// some components are deleting
// ClusterStatusDeleting some components are deleting
ClusterStatusDeleting ClusterStatus = "deleting"
// some components are scaling
// ClusterStatusScaling some components are scaling
ClusterStatusScaling ClusterStatus = "scaling"
// some components are upgrading
// ClusterStatusUpgrading some components are upgrading
ClusterStatusUpgrading ClusterStatus = "upgrading"
// some components are unavailable
// ClusterStatusUnavailable some components are unavailable
ClusterStatusUnavailable ClusterStatus = "unavailable"

helperImage = "busybox:1.36"
Expand Down Expand Up @@ -860,7 +860,7 @@ func convertToClusterInfo(logger *zap.Logger, kubeCli kubernetes.Interface, tc *
Name: strings.TrimPrefix(member.PodName, namePrefix),
Id: id,
StartTime: getPodStartTime(podList, member.PodName),
State: string(member.State),
State: member.State,
})
}

Expand Down Expand Up @@ -900,7 +900,7 @@ func convertToClusterInfo(logger *zap.Logger, kubeCli kubernetes.Interface, tc *
Name: strings.TrimPrefix(member.PodName, namePrefix),
Id: id,
StartTime: getPodStartTime(podList, member.PodName),
State: string(member.State),
State: member.State,
})
if member.State == v1alpha1.TiKVStateUp {
tiflashReadyCount++
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/pingcap/v1alpha1/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func NeedNotClean(backup *Backup) bool {
return backup.Spec.CleanPolicy == CleanPolicyTypeOnFailure && !IsBackupFailed(backup)
}

// ParseLogBackupSubCommand parse the log backup subcommand from cr.
// ParseLogBackupSubcommand parse the log backup subcommand from cr.
// The parse priority of the command is stop > truncate > start.
func ParseLogBackupSubcommand(backup *Backup) LogSubCommandType {
if backup.Spec.Mode != BackupModeLog {
Expand Down Expand Up @@ -382,7 +382,7 @@ func IsLogBackupAlreadyTruncate(backup *Backup) bool {
return specTS <= startCommitTS || specTS <= successedTS
}

// IsLogBackupAlreadyStop reports whether backup is a log-mode backup whose
// status phase is BackupStopped.
func IsLogBackupAlreadyStop(backup *Backup) bool {
	if backup.Spec.Mode != BackupModeLog {
		return false
	}
	return backup.Status.Phase == BackupStopped
}
8 changes: 4 additions & 4 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type PDClient interface {
GetTombStoneStores() (*StoresInfo, error)
// GetStore gets a TiKV store for a specific store id from cluster
GetStore(storeID uint64) (*StoreInfo, error)
// storeLabelsEqualNodeLabels compares store labels with node labels
// SetStoreLabels compares store labels with node labels
// for historic reasons, PD stores TiKV labels as []*StoreLabel which is a key-value pair slice
SetStoreLabels(storeID uint64, labels map[string]string) (bool, error)
// UpdateReplicationConfig updates the replication config
Expand Down Expand Up @@ -196,7 +196,7 @@ type MembersInfo struct {

// below copied from github.com/tikv/pd/pkg/autoscaling

// Strategy within a HTTP request provides rules and resources to help make decision for auto scaling.
// Strategy within an HTTP request provides rules and resources to help make decision for auto scaling.
type Strategy struct {
Rules []*Rule `json:"rules"`
Resources []*Resource `json:"resources"`
Expand Down Expand Up @@ -258,7 +258,7 @@ func (c *pdClient) GetHealth() (*HealthInfo, error) {
if err != nil {
return nil, err
}
healths := []MemberHealth{}
var healths []MemberHealth
err = json.Unmarshal(body, &healths)
if err != nil {
return nil, err
Expand Down Expand Up @@ -378,7 +378,7 @@ func (c *pdClient) DeleteStore(storeID uint64) error {
}
defer httputil.DeferClose(res.Body)

// Remove an offline store should returns http.StatusOK
// Remove an offline store should return http.StatusOK
if res.StatusCode == http.StatusOK || res.StatusCode == http.StatusNotFound {
return nil
}
Expand Down
Loading