Skip to content

Commit

Permalink
add validation for migration
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-sun-star committed Mar 12, 2024
1 parent 714422a commit 3c61ca3
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 67 deletions.
5 changes: 5 additions & 0 deletions api/v1alpha1/obcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ func (r *OBCluster) validateMutation() error {
} else if r.Spec.Topology[0].Replica != 1 {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("topology"), r.Spec.Topology, "standalone mode only support single replica"))
}
// validate migration
migrateAnnoVal, migrateAnnoExist := r.GetAnnotations()[oceanbaseconst.AnnotationsSourceClusterAddress]
if migrateAnnoExist {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("annotations").Child(oceanbaseconst.AnnotationsSourceClusterAddress), migrateAnnoVal, "migrate obcluster into standalone mode is not supported"))
}
}

// Validate userSecrets
Expand Down
7 changes: 6 additions & 1 deletion internal/const/oceanbase/oceanbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const (
AnnotationsIndependentPVCLifecycle = "oceanbase.oceanbase.com/independent-pvc-lifecycle"
AnnotationsSinglePVC = "oceanbase.oceanbase.com/single-pvc"
AnnotationsMode = "oceanbase.oceanbase.com/mode"
AnnotationsSourceClusterConnection = "oceanbase.oceanbase.com/source-cluster-address"
AnnotationsSourceClusterAddress = "oceanbase.oceanbase.com/source-cluster-address"
)

const (
Expand Down Expand Up @@ -175,3 +175,8 @@ const (
const (
TolerateServerPodNotReadyMinutes = 5
)

const (
ClusterNameParam = "cluster"
ClusterIdParam = "cluster_id"
)
1 change: 1 addition & 0 deletions internal/resource/obcluster/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (

// obcluster tasks
const (
tCheckMigration ttypes.TaskName = "check before migration"
tCheckImageReady ttypes.TaskName = "check image ready"
tCheckClusterMode ttypes.TaskName = "check cluster mode"
tCheckAndCreateUserSecrets ttypes.TaskName = "check and create user secrets"
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/obcluster/obcluster_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func MigrateOBClusterFromExisting() *tasktypes.TaskFlow {
return &tasktypes.TaskFlow{
OperationContext: &tasktypes.OperationContext{
Name: fMigrateOBClusterFromExisting,
Tasks: []tasktypes.TaskName{tCheckImageReady, tCheckClusterMode, tCheckAndCreateUserSecrets, tCreateOBZone, tWaitOBZoneRunning},
Tasks: []tasktypes.TaskName{tCheckMigration, tCheckImageReady, tCheckClusterMode, tCheckAndCreateUserSecrets, tCreateOBZone, tWaitOBZoneRunning, tCreateUsers, tMaintainOBParameter, tCreateServiceForMonitor, tCreateOBClusterService},
TargetStatus: clusterstatus.Running,
OnFailure: tasktypes.FailureRule{
NextTryStatus: clusterstatus.Failed,
Expand Down
4 changes: 3 additions & 1 deletion internal/resource/obcluster/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (m *OBClusterManager) GetStatus() string {
func (m *OBClusterManager) InitStatus() {
m.Logger.Info("newly created cluster, init status")
m.Recorder.Event(m.OBCluster, "Init", "", "newly created cluster, init status")
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterConnection)
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
initialStatus := clusterstatus.New
if migrateAnnoExist {
initialStatus = clusterstatus.MigrateFromExisting
Expand Down Expand Up @@ -287,6 +287,8 @@ func (m *OBClusterManager) HandleFailure() {

func (m *OBClusterManager) GetTaskFunc(name tasktypes.TaskName) (tasktypes.TaskFunc, error) {
switch name {
case tCheckMigration:
return m.CheckMigration, nil
case tCheckImageReady:
return m.CheckImageReady, nil
case tCheckClusterMode:
Expand Down
47 changes: 45 additions & 2 deletions internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
independentVolumeAnnoVal, independentVolumeAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsIndependentPVCLifecycle)
singlePVCAnnoVal, singlePVCAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSinglePVC)
modeAnnoVal, modeAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsMode)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterConnection)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBCluster, oceanbaseconst.AnnotationsSourceClusterAddress)
for _, zone := range m.OBCluster.Spec.Topology {
zoneName := m.generateZoneName(zone.Zone)
zoneExists := false
Expand Down Expand Up @@ -210,7 +210,7 @@ func (m *OBClusterManager) CreateOBZone() tasktypes.TaskError {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsMode] = modeAnnoVal
}
if migrateAnnoExist {
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterConnection] = migrateAnnoVal
obzone.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterAddress] = migrateAnnoVal
}
m.Logger.Info("create obzone", "zone", zoneName)
err := m.Client.Create(m.Ctx, obzone)
Expand Down Expand Up @@ -1015,3 +1015,46 @@ func (m *OBClusterManager) CheckClusterMode() tasktypes.TaskError {
}
return nil
}

// check obcluster name and id
// check obzone exists in topology
func (m *OBClusterManager) CheckMigration() tasktypes.TaskError {
m.Logger.Info("check before migration")
manager, err := m.getOceanbaseOperationManager()
if err != nil {
return errors.Wrap(err, "get operation manager")
}
obzoneList, err := manager.ListZones()
if err != nil {
return errors.Wrap(err, "list obzones")
}
for _, obzone := range obzoneList {
found := false
for _, zone := range m.OBCluster.Spec.Topology {
if obzone.Name == zone.Zone {
found = true
break
}
}
if !found {
return errors.Errorf("obzone %s not in obcluster's topology", obzone.Name)
}
}
obclusterNameParamList, err := manager.GetParameter(oceanbaseconst.ClusterNameParam, nil)
if err != nil {
return errors.Wrap(err, "get obcluster name failed")
}
obclusterName := obclusterNameParamList[0].Value
obclusterIdParamList, err := manager.GetParameter(oceanbaseconst.ClusterIdParam, nil)
if err != nil {
return errors.Wrap(err, "get obcluster id failed")
}
obclusterId := obclusterIdParamList[0].Value
if obclusterName != m.OBCluster.Spec.ClusterName {
return errors.Errorf("Cluster name mismatch, source cluster: %s, current: %s", obclusterName, m.OBCluster.Spec.ClusterName)
}
if obclusterId != fmt.Sprintf("%d", m.OBCluster.Spec.ClusterId) {
return errors.Errorf("Cluster id mismatch, source cluster: %s, current: %d", obclusterId, m.OBCluster.Spec.ClusterId)
}
return nil
}
2 changes: 1 addition & 1 deletion internal/resource/obzone/obzone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (m *OBZoneManager) GetStatus() string {

func (m *OBZoneManager) InitStatus() {
m.Logger.Info("newly created zone, init status")
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSourceClusterConnection)
_, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSourceClusterAddress)
initialStatus := zonestatus.New
if migrateAnnoExist {
initialStatus = zonestatus.MigrateFromExisting
Expand Down
4 changes: 2 additions & 2 deletions internal/resource/obzone/obzone_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (m *OBZoneManager) CreateOBServer() tasktypes.TaskError {
independentVolumeAnnoVal, independentVolumeAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsIndependentPVCLifecycle)
singlePVCAnnoVal, singlePVCAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSinglePVC)
modeAnnoVal, modeAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsMode)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSourceClusterConnection)
migrateAnnoVal, migrateAnnoExist := resourceutils.GetAnnotationField(m.OBZone, oceanbaseconst.AnnotationsSourceClusterAddress)
for i := currentReplica; i < m.OBZone.Spec.Topology.Replica; i++ {
serverName := m.generateServerName()
finalizerName := "finalizers.oceanbase.com.deleteobserver"
Expand Down Expand Up @@ -151,7 +151,7 @@ func (m *OBZoneManager) CreateOBServer() tasktypes.TaskError {
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsMode] = modeAnnoVal
}
if migrateAnnoExist {
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterConnection] = migrateAnnoVal
observer.ObjectMeta.Annotations[oceanbaseconst.AnnotationsSourceClusterAddress] = migrateAnnoVal
}
m.Logger.Info("create observer", "server", serverName)
err := m.Client.Create(m.Ctx, observer)
Expand Down
19 changes: 12 additions & 7 deletions internal/resource/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@ func ReadPassword(c client.Client, namespace, secretName string) (string, error)

func GetSysOperationClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster) (*operation.OceanbaseOperationManager, error) {
logger.V(oceanbaseconst.LogLevelTrace).Info("Get cluster sys client", "obCluster", obcluster)
_, migrateAnnoExist := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterConnection)
manager, err := getSysClient(c, logger, obcluster, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Operator)
if err != nil {
if migrateAnnoExist && obcluster.Status.Status == clusterstatus.MigrateFromExisting {
manager, err = getSysClientFromSourceCluster(c, logger, obcluster, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Root)
}
var manager *operation.OceanbaseOperationManager
var err error
_, migrateAnnoExist := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterAddress)
if migrateAnnoExist && obcluster.Status.Status == clusterstatus.MigrateFromExisting {
manager, err = getSysClientFromSourceCluster(c, logger, obcluster, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Root)
} else {
manager, err = getSysClient(c, logger, obcluster, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Operator)
}
return manager, err
}
Expand Down Expand Up @@ -110,12 +111,16 @@ func GetTenantRootOperationClient(c client.Client, logger *logr.Logger, obcluste
}

func getSysClientFromSourceCluster(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, userName, tenantName, secretName string) (*operation.OceanbaseOperationManager, error) {
sysClient, err := getSysClient(c, logger, obcluster, userName, tenantName, secretName)
if err == nil {
return sysClient, err

Check failure on line 116 in internal/resource/utils/util.go

View workflow job for this annotation

GitHub Actions / lint

error is nil (line 114) but it returns error (nilerr)
}
password, err := ReadPassword(c, obcluster.Namespace, secretName)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
// when obcluster is under migrating, use address from annotation
migrateAnnoVal, _ := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterConnection)
migrateAnnoVal, _ := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterAddress)
servers := strings.Split(migrateAnnoVal, ";")
for _, server := range servers {
addressParts := strings.Split(server, ":")
Expand Down
3 changes: 2 additions & 1 deletion pkg/oceanbase-sdk/const/sql/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ See the Mulan PSL v2 for more details.
package sql

const (
GetZone = "select name, value, lower(info) as info from __all_zone where zone = ? and name in ('idc', 'recovery_status', 'region', 'status', 'storage_type', 'zone_type')"
GetZone = "select zone as name, lower(status) as status, idc, region, type from DBA_OB_ZONES where zone = ?"
ListZones = "select zone as name, lower(status) as status, idc, region, type from DBA_OB_ZONES"
AddZone = "alter system add zone ?"
DeleteZone = "alter system delete zone ?"
StartZone = "alter system start zone ?"
Expand Down
47 changes: 5 additions & 42 deletions pkg/oceanbase-sdk/model/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,10 @@ See the Mulan PSL v2 for more details.

package model

// database entity
type OBZoneInfo struct {
Name string `json:"name" db:"name"`
Value int64 `json:"value" db:"value"`
Info string `json:"info" db:"info"`
}

// response data
type OBZone struct {
Name string `json:"name"`
Idc string `json:"idc"`
RecoveryStatus string `json:"recovery_status"`
Region string `json:"region"`
Status string `json:"status"`
StorageType string `json:"storage_type"`
ZoneType string `json:"zone_type"`
}

func NewOBZone(zoneName string, obzoneInfoList []OBZoneInfo) *OBZone {
obzone := &OBZone{
Name: zoneName,
}
for _, obzoneInfo := range obzoneInfoList {
if obzoneInfo.Name == "idc" {
obzone.Idc = obzoneInfo.Info
}
if obzoneInfo.Name == "recovery_status" {
obzone.RecoveryStatus = obzoneInfo.Info
}
if obzoneInfo.Name == "region" {
obzone.Region = obzoneInfo.Info
}
if obzoneInfo.Name == "status" {
obzone.Status = obzoneInfo.Info
}
if obzoneInfo.Name == "storage_type" {
obzone.StorageType = obzoneInfo.Info
}
if obzoneInfo.Name == "zone_type" {
obzone.ZoneType = obzoneInfo.Info
}
}
return obzone
Name string `json:"name" db:"name"`
Idc string `json:"idc" db:"idc"`
Region string `json:"region" db:"region"`
Status string `json:"status" db:"status"`
Type string `json:"type" db:"type"`
}
24 changes: 15 additions & 9 deletions pkg/oceanbase-sdk/operation/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,24 @@ func (m *OceanbaseOperationManager) DeleteZone(zoneName string) error {
return nil
}

func (m *OceanbaseOperationManager) GetZone(zoneName string) (*model.OBZone, error) {
obzoneInfoList := make([]model.OBZoneInfo, 0)
err := m.QueryList(&obzoneInfoList, sql.GetZone, zoneName)
func (m *OceanbaseOperationManager) ListZones() ([]model.OBZone, error) {
zoneList := make([]model.OBZone, 0)
err := m.QueryList(&zoneList, sql.ListZones)
if err != nil {
m.Logger.Error(err, "Got exception when query zone info")
return nil, errors.Wrap(err, "Query zone info")
m.Logger.Error(err, "Got exception when list all zone")
return nil, errors.Wrap(err, "list all zone")
}
if len(obzoneInfoList) == 0 {
return nil, errors.Errorf("Query obzone %s info get empty result", zoneName)
return zoneList, nil
}

func (m *OceanbaseOperationManager) GetZone(zoneName string) (*model.OBZone, error) {
zone := &model.OBZone{}
err := m.QueryRow(zone, sql.GetZone, zoneName)
if err != nil {
m.Logger.Error(err, "Got exception when query zone")
return nil, errors.Wrap(err, "query zone info")
}
obzone := model.NewOBZone(zoneName, obzoneInfoList)
return obzone, nil
return zone, nil
}

func (m *OceanbaseOperationManager) StartZone(zoneName string) error {
Expand Down

0 comments on commit 3c61ca3

Please sign in to comment.