Skip to content

Commit

Permalink
Bugfix: concurrent map writes (#95)
Browse files Browse the repository at this point in the history
* fix(concurrency): fix task manager concurrent problem, and some other fixes
  • Loading branch information
powerfooI authored Nov 13, 2023
1 parent 270ba63 commit f0304c9
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 35 deletions.
19 changes: 18 additions & 1 deletion api/v1alpha1/obtenant_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"context"
"strings"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -32,6 +33,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/oceanbase/ob-operator/api/constants"
apitypes "github.com/oceanbase/ob-operator/api/types"
"github.com/oceanbase/ob-operator/pkg/const/status/tenantstatus"
)

// log is for logging in this package.
Expand Down Expand Up @@ -69,6 +72,8 @@ func (r *OBTenant) Default() {

if r.Spec.TenantRole == "" {
r.Spec.TenantRole = constants.TenantRolePrimary
} else {
r.Spec.TenantRole = apitypes.TenantRole(strings.ToUpper(string(r.Spec.TenantRole)))
}
}

Expand All @@ -86,7 +91,14 @@ func (r *OBTenant) ValidateCreate() (admission.Warnings, error) {
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
//
// While the tenant's status is Running, an update that changes
// spec.clusterName or spec.tenantName is rejected with a BadRequest error.
// Every update is then checked by validateMutation. No admission warnings are
// returned.
func (r *OBTenant) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
	if r.Status.Status == tenantstatus.Running {
		// Use a checked assertion: a bare old.(*OBTenant) panics inside the
		// webhook if old is nil or holds a different type.
		oldTenant, ok := old.(*OBTenant)
		if !ok {
			return nil, apierrors.NewBadRequest("Expected an OBTenant as the old object")
		}
		switch {
		case r.Spec.ClusterName != oldTenant.Spec.ClusterName:
			return nil, apierrors.NewBadRequest("Cannot change clusterName when tenant is running")
		case r.Spec.TenantName != oldTenant.Spec.TenantName:
			return nil, apierrors.NewBadRequest("Cannot change tenantName when tenant is running")
		}
	}
	return nil, r.validateMutation()
}

Expand All @@ -97,6 +109,11 @@ func (r *OBTenant) validateMutation() error {
}
var allErrs field.ErrorList

// 0. TenantRole must be one of PRIMARY and STANDBY
if r.Spec.TenantRole != constants.TenantRolePrimary && r.Spec.TenantRole != constants.TenantRoleStandby {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("tenantRole"), r.Spec.TenantRole, "TenantRole must be primary or standby"))
}

// 0. OBCluster must exist
cluster := &OBCluster{}
err := tenantClt.Get(context.Background(), types.NamespacedName{
Expand Down
19 changes: 19 additions & 0 deletions api/v1alpha1/obtenantbackuppolicy_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/oceanbase/ob-operator/api/constants"
apitypes "github.com/oceanbase/ob-operator/api/types"
oceanbaseconst "github.com/oceanbase/ob-operator/pkg/const/oceanbase"
"github.com/oceanbase/ob-operator/pkg/const/status/tenantstatus"
)
Expand All @@ -60,11 +61,16 @@ var _ webhook.Defaulter = &OBTenantBackupPolicy{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *OBTenantBackupPolicy) Default() {
// Set default values for backup policy destination types
if r.Spec.DataBackup.Destination.Type == "" {
r.Spec.DataBackup.Destination.Type = constants.BackupDestTypeNFS
} else {
r.Spec.DataBackup.Destination.Type = apitypes.BackupDestType(strings.ToUpper(string(r.Spec.DataBackup.Destination.Type)))
}
if r.Spec.LogArchive.Destination.Type == "" {
r.Spec.LogArchive.Destination.Type = constants.BackupDestTypeNFS
} else {
r.Spec.LogArchive.Destination.Type = apitypes.BackupDestType(strings.ToUpper(string(r.Spec.LogArchive.Destination.Type)))
}
if r.Spec.LogArchive.SwitchPieceInterval == "" {
r.Spec.LogArchive.SwitchPieceInterval = "1d"
Expand Down Expand Up @@ -220,6 +226,19 @@ func (r *OBTenantBackupPolicy) validateBackupPolicy() error {
}
}

if r.Spec.LogArchive.Binding != constants.ArchiveBindingOptional && r.Spec.LogArchive.Binding != constants.ArchiveBindingMandatory {
return field.Invalid(field.NewPath("spec").Child("logArchive").Child("binding"), r.Spec.LogArchive.Binding, "invalid binding, only optional and mandatory are supported")
}

// Check types of destinations are legal
if r.Spec.LogArchive.Destination.Type != constants.BackupDestTypeNFS && r.Spec.LogArchive.Destination.Type != constants.BackupDestTypeOSS {
return field.Invalid(field.NewPath("spec").Child("logArchive").Child("destination").Child("type"), r.Spec.LogArchive.Destination.Type, "invalid destination type, only NFS and OSS are supported")
}
if r.Spec.DataBackup.Destination.Type != constants.BackupDestTypeNFS && r.Spec.DataBackup.Destination.Type != constants.BackupDestTypeOSS {
return field.Invalid(field.NewPath("spec").Child("dataBackup").Child("destination").Child("type"), r.Spec.DataBackup.Destination.Type, "invalid destination type, only NFS and OSS are supported")
}

// Check oss access of destinations
if r.Spec.DataBackup.Destination.Type == constants.BackupDestTypeOSS && r.Spec.DataBackup.Destination.OSSAccessSecret != "" {
if !ossPathPattern.MatchString(r.Spec.DataBackup.Destination.Path) {
return field.Invalid(field.NewPath("spec").Child("dataBackup").Child("destination").Child("path"), r.Spec.DataBackup.Destination.Path, "invalid path, pattern: ^oss://[^/]+/[^/].*\\?host=.+$")
Expand Down
10 changes: 8 additions & 2 deletions api/v1alpha1/obtenantoperation_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"context"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -30,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/oceanbase/ob-operator/api/constants"
apitypes "github.com/oceanbase/ob-operator/api/types"
)

// log is for logging in this package.
Expand All @@ -51,7 +53,8 @@ var _ webhook.Defaulter = &OBTenantOperation{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *OBTenantOperation) Default() {
obtenantoperationlog.Info("default", "name", r.Name)
r.Spec.Type = apitypes.TenantOperationType(strings.ToUpper(string(r.Spec.Type)))

tenant := &OBTenant{}
var targetTenantName string
var secondaryTenantName string
Expand Down Expand Up @@ -113,7 +116,8 @@ func (r *OBTenantOperation) ValidateCreate() (admission.Warnings, error) {
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
//
// It always returns a warning telling the user that updating an operation
// resource does not trigger any action, together with the result of
// validateMutation.
func (r *OBTenantOperation) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
	_ = old
	// The warning must be built before the single return below; an earlier
	// return would make it unreachable and the user would never see it.
	warnings := []string{"Updating operation resource can not trigger any action, please create a new one if you want to do that"}
	return warnings, r.validateMutation()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
Expand All @@ -140,6 +144,8 @@ func (r *OBTenantOperation) validateMutation() error {
if r.Spec.Switchover == nil || r.Spec.Switchover.PrimaryTenant == "" || r.Spec.Switchover.StandbyTenant == "" {
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("switchover").Child("primaryTenant", "standbyTenant"), "name of primary tenant and standby tenant are both required"))
}
default:
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("type"), string(r.Spec.Type)+" type of operation is not supported"))
}
if len(allErrs) == 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion charts/oceanbase-cluster/templates/NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Welcome to OceanBase Cluster!

After installing OBCluster chart, you need to wait for the cluster bootstrapped. Bootstrap progress will cost approximately 2~3 minutes which may varies depends on the machine.
After installing the OBCluster chart, you need to wait for the cluster to be bootstrapped. The bootstrap process takes approximately 2~3 minutes, which may vary depending on the machine.

You can use the following command to wait for the OBCluster to be ready.

Expand Down
4 changes: 3 additions & 1 deletion charts/oceanbase-cluster/templates/secret.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{{- if $.Values.generateUserSecrets }}
{{- range $secretName := $.Values.userSecrets }}
{{- if empty (lookup "v1" "Secret" $.Release.Namespace $secretName) }}
--- # the lookup function will return an empty list when dry-running or local rendering
apiVersion: v1
kind: Secret
Expand All @@ -10,7 +9,10 @@ metadata:
labels:
{{- include "oceanbase-cluster.labels" $ | nindent 4 }}
data:
{{- if empty (lookup "v1" "Secret" $.Release.Namespace $secretName) }}
password: {{ randAlphaNum 16 | b64enc }}
{{- else }}
password: {{ (lookup "v1" "Secret" $.Release.Namespace $secretName).data.password }}
{{- end }}
{{- end }}
{{- end }}
2 changes: 0 additions & 2 deletions pkg/controller/observer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
Expand Down Expand Up @@ -114,7 +113,6 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// SetupWithManager sets up the controller with the Manager.
// It builds a controller managed by mgr for v1alpha1.OBServer objects,
// installs preds as the event filter, and completes the builder with r as
// the reconciler. The error from Complete is returned unchanged.
func (r *OBServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.OBServer{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 9}).
WithEventFilter(preds).
Complete(r)
}
13 changes: 5 additions & 8 deletions pkg/resource/obtenant_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type OBTenantManager struct {
Logger *logr.Logger
}

// TODO add lock to be thread safe, and read/write whitelist from/to DB
var GlobalWhiteListMap = make(map[string]string, 0)

func (m *OBTenantManager) getClusterSysClient() (*operation.OceanbaseOperationManager, error) {
obcluster, err := m.getOBCluster()
if err != nil {
Expand Down Expand Up @@ -567,14 +564,14 @@ func (m *OBTenantManager) buildTenantStatus() (*v1alpha1.OBTenantStatus, error)
tenantCurrentStatus.TenantRecordInfo = v1alpha1.TenantRecordInfo{}
tenantCurrentStatus.TenantRecordInfo.TenantID = int(obtenant.TenantID)

// TODO get whitelist from tenant account
whitelist, exists := GlobalWhiteListMap[obtenant.TenantName]
// TODO: get whitelist from tenant account
whitelist, exists := tenantWhiteListMap.Load(obtenant.TenantName)
if exists {
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist.(string)
} else {
// try update whitelist after the manager restart
GlobalWhiteListMap[obtenant.TenantName] = tenant.DefaultOBTcpInvitedNodes
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = GlobalWhiteListMap[obtenant.TenantName]
tenantWhiteListMap.Store(obtenant.TenantName, tenant.DefaultOBTcpInvitedNodes)
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = tenant.DefaultOBTcpInvitedNodes
}

tenantCurrentStatus.TenantRecordInfo.UnitNumber = poolStatusList[0].UnitNumber
Expand Down
6 changes: 3 additions & 3 deletions pkg/resource/obtenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (m *OBTenantManager) CheckAndApplyWhiteList() error {
if err != nil {
return err
}
// TODO get whitelist variable by tenant account
// TODO: get whitelist variable by tenant account
// Because getting a whitelist requires specifying a tenant, temporarily use .Status.TenantRecordInfo.ConnectWhiteList as the value in db
GlobalWhiteListMap[tenantName] = specWhiteList
tenantWhiteListMap.Store(tenantName, specWhiteList)
}
return nil
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func (m *OBTenantManager) createTenant() error {
m.Recorder.Event(m.OBTenant, corev1.EventTypeWarning, "failed to create OBTenant", err.Error())
return err
}
GlobalWhiteListMap[tenantName] = m.OBTenant.Spec.ConnectWhiteList
tenantWhiteListMap.Store(tenantName, m.OBTenant.Spec.ConnectWhiteList)
// Create user or change password of root, do not return error
m.Recorder.Event(m.OBTenant, "Create", "", "create OBTenant successfully")
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/obtenantrestore_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (m *OBTenantManager) WatchRestoreJobToFinish() error {
}
time.Sleep(5 * time.Second)
}
GlobalWhiteListMap[m.OBTenant.Spec.TenantName] = m.OBTenant.Spec.ConnectWhiteList
tenantWhiteListMap.Store(m.OBTenant.Spec.TenantName, m.OBTenant.Spec.ConnectWhiteList)
m.Recorder.Event(m.OBTenant, "RestoreJobFinished", "", "restore job finished successfully")
return nil
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/resource/tenantWhiteList.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package resource

import "sync"

// tenantWhiteListMap records the connect white list of each tenant, keyed by
// tenant name; values are stored and read back as string. It is a sync.Map so
// it can be read and written from multiple goroutines without extra locking.
// The contents live only in process memory.
var tenantWhiteListMap sync.Map
37 changes: 21 additions & 16 deletions pkg/task/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ func GetTaskManager() *TaskManager {
taskManagerOnce.Do(func() {
logger := log.FromContext(context.TODO())
taskManager = &TaskManager{
ResultMap: make(map[string]chan *TaskResult),
Logger: &logger,
TaskResultCache: make(map[string]*TaskResult, 0),
Logger: &logger,
}
})
return taskManager
Expand All @@ -46,17 +44,16 @@ type TaskResult struct {
}

type TaskManager struct {
ResultMap map[string]chan *TaskResult
ResultMap sync.Map
Logger *logr.Logger
TaskResultCache map[string]*TaskResult
TaskResultCache sync.Map
}

func (m *TaskManager) Submit(f func() error) string {
retCh := make(chan *TaskResult, 1)
taskId := uuid.New().String()
// TODO add lock to keep ResultMap safe
m.ResultMap[taskId] = retCh
m.TaskResultCache[taskId] = nil
m.ResultMap.Store(taskId, retCh)
m.TaskResultCache.Delete(taskId)
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -83,30 +80,38 @@ func (m *TaskManager) Submit(f func() error) string {
}

func (m *TaskManager) GetTaskResult(taskId string) (*TaskResult, error) {
retCh, exists := m.ResultMap[taskId]
retChAny, exists := m.ResultMap.Load(taskId)
if !exists {
return nil, errors.Errorf("Task %s not exists", taskId)
}
if m.TaskResultCache[taskId] == nil {
retCh, ok := retChAny.(chan *TaskResult)
if !ok {
return nil, errors.Errorf("Task %s not exists", taskId)
}
result, exists := m.TaskResultCache.Load(taskId)
if !exists {
select {
case result := <-retCh:
m.TaskResultCache[taskId] = result
m.TaskResultCache.Store(taskId, result)
return result, nil
default:
return nil, nil
}
} else {
return m.TaskResultCache[taskId], nil
}
return result.(*TaskResult), nil
}

// CleanTaskResult forgets the task identified by taskId: it removes the task
// from ResultMap and TaskResultCache and closes the task's result channel.
// Cleaning an unknown task is a no-op. The returned error is always nil.
func (m *TaskManager) CleanTaskResult(taskId string) error {
	// LoadAndDelete removes the entry atomically, so when several goroutines
	// clean the same task only one of them obtains the channel and closes it.
	// A separate Load followed by Delete would let two callers close the same
	// channel, and closing a closed channel panics.
	retChAny, loaded := m.ResultMap.LoadAndDelete(taskId)
	if !loaded {
		return nil
	}
	m.TaskResultCache.Delete(taskId)
	if retCh, ok := retChAny.(chan *TaskResult); ok {
		close(retCh)
	}
	return nil
}
Loading

0 comments on commit f0304c9

Please sign in to comment.