Skip to content

Commit

Permalink
fix(concurrency): change map in task manager struct to sync.map
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI committed Nov 13, 2023
1 parent 270ba63 commit b11fce8
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 29 deletions.
2 changes: 0 additions & 2 deletions pkg/controller/observer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1"
Expand Down Expand Up @@ -114,7 +113,6 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
func (r *OBServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.OBServer{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 9}).
WithEventFilter(preds).
Complete(r)
}
17 changes: 17 additions & 0 deletions pkg/resource/globalWhiteList.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package resource

import "sync"

var GlobalWhiteListMap sync.Map
13 changes: 5 additions & 8 deletions pkg/resource/obtenant_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type OBTenantManager struct {
Logger *logr.Logger
}

// TODO add lock to be thread safe, and read/write whitelist from/to DB
var GlobalWhiteListMap = make(map[string]string, 0)

func (m *OBTenantManager) getClusterSysClient() (*operation.OceanbaseOperationManager, error) {
obcluster, err := m.getOBCluster()
if err != nil {
Expand Down Expand Up @@ -567,14 +564,14 @@ func (m *OBTenantManager) buildTenantStatus() (*v1alpha1.OBTenantStatus, error)
tenantCurrentStatus.TenantRecordInfo = v1alpha1.TenantRecordInfo{}
tenantCurrentStatus.TenantRecordInfo.TenantID = int(obtenant.TenantID)

// TODO get whitelist from tenant account
whitelist, exists := GlobalWhiteListMap[obtenant.TenantName]
// TODO: get whitelist from tenant account
whitelist, exists := GlobalWhiteListMap.Load(obtenant.TenantName)
if exists {
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = whitelist.(string)
} else {
// try update whitelist after the manager restart
GlobalWhiteListMap[obtenant.TenantName] = tenant.DefaultOBTcpInvitedNodes
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = GlobalWhiteListMap[obtenant.TenantName]
GlobalWhiteListMap.Store(obtenant.TenantName, tenant.DefaultOBTcpInvitedNodes)
tenantCurrentStatus.TenantRecordInfo.ConnectWhiteList = tenant.DefaultOBTcpInvitedNodes
}

tenantCurrentStatus.TenantRecordInfo.UnitNumber = poolStatusList[0].UnitNumber
Expand Down
6 changes: 3 additions & 3 deletions pkg/resource/obtenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (m *OBTenantManager) CheckAndApplyWhiteList() error {
if err != nil {
return err
}
// TODO get whitelist variable by tenant account
// TODO: get whitelist variable by tenant account
// Because getting a whitelist requires specifying a tenant , temporary use .Status.TenantRecordInfo.ConnectWhiteList as value in db
GlobalWhiteListMap[tenantName] = specWhiteList
GlobalWhiteListMap.Store(tenantName, specWhiteList)
}
return nil
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func (m *OBTenantManager) createTenant() error {
m.Recorder.Event(m.OBTenant, corev1.EventTypeWarning, "failed to create OBTenant", err.Error())
return err
}
GlobalWhiteListMap[tenantName] = m.OBTenant.Spec.ConnectWhiteList
GlobalWhiteListMap.Store(tenantName, m.OBTenant.Spec.ConnectWhiteList)
// Create user or change password of root, do not return error
m.Recorder.Event(m.OBTenant, "Create", "", "create OBTenant successfully")
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/obtenantrestore_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (m *OBTenantManager) WatchRestoreJobToFinish() error {
}
time.Sleep(5 * time.Second)
}
GlobalWhiteListMap[m.OBTenant.Spec.TenantName] = m.OBTenant.Spec.ConnectWhiteList
GlobalWhiteListMap.Store(m.OBTenant.Spec.TenantName, m.OBTenant.Spec.ConnectWhiteList)
m.Recorder.Event(m.OBTenant, "RestoreJobFinished", "", "restore job finished successfully")
return nil
}
Expand Down
36 changes: 21 additions & 15 deletions pkg/task/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ func GetTaskManager() *TaskManager {
taskManagerOnce.Do(func() {
logger := log.FromContext(context.TODO())
taskManager = &TaskManager{
ResultMap: make(map[string]chan *TaskResult),
Logger: &logger,
TaskResultCache: make(map[string]*TaskResult, 0),
Logger: &logger,
}
})
return taskManager
Expand All @@ -46,17 +44,17 @@ type TaskResult struct {
}

type TaskManager struct {
ResultMap map[string]chan *TaskResult
ResultMap sync.Map
Logger *logr.Logger
TaskResultCache map[string]*TaskResult
TaskResultCache sync.Map
}

func (m *TaskManager) Submit(f func() error) string {
retCh := make(chan *TaskResult, 1)
taskId := uuid.New().String()
// TODO add lock to keep ResultMap safe
m.ResultMap[taskId] = retCh
m.TaskResultCache[taskId] = nil
m.ResultMap.Store(taskId, retCh)
m.TaskResultCache.Delete(taskId)
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -83,30 +81,38 @@ func (m *TaskManager) Submit(f func() error) string {
}

func (m *TaskManager) GetTaskResult(taskId string) (*TaskResult, error) {
retCh, exists := m.ResultMap[taskId]
retChAny, exists := m.ResultMap.Load(taskId)
if !exists {
return nil, errors.Errorf("Task %s not exists", taskId)
}
if m.TaskResultCache[taskId] == nil {
retCh, ok := retChAny.(chan *TaskResult)
if !ok {
return nil, errors.Errorf("Task %s not exists", taskId)
}
result, exists := m.TaskResultCache.Load(taskId)
if !exists {
select {
case result := <-retCh:
m.TaskResultCache[taskId] = result
m.TaskResultCache.Store(taskId, result)
return result, nil
default:
return nil, nil
}
} else {
return m.TaskResultCache[taskId], nil
}
return result.(*TaskResult), nil
}

func (m *TaskManager) CleanTaskResult(taskId string) error {
retCh, exists := m.ResultMap[taskId]
retChAny, exists := m.ResultMap.Load(taskId)
if !exists {
return nil
}
retCh, ok := retChAny.(chan *TaskResult)
if !ok {
return nil
}
close(retCh)
delete(m.ResultMap, taskId)
delete(m.TaskResultCache, taskId)
m.ResultMap.Delete(taskId)
m.TaskResultCache.Delete(taskId)
return nil
}

0 comments on commit b11fce8

Please sign in to comment.