Skip to content

Commit

Permalink
Merge pull request #935 from abstractmj/fea-mesos
Browse files Browse the repository at this point in the history
feature: bcs-mesos support taskgroup update resource without restart #932
  • Loading branch information
DeveloperJim authored Jun 28, 2021
2 parents 155e4eb + 95520c2 commit 1b3f5b3
Show file tree
Hide file tree
Showing 27 changed files with 883 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Tencent/bk-bcs/bcs-common/pkg/scheduler/schetypes"
)

// CreateDeployment create deployment, call scheduler create deployment api
func (s *Scheduler) CreateDeployment(body []byte) (string, error) {
blog.Info("create deployment. param(%s)", string(body))
var param bcstype.BcsDeployment
Expand Down Expand Up @@ -73,7 +74,8 @@ func (s *Scheduler) CreateDeployment(body []byte) (string, error) {
return string(reply), nil
}

func (s *Scheduler) UpdateDeployment(body []byte) (string, error) {
// UpdateDeployment do update deployment, call scheduler update deployment api
func (s *Scheduler) UpdateDeployment(body []byte, args string) (string, error) {
blog.Info("udpate deployment. param(%s)", string(body))
var param bcstype.BcsDeployment

Expand Down Expand Up @@ -110,7 +112,7 @@ func (s *Scheduler) UpdateDeployment(body []byte) (string, error) {
name := deploymentDef.ObjectMeta.Name
namespace := deploymentDef.ObjectMeta.NameSpace

url := fmt.Sprintf("%s/v1/deployment/%s/%s", s.GetHost(), namespace, name)
url := fmt.Sprintf("%s/v1/deployment/%s/%s?args=%s", s.GetHost(), namespace, name, args)
blog.Info("post a request to url(%s), request:%s", url, string(data))

reply, err := s.client.PUT(url, nil, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,8 @@ func (s *Scheduler) udpateDeploymentHandler(req *restful.Request, resp *restful.
return
}

reply, err := s.UpdateDeployment(body)
args := req.QueryParameter("args")
reply, err := s.UpdateDeployment(body, args)
if err != nil {
blog.Error("fail to create deployment. reply(%s), err(%s)", reply, err.Error())
resp.Write([]byte(err.Error()))
Expand Down
6 changes: 3 additions & 3 deletions bcs-mesos/bcs-scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler
go 1.14

replace (
github.com/Tencent/bk-bcs/bcs-common => github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210517123645-82ef0026bf95
github.com/Tencent/bk-bcs/bcs-common => github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210625040556-0385f88cbfd6
github.com/Tencent/bk-bcs/bcs-mesos/kubebkbcsv2 => ../kubebkbcsv2
github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
go.etcd.io/bbolt v1.3.4 => github.com/coreos/bbolt v1.3.4
Expand All @@ -15,15 +15,15 @@ require (
github.com/andygrunwald/megos v0.0.0-20180424065632-0fccaea93714
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964
github.com/emicklei/go-restful v2.15.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/golang/protobuf v1.5.2
github.com/parnurzeal/gorequest v0.2.16
github.com/prometheus/client_golang v1.10.0
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781
k8s.io/api v0.18.16
k8s.io/apiextensions-apiserver v0.18.16
k8s.io/apimachinery v0.18.16
k8s.io/client-go v0.18.16
moul.io/http2curl v1.0.0 // indirect
)
25 changes: 22 additions & 3 deletions bcs-mesos/bcs-scheduler/src/manager/sched/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
commtypes "github.com/Tencent/bk-bcs/bcs-common/common/types"
"github.com/Tencent/bk-bcs/bcs-common/pkg/scheduler/schetypes"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/scheduler"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/utils"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"github.com/emicklei/go-restful"
)
Expand Down Expand Up @@ -372,6 +373,8 @@ func (r *Router) updateDeployment(req *restful.Request, resp *restful.Response)
}
blog.V(3).Infof("recv update deployment request")

args := req.QueryParameter("args")

var deploymentDef types.DeploymentDef
decoder := json.NewDecoder(req.Request.Body)
if err := decoder.Decode(&deploymentDef); err != nil {
Expand All @@ -388,7 +391,14 @@ func (r *Router) updateDeployment(req *restful.Request, resp *restful.Response)
deploymentDef.ObjectMeta.NameSpace, deploymentDef.ObjectMeta.Name)
}

if errCode, err := r.backend.UpdateDeployment(&deploymentDef); err != nil {
var errCode int
var err error
if args == "resource" {
errCode, err = r.backend.UpdateDeploymentResource(&deploymentDef)
} else {
errCode, err = r.backend.UpdateDeployment(&deploymentDef)
}
if err != nil {
data := createResponseDataV2(errCode, err.Error(), nil)
resp.Write([]byte(data))
return
Expand Down Expand Up @@ -537,8 +547,9 @@ func (r *Router) getCurrentOffers(req *restful.Request, resp *restful.Response)

blog.V(3).Info("request get current offers request")

res := r.backend.GetCurrentOffers()
data := createResponseData(nil, "", res)
// for forward compatibility, just add delta resource to origin mesos offer
offers := r.backend.GetCurrentOffers()
data := createResponseData(nil, "", offers)
resp.Write([]byte(data))
blog.Info("request get current offers request finish")
}
Expand Down Expand Up @@ -1349,6 +1360,14 @@ func (r *Router) updateApplication(req *restful.Request, resp *restful.Response)
instanceNum, err = strconv.ParseUint(instances, 10, 64)
if args == "resource" {
blog.Infof("request update application(%s.%s) resource", runAs, appId)
// check if there are only changes about resource
err = utils.IsOnlyResourceIncreased(currVersion, &version)
if err != nil {
blog.Error("request update application(%s.%s) parameter err version, err %s", runAs, appId, err.Error())
data := createResponseData(err, err.Error(), nil)
resp.Write([]byte(data))
return
}
} else {
if err != nil {
blog.Error("request update application(%s.%s) parameter err instances(%s)", runAs, appId, instances)
Expand Down
8 changes: 5 additions & 3 deletions bcs-mesos/bcs-scheduler/src/manager/sched/backend/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ package backend

import (
commtypes "github.com/Tencent/bk-bcs/bcs-common/common/types"
"github.com/Tencent/bk-bcs/bcs-common/pkg/scheduler/mesosproto/mesos"
types "github.com/Tencent/bk-bcs/bcs-common/pkg/scheduler/schetypes"
)

// GetClusterResources returns the cluster resource summary reported by the
// underlying scheduler, together with any error the scheduler produced.
func (b *backend) GetClusterResources() (*commtypes.BcsClusterResource, error) {
	resource, err := b.sched.GetClusterResource()
	return resource, err
}

// GetClusterEndpoints get cluster endpoints
func (b *backend) GetClusterEndpoints() *commtypes.ClusterEndpoints {
endpoints := new(commtypes.ClusterEndpoints)

Expand All @@ -36,6 +37,7 @@ func (b *backend) GetClusterEndpoints() *commtypes.ClusterEndpoints {
return endpoints
}

func (b *backend) GetCurrentOffers() []*mesos.Offer {
// GetCurrentOffers returns the offers currently held by the scheduler.
// The result is passed through unchanged from the scheduler.
func (b *backend) GetCurrentOffers() []*types.OfferWithDelta {
	return b.sched.GetCurrentOffers()
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (b *backend) LaunchApplication(version *types.Version) error {
TransactionID: types.GenerateTransactionID(string(commontypes.BcsDataType_APP)),
CreateTime: time.Now(),
CheckInterval: time.Second,
CurOp: &types.TransactaionOperartion{
CurOp: &types.TransactionOperartion{
OpType: types.TransactionOpTypeLaunch,
OpLaunchData: &types.TransAPILaunchOpdata{
Version: version,
Expand Down Expand Up @@ -128,7 +128,7 @@ func (b *backend) RecoverApplication(version *types.Version) error {
TransactionID: types.GenerateTransactionID(string(commontypes.BcsDataType_APP)),
CreateTime: time.Now(),
CheckInterval: time.Second,
CurOp: &types.TransactaionOperartion{
CurOp: &types.TransactionOperartion{
OpType: types.TransactionOpTypeLaunch,
OpLaunchData: &types.TransAPILaunchOpdata{
Version: version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (b *backend) DeleteApplication(runAs, appID string, enforce bool, kind comm
TransactionID: types.GenerateTransactionID(string(commontypes.BcsDataType_APP)),
CreateTime: time.Now(),
CheckInterval: 3 * time.Second,
CurOp: &types.TransactaionOperartion{
CurOp: &types.TransactionOperartion{
OpType: types.TransactionOpTypeDelete,
OpDeleteData: &types.TransAPIDeleteOpdata{
Enforce: enforce,
Expand Down
127 changes: 124 additions & 3 deletions bcs-mesos/bcs-scheduler/src/manager/sched/backend/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ package backend
import (
"errors"
"fmt"
"reflect"
"strconv"
"time"

comm "github.com/Tencent/bk-bcs/bcs-common/common"
"github.com/Tencent/bk-bcs/bcs-common/common/blog"
commtypes "github.com/Tencent/bk-bcs/bcs-common/common/types"
"github.com/Tencent/bk-bcs/bcs-common/pkg/scheduler/schetypes"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/utils"
"github.com/Tencent/bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"strconv"
"time"
)

// GetDeployment looks up the deployment identified by namespace ns and name
// in the store and returns it along with the store's error, if any.
func (b *backend) GetDeployment(ns string, name string) (*types.Deployment, error) {
	deployment, err := b.store.FetchDeployment(ns, name)
	return deployment, err
}

// CreateDeployment create deployment
func (b *backend) CreateDeployment(deploymentDef *types.DeploymentDef) (int, error) {
ns := deploymentDef.ObjectMeta.NameSpace
name := deploymentDef.ObjectMeta.Name
Expand Down Expand Up @@ -255,6 +260,7 @@ func (b *backend) createDeploymentApplication(version *types.Version) (int, erro
return comm.BcsSuccess, nil
}

// UpdateDeployment update deployment
func (b *backend) UpdateDeployment(deployment *types.DeploymentDef) (int, error) {
ns := deployment.ObjectMeta.NameSpace
name := deployment.ObjectMeta.Name
Expand Down Expand Up @@ -475,6 +481,7 @@ func (b *backend) UpdateDeployment(deployment *types.DeploymentDef) (int, error)
return comm.BcsSuccess, nil
}

// CancelUpdateDeployment cancel deployment update process
func (b *backend) CancelUpdateDeployment(ns string, name string) error {
blog.Info("request cancelupdate deployment(%s.%s) begin", ns, name)
b.store.LockDeployment(fmt.Sprintf("%s.%s", ns, name))
Expand Down Expand Up @@ -568,6 +575,7 @@ func (b *backend) CancelUpdateDeployment(ns string, name string) error {
return nil
}

// DeleteDeployment do delete deployment
func (b *backend) DeleteDeployment(ns string, name string, enforce bool) (int, error) {
blog.Info("request delete deployment(%s.%s) begin", ns, name)
b.store.LockDeployment(fmt.Sprintf("%s.%s", ns, name))
Expand Down Expand Up @@ -623,8 +631,8 @@ func (b *backend) DeleteDeployment(ns string, name string, enforce bool) (int, e
return comm.BcsSuccess, nil
}

// CheckDeleteDeployment check deployment deletion
func (b *backend) CheckDeleteDeployment(ns string, name string) {

blog.Infof("check delete deployment(%s.%s)", ns, name)

b.store.LockDeployment(fmt.Sprintf("%s.%s", ns, name))
Expand Down Expand Up @@ -677,6 +685,7 @@ func (b *backend) CheckDeleteDeployment(ns string, name string) {
return
}

// PauseUpdateDeployment pause deployment update process
func (b *backend) PauseUpdateDeployment(ns string, name string) error {
blog.Info("request pauseupdate deployment(%s.%s) begin", ns, name)
b.store.LockDeployment(fmt.Sprintf("%s.%s", ns, name))
Expand Down Expand Up @@ -710,6 +719,7 @@ func (b *backend) PauseUpdateDeployment(ns string, name string) error {
return nil
}

// ResumeUpdateDeployment resume deployment update process
func (b *backend) ResumeUpdateDeployment(ns string, name string) error {
blog.Info("request resumeupdate deployment(%s.%s) begin", ns, name)
b.store.LockDeployment(fmt.Sprintf("%s.%s", ns, name))
Expand Down Expand Up @@ -744,6 +754,7 @@ func (b *backend) ResumeUpdateDeployment(ns string, name string) error {
return nil
}

// ScaleDeployment scale deployment to certain instances
func (b *backend) ScaleDeployment(runAs, name string, instances uint64) error {
blog.Info("request scale deployment(%s.%s) to instances %d", runAs, name, instances)
b.store.LockDeployment(fmt.Sprintf("%s.%s", runAs, name))
Expand Down Expand Up @@ -774,3 +785,113 @@ func (b *backend) ScaleDeployment(runAs, name string, instances uint64) error {

return b.ScaleApplication(runAs, deployment.Application.ApplicationName, instances, "", false)
}

// UpdateDeploymentResource updates only the resources of an existing deployment.
//
// The request is accepted when all of the following hold:
//   - the deployment exists in the store and its status is running;
//   - name, namespace, selector and strategy are identical to the stored deployment;
//   - the deployment's application exists and is neither operating nor rolling-updating;
//   - utils.IsOnlyResourceIncreased accepts the change from the current version
//     to deployment.Version.
//
// On acceptance the deployment is switched to the update-resource status, the new
// version is saved, and an update-resource transaction is saved and pushed to the
// scheduler event queue.
//
// It returns a bcs error code together with a non-nil error on every failure path,
// and (comm.BcsSuccess, nil) on success.
func (b *backend) UpdateDeploymentResource(deployment *types.DeploymentDef) (int, error) {
	ns := deployment.ObjectMeta.NameSpace
	name := deployment.ObjectMeta.Name
	blog.Infof("request update deployment (%s.%s) resource begin", ns, name)

	// the new version is validated, stored and attached to the transaction below,
	// so a request without one cannot be processed
	if deployment.Version == nil {
		blog.Warnf("update deployment(%s.%s) resource: version is empty", ns, name)
		return comm.BcsErrMesosSchedCommon, errors.New("deployment version is empty, cannot update resource")
	}

	deploymentKey := fmt.Sprintf("%s.%s", ns, name)
	b.store.LockDeployment(deploymentKey)
	defer b.store.UnLockDeployment(deploymentKey)

	// when current deployment is not found or status is not running, it cannot be updated
	currDeployment, err := b.store.FetchDeployment(ns, name)
	if err != nil && err != store.ErrNoFound {
		blog.Error("update deployment(%s.%s) resource, fetch deployment err: %s", ns, name, err.Error())
		return comm.BcsErrCommGetZkNodeFail, err
	}
	if currDeployment == nil {
		blog.Warn("update deployment(%s.%s) resource: data not exist", ns, name)
		return comm.BcsErrMesosSchedNotFound, errors.New("deployment not exist")
	}
	if currDeployment.Status != types.DEPLOYMENT_STATUS_RUNNING {
		blog.Warn("update deployment(%s.%s) resource: status(%s) is not running", ns, name, currDeployment.Status)
		return comm.BcsErrMesosSchedCommon, errors.New("deployment is not running, cannot update resource")
	}

	// check deployment differences: only the version may change in a resource update
	if deployment.ObjectMeta.Name != currDeployment.ObjectMeta.Name ||
		deployment.ObjectMeta.NameSpace != currDeployment.ObjectMeta.NameSpace ||
		!reflect.DeepEqual(deployment.Selector, currDeployment.Selector) ||
		!reflect.DeepEqual(deployment.Strategy, currDeployment.Strategy) {
		blog.Warnf("update deployment(%s.%s) resource: meta data changed", ns, name)
		return comm.BcsErrMesosSchedCommon,
			errors.New("cannot change deployment meta and strategy when update resource")
	}

	// lock current application
	appName := currDeployment.Application.ApplicationName
	appKey := ns + "." + appName
	b.store.LockApplication(appKey)
	defer b.store.UnLockApplication(appKey)

	app, err := b.store.FetchApplication(ns, appName)
	if err != nil && err != store.ErrNoFound {
		blog.Warnf("update deployment(%s.%s) resource, fetch application(%s) err:%s",
			ns, name, appName, err.Error())
		return comm.BcsErrCommGetZkNodeFail, err
	}
	// a missing application must be reported instead of dereferencing a nil pointer below
	if app == nil {
		err = fmt.Errorf("application(%s.%s) of deployment(%s.%s) not exist", ns, appName, ns, name)
		blog.Warnf("update deployment(%s.%s) resource: %s", ns, name, err.Error())
		return comm.BcsErrMesosSchedNotFound, err
	}
	if app.Status == types.APP_STATUS_OPERATING || app.Status == types.APP_STATUS_ROLLINGUPDATE {
		err = fmt.Errorf("application(%s.%s) of deployment(%s.%s) cannot do update under status(%s)",
			ns, appName, ns, name, app.Status)
		blog.Warnf("%s", err.Error())
		return comm.BcsErrMesosSchedCommon, err
	}

	// always hand back a non-nil error when the current version is unavailable,
	// otherwise the caller (which only checks err) would treat the failure as success
	currVersion, err := b.GetVersion(ns, appName)
	if err == nil && currVersion == nil {
		err = errors.New("current version not exist")
	}
	if err != nil {
		blog.Warnf("update deployment(%s.%s) resource failed, err get current version: %s",
			ns, name, err.Error())
		return comm.BcsErrCommGetZkNodeFail, fmt.Errorf(
			"get current version of application(%s.%s) failed, err %s", ns, appName, err.Error())
	}

	if err = utils.IsOnlyResourceIncreased(currVersion, deployment.Version); err != nil {
		blog.Warnf("check update resource failed, err %s", err.Error())
		return comm.BcsErrMesosSchedCommon, fmt.Errorf("check update resource failed, err %s", err.Error())
	}

	// launch update transaction
	updateTrans := &types.Transaction{
		TransactionID: types.GenerateTransactionID(string(commtypes.BcsDataType_DEPLOYMENT)),
		ObjectKind:    string(commtypes.BcsDataType_DEPLOYMENT),
		ObjectName:    name,
		Namespace:     ns,
		CreateTime:    time.Now(),
		CheckInterval: 3 * time.Second,
		CurOp: &types.TransactionOperartion{
			OpType: types.TransactionOpTypeDepUpdateResource,
			OpDepUpdateData: &types.TransDeploymentUpdateOpData{
				Version:          deployment.Version,
				IsUpdateResource: true,
			},
		},
		Status: types.OPERATION_STATUS_INIT,
	}

	// set deployment status, keeping the previous raw definition as backup
	// NOTE(review): if saving the version or the transaction fails below, the deployment
	// has already been persisted with the update-resource status and is not reverted
	// here — confirm whether a rollback or a different save order is needed.
	currDeployment.Status = types.DEPLOYMENT_STATUS_UPDATERESOURCE
	currDeployment.RawJsonBackup = currDeployment.RawJson
	currDeployment.RawJson = deployment.RawJson
	if err = b.store.SaveDeployment(currDeployment); err != nil {
		err = fmt.Errorf("update deployment(%s.%s) to status %s failed, err %s",
			ns, name, types.DEPLOYMENT_STATUS_UPDATERESOURCE, err.Error())
		blog.Errorf("%s", err.Error())
		return comm.BcsErrMesosSchedCommon, err
	}

	// save version
	if err = b.store.SaveVersion(deployment.Version); err != nil {
		return comm.BcsErrMesosSchedCommon, fmt.Errorf(
			"save version(%s.%s) failed when UpdateDeploymentResource, err %s", ns, name, err.Error())
	}

	if err = b.store.SaveTransaction(updateTrans); err != nil {
		err = fmt.Errorf("save transaction(%s,%s) into db failed, err %s", ns, name, err.Error())
		blog.Errorf("%s", err.Error())
		return comm.BcsErrMesosSchedCommon, err
	}
	b.sched.PushEventQueue(updateTrans)
	blog.Infof("request update resource of deployment(%s.%s) end", ns, name)
	return comm.BcsSuccess, nil
}
Loading

0 comments on commit 1b3f5b3

Please sign in to comment.