diff --git a/pkg/controller/observer_controller.go b/pkg/controller/observer_controller.go index 1c61a874f..5e8480648 100644 --- a/pkg/controller/observer_controller.go +++ b/pkg/controller/observer_controller.go @@ -27,7 +27,6 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/log" v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" @@ -114,7 +113,6 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c func (r *OBServerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.OBServer{}). - WithOptions(controller.Options{MaxConcurrentReconciles: 9}). WithEventFilter(preds). Complete(r) } diff --git a/pkg/resource/globalWhiteList.go b/pkg/resource/globalWhiteList.go new file mode 100644 index 000000000..c0bb19ce0 --- /dev/null +++ b/pkg/resource/globalWhiteList.go @@ -0,0 +1,17 @@ +/* +Copyright (c) 2023 OceanBase +ob-operator is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. +*/ + +package resource + +import "sync" + +var GlobalWhiteListMap sync.Map diff --git a/pkg/resource/obtenant_manager.go b/pkg/resource/obtenant_manager.go index 76788cd8f..c128fb818 100644 --- a/pkg/resource/obtenant_manager.go +++ b/pkg/resource/obtenant_manager.go @@ -51,9 +51,6 @@ type OBTenantManager struct { Logger *logr.Logger } -// TODO add lock to be thread safe, and read/write whitelist from/to DB -var GlobalWhiteListMap = make(map[string]string, 0) - func (m *OBTenantManager) getClusterSysClient() (*operation.OceanbaseOperationManager, error) { obcluster, err := m.getOBCluster() if err != nil { @@ -567,14 +564,14 @@ func (m *OBTenantManager) buildTenantStatus() (*v1alpha1.OBTenantStatus, error) tenantCurrentStatus.TenantRecordInfo = v1alpha1.TenantRecordInfo{} tenantCurrentStatus.TenantRecordInfo.TenantID = int(obtenant.TenantID) - // TODO get whitelist from tenant account - whitelist, exists := GlobalWhiteListMap[obtenant.TenantName] + // TODO: get whitelist from tenant account + whitelist, exists := GlobalWhiteListMap.Load(obtenant.TenantName) if exists { - tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist + tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist.(string) } else { // try update whitelist after the manager restart - GlobalWhiteListMap[obtenant.TenantName] = tenant.DefaultOBTcpInvitedNodes - tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = GlobalWhiteListMap[obtenant.TenantName] + GlobalWhiteListMap.Store(obtenant.TenantName, tenant.DefaultOBTcpInvitedNodes) + tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = tenant.DefaultOBTcpInvitedNodes } tenantCurrentStatus.TenantRecordInfo.UnitNumber = poolStatusList[0].UnitNumber diff --git a/pkg/resource/obtenant_task.go b/pkg/resource/obtenant_task.go index a280a63ae..3008a142e 100644 --- a/pkg/resource/obtenant_task.go +++ b/pkg/resource/obtenant_task.go @@ -224,9 +224,9 @@ func (m *OBTenantManager) CheckAndApplyWhiteList() error { if err != nil { return err } - // TODO get whitelist variable by tenant account + // TODO: get whitelist variable by tenant account // Because getting a whitelist requires specifying a tenant , temporary use .Status.TenantRecordInfo.ConnectWhiteList as value in db - GlobalWhiteListMap[tenantName] = specWhiteList + GlobalWhiteListMap.Store(tenantName, specWhiteList) } return nil } @@ -389,7 +389,7 @@ func (m *OBTenantManager) createTenant() error { m.Recorder.Event(m.OBTenant, corev1.EventTypeWarning, "failed to create OBTenant", err.Error()) return err } - GlobalWhiteListMap[tenantName] = m.OBTenant.Spec.ConnectWhiteList + GlobalWhiteListMap.Store(tenantName, m.OBTenant.Spec.ConnectWhiteList) // Create user or change password of root, do not return error m.Recorder.Event(m.OBTenant, "Create", "", "create OBTenant successfully") return nil diff --git a/pkg/resource/obtenantrestore_task.go b/pkg/resource/obtenantrestore_task.go index 968e8122e..ffe5228e2 100644 --- a/pkg/resource/obtenantrestore_task.go +++ b/pkg/resource/obtenantrestore_task.go @@ -117,7 +117,7 @@ func (m *OBTenantManager) WatchRestoreJobToFinish() error { } time.Sleep(5 * time.Second) } - GlobalWhiteListMap[m.OBTenant.Spec.TenantName] = m.OBTenant.Spec.ConnectWhiteList + GlobalWhiteListMap.Store(m.OBTenant.Spec.TenantName, m.OBTenant.Spec.ConnectWhiteList) m.Recorder.Event(m.OBTenant, "RestoreJobFinished", "", "restore job finished successfully") return nil } diff --git a/pkg/task/task_manager.go b/pkg/task/task_manager.go index 747f38a62..0165b214a 100644 --- a/pkg/task/task_manager.go +++ b/pkg/task/task_manager.go @@ -32,9 +32,7 @@ func GetTaskManager() *TaskManager { taskManagerOnce.Do(func() { logger := log.FromContext(context.TODO()) taskManager = &TaskManager{ - ResultMap: make(map[string]chan *TaskResult), - Logger: &logger, - TaskResultCache: make(map[string]*TaskResult, 0), + Logger: &logger, } }) return taskManager @@ -46,17 +44,17 @@ type TaskResult struct { } type TaskManager struct { - ResultMap map[string]chan *TaskResult + ResultMap sync.Map Logger *logr.Logger - TaskResultCache map[string]*TaskResult + TaskResultCache sync.Map } func (m *TaskManager) Submit(f func() error) string { retCh := make(chan *TaskResult, 1) taskId := uuid.New().String() // TODO add lock to keep ResultMap safe - m.ResultMap[taskId] = retCh - m.TaskResultCache[taskId] = nil + m.ResultMap.Store(taskId, retCh) + m.TaskResultCache.Delete(taskId) go func() { defer func() { if r := recover(); r != nil { @@ -83,30 +81,38 @@ func (m *TaskManager) Submit(f func() error) string { } func (m *TaskManager) GetTaskResult(taskId string) (*TaskResult, error) { - retCh, exists := m.ResultMap[taskId] + retChAny, exists := m.ResultMap.Load(taskId) if !exists { return nil, errors.Errorf("Task %s not exists", taskId) } - if m.TaskResultCache[taskId] == nil { + retCh, ok := retChAny.(chan *TaskResult) + if !ok { + return nil, errors.Errorf("Task %s not exists", taskId) + } + result, exists := m.TaskResultCache.Load(taskId) + if !exists { select { case result := <-retCh: - m.TaskResultCache[taskId] = result + m.TaskResultCache.Store(taskId, result) return result, nil default: return nil, nil } - } else { - return m.TaskResultCache[taskId], nil } + return result.(*TaskResult), nil } func (m *TaskManager) CleanTaskResult(taskId string) error { - retCh, exists := m.ResultMap[taskId] + retChAny, exists := m.ResultMap.Load(taskId) if !exists { return nil } + retCh, ok := retChAny.(chan *TaskResult) + if !ok { + return nil + } close(retCh) - delete(m.ResultMap, taskId) - delete(m.TaskResultCache, taskId) + m.ResultMap.Delete(taskId) + m.TaskResultCache.Delete(taskId) return nil }