ddl/ingest: remove backend context manager and refactor code (#57770)
ref #57670
tangenta authored Jan 17, 2025
1 parent 365a722 commit db7843a
Showing 39 changed files with 890 additions and 747 deletions.
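
The core of the refactor is visible in pkg/ddl/backfilling.go below: the global ingest.LitBackCtxMgr Register/Unregister pair, plus a separately constructed and attached checkpoint manager, are replaced by a single ingest.NewBackendCtxBuilder(ctx, store, job).WithCheckpointManagerParam(...).Build() call whose result is released with defer bcCtx.Close(). The following is a minimal, self-contained Go sketch of that lifecycle; the constructor and method names mirror the diff, but every type, field, and signature here is a toy stand-in, not the actual TiDB ingest API.

package main

import (
	"context"
	"fmt"
)

// Toy stand-ins for the real ingest types; only the lifecycle shape is modeled.
type backendCtx struct{ jobID int64 }

func (b *backendCtx) Close() { fmt.Println("backend ctx closed for job", b.jobID) }

type backendCtxBuilder struct {
	jobID          int64
	withCheckpoint bool
}

// NewBackendCtxBuilder mirrors the constructor this commit introduces
// (ingest.NewBackendCtxBuilder(ctx, store, job)); the parameters here are toys.
func NewBackendCtxBuilder(_ context.Context, jobID int64) *backendCtxBuilder {
	return &backendCtxBuilder{jobID: jobID}
}

// WithCheckpointManagerParam mirrors the optional checkpoint wiring that used to
// be a separate NewCheckpointManager + AttachCheckpointManager step.
func (b *backendCtxBuilder) WithCheckpointManagerParam(physicalTableID int64) *backendCtxBuilder {
	b.withCheckpoint = physicalTableID != 0
	return b
}

func (b *backendCtxBuilder) Build() (*backendCtx, error) {
	return &backendCtx{jobID: b.jobID}, nil
}

func main() {
	ctx := context.Background()
	// New pattern: build, use, defer Close; no global manager to unregister from.
	bcCtx, err := NewBackendCtxBuilder(ctx, 1).WithCheckpointManagerParam(100).Build()
	if err != nil {
		panic(err)
	}
	defer bcCtx.Close()
	// ... run the ingest pipeline with bcCtx; the real code then calls
	// bcCtx.FinishAndUnregisterEngines(...) as shown in the diff below ...
}
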
2 changes: 1 addition & 1 deletion pkg/ddl/BUILD.bazel
@@ -200,7 +200,6 @@ go_library(
"@com_github_tikv_pd_client//clients/router",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//opt",
"@com_github_tikv_pd_client//servicediscovery",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
@@ -282,6 +281,7 @@ go_test(
"//pkg/config",
"//pkg/ddl/copr",
"//pkg/ddl/ingest",
"//pkg/ddl/ingest/testutil",
"//pkg/ddl/logutil",
"//pkg/ddl/mock",
"//pkg/ddl/placement",
34 changes: 5 additions & 29 deletions pkg/ddl/backfilling.go
@@ -735,34 +735,13 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
hasUnique = hasUnique || indexInfo.Unique
}

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS)
bcCtx, err := ingest.NewBackendCtxBuilder(ctx, dc.store, job).
WithCheckpointManagerParam(sessPool, reorgInfo.PhysicalTableID).
Build()
if err != nil {
return errors.Trace(err)
}
defer ingest.LitBackCtxMgr.Unregister(job.ID)

cpMgr, err := ingest.NewCheckpointManager(
ctx,
sessPool,
reorgInfo.PhysicalTableID,
job.ID,
indexIDs,
ingest.LitBackCtxMgr.EncodeJobSortPath(job.ID),
dc.store.(kv.StorageWithPD).GetPDClient(),
)
if err != nil {
logutil.DDLIngestLogger().Warn("create checkpoint manager failed",
zap.Int64("jobID", job.ID),
zap.Error(err))
} else {
defer cpMgr.Close()
bcCtx.AttachCheckpointManager(cpMgr)
}
defer bcCtx.Close()

reorgCtx := dc.getReorgCtx(job.ID)
rowCntListener := &localRowCntListener{
@@ -786,6 +765,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
pipe, err := NewAddIndexIngestPipeline(
opCtx,
dc.store,
@@ -800,7 +780,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
job.ReorgMeta,
avgRowSize,
importConc,
cpMgr,
rowCntListener,
)
if err != nil {
@@ -817,9 +796,6 @@
}
return err
}
if cpMgr != nil {
cpMgr.AdvanceWatermark(true, true)
}
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

46 changes: 5 additions & 41 deletions pkg/ddl/backfilling_dist_executor.go
@@ -19,17 +19,14 @@ import (
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

@@ -59,7 +56,9 @@ type BackfillSubTaskMeta struct {
RangeSplitKeys [][]byte `json:"range_split_keys,omitempty"`
DataFiles []string `json:"data-files,omitempty"`
StatFiles []string `json:"stat-files,omitempty"`
TS uint64 `json:"ts,omitempty"`
// TS is used to make sure subtasks are idempotent.
// TODO(tangenta): support local sort.
TS uint64 `json:"ts,omitempty"`
// Each group of MetaGroups represents a different index kvs meta.
MetaGroups []*external.SortedKVMeta `json:"meta_groups,omitempty"`
// EleIDs stands for the index/column IDs to backfill with distributed framework.
@@ -124,55 +123,20 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
jc := ddlObj.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
ddlObj.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
ddlObj.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, s.getBackendCtx, cloudStorageURI, estRowSize)
return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, cloudStorageURI, estRowSize)
case proto.BackfillStepMergeSort:
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI)
case proto.BackfillStepWriteAndIngest:
if len(cloudStorageURI) == 0 {
return nil, errors.Errorf("local import does not have write & ingest step")
}
return newCloudImportExecutor(jobMeta, indexInfos, tbl, s.getBackendCtx, cloudStorageURI)
return newCloudImportExecutor(jobMeta, ddlObj.store, indexInfos, tbl, cloudStorageURI)
default:
// should not happen, caller has checked the stage
return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
}
}

func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
job := &s.taskMeta.Job
hasUnique, err := hasUniqueIndex(job)
if err != nil {
return nil, err
}
ddlObj := s.d
discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()

return ingest.LitBackCtxMgr.Register(
s.BaseTaskExecutor.Ctx(),
job.ID, hasUnique,
ddlObj.etcdCli,
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.ReorgMeta.GetMaxWriteSpeedOrDefault(),
job.RealStartTS,
)
}

func hasUniqueIndex(job *model.Job) (bool, error) {
args, err := model.GetModifyIndexArgs(job)
if err != nil {
return false, errors.Trace(err)
}

for _, a := range args.IndexArgs {
if a.Unique {
return true, nil
}
}
return false, nil
}

type backfillDistExecutor struct {
*taskexecutor.BaseTaskExecutor
d *ddl
42 changes: 33 additions & 9 deletions pkg/ddl/backfilling_dist_scheduler.go
@@ -93,12 +93,12 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
return nil, err
}
logger.Info("on next subtasks batch")

storeWithPD := sch.d.store.(kv.StorageWithPD)
// TODO: use planner.
switch nextStep {
case proto.BackfillStepReadIndex:
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
return generatePartitionPlan(ctx, storeWithPD, tblInfo)
}
return generateNonPartitionPlan(ctx, sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
case proto.BackfillStepMergeSort:
@@ -116,7 +116,7 @@
})
return generateGlobalSortIngestPlan(
ctx,
sch.d.store.(kv.StorageWithPD),
storeWithPD,
taskHandle,
task,
backfillMeta.CloudStorageURI,
@@ -212,7 +212,11 @@ func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.Tab
return tblInfo, nil
}

func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) {
func generatePartitionPlan(
ctx context.Context,
store kv.StorageWithPD,
tblInfo *model.TableInfo,
) (metas [][]byte, err error) {
defs := tblInfo.Partition.Definitions
physicalIDs := make([]int64, len(defs))
for i := range defs {
@@ -221,8 +225,14 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error)

subTaskMetas := make([][]byte, 0, len(physicalIDs))
for _, physicalID := range physicalIDs {
// It should be different for each subtask to determine if there are duplicate entries.
importTS, err := allocNewTS(ctx, store)
if err != nil {
return nil, err
}
subTaskMeta := &BackfillSubTaskMeta{
PhysicalTableID: physicalID,
TS: importTS,
}

metaBytes, err := json.Marshal(subTaskMeta)
@@ -297,6 +307,11 @@ func generateNonPartitionPlan(
regionBatch := CalculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)

for i := 0; i < len(recordRegionMetas); i += regionBatch {
// It should be different for each subtask to determine if there are duplicate entries.
importTS, err := allocNewTS(ctx, d.store.(kv.StorageWithPD))
if err != nil {
return true, nil
}
end := i + regionBatch
if end > len(recordRegionMetas) {
end = len(recordRegionMetas)
@@ -305,6 +320,7 @@
subTaskMeta := &BackfillSubTaskMeta{
RowStart: batch[0].StartKey(),
RowEnd: batch[len(batch)-1].EndKey(),
TS: importTS,
}
if i == 0 {
subTaskMeta.RowStart = startKey
@@ -409,6 +425,16 @@ func generateGlobalSortIngestPlan(
return metaArr, nil
}

func allocNewTS(ctx context.Context, store kv.StorageWithPD) (uint64, error) {
pdCli := store.GetPDClient()
p, l, err := pdCli.GetTS(ctx)
if err != nil {
return 0, err
}
ts := oracle.ComposeTS(p, l)
return ts, nil
}

func splitSubtaskMetaForOneKVMetaGroup(
ctx context.Context,
store kv.StorageWithPD,
@@ -422,15 +448,13 @@
// Skip global sort for empty table.
return nil, nil
}
pdCli := store.GetPDClient()
p, l, err := pdCli.GetTS(ctx)
importTS, err := allocNewTS(ctx, store)
if err != nil {
return nil, err
}
ts := oracle.ComposeTS(p, l)
failpoint.Inject("mockTSForGlobalSort", func(val failpoint.Value) {
i := val.(int)
ts = uint64(i)
importTS = uint64(i)
})
splitter, err := getRangeSplitter(
ctx, store, cloudStorageURI, int64(kvMeta.TotalKVSize), instanceCnt, kvMeta.MultipleFilesStats, logger)
@@ -482,7 +506,7 @@ func splitSubtaskMetaForOneKVMetaGroup(
StatFiles: statFiles,
RangeJobKeys: rangeJobKeys,
RangeSplitKeys: regionSplitKeys,
TS: ts,
TS: importTS,
}
if eleID > 0 {
m.EleIDs = []int64{eleID}
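
The allocNewTS helper added above gives each subtask its own timestamp from PD; the per-subtask TS field in BackfillSubTaskMeta keeps subtasks idempotent and lets duplicate entries from different attempts be told apart. Below is a minimal sketch of what the oracle.ComposeTS call does with the physical/logical pair returned by GetTS, assuming the standard TiKV TSO layout with an 18-bit logical component (stated as background, not taken from this diff).

package main

import "fmt"

// composeTS mirrors the shape of oracle.ComposeTS: the physical part (a
// millisecond timestamp from PD) fills the high bits and the logical counter
// the low 18 bits.
const logicalBits = 18

func composeTS(physical, logical int64) uint64 {
	return uint64((physical << logicalBits) + logical)
}

func main() {
	physical := int64(1737072000000) // example PD physical time in milliseconds
	logical := int64(42)             // example logical counter
	ts := composeTS(physical, logical)
	fmt.Printf("ts=%d physical=%d logical=%d\n",
		ts, ts>>logicalBits, ts&((1<<logicalBits)-1))
}
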
26 changes: 14 additions & 12 deletions pkg/ddl/backfilling_import_cloud.go
@@ -35,34 +35,36 @@ import (
type cloudImportExecutor struct {
taskexecutor.BaseStepExecutor
job *model.Job
store kv.Storage
indexes []*model.IndexInfo
ptbl table.PhysicalTable
bc ingest.BackendCtx
cloudStoreURI string
backendCtx ingest.BackendCtx
}

func newCloudImportExecutor(
job *model.Job,
store kv.Storage,
indexes []*model.IndexInfo,
ptbl table.PhysicalTable,
bcGetter func() (ingest.BackendCtx, error),
cloudStoreURI string,
) (*cloudImportExecutor, error) {
bc, err := bcGetter()
if err != nil {
return nil, err
}
return &cloudImportExecutor{
job: job,
store: store,
indexes: indexes,
ptbl: ptbl,
bc: bc,
cloudStoreURI: cloudStoreURI,
}, nil
}

func (*cloudImportExecutor) Init(ctx context.Context) error {
func (m *cloudImportExecutor) Init(ctx context.Context) error {
logutil.Logger(ctx).Info("cloud import executor init subtask exec env")
bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build()
if err != nil {
return err
}
m.backendCtx = bCtx
return nil
}

Expand All @@ -73,8 +75,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if err != nil {
return err
}

local := m.bc.GetLocalBackend()
local := m.backendCtx.GetLocalBackend()
if local == nil {
return errors.Errorf("local backend not found")
}
@@ -155,7 +156,8 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub

func (m *cloudImportExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("cloud import executor clean up subtask env")
// cleanup backend context
ingest.LitBackCtxMgr.Unregister(m.job.ID)
if m.backendCtx != nil {
m.backendCtx.Close()
}
return nil
}
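
With the backend context manager gone, the cloud import executor owns the backend context lifecycle itself: Init builds it via the new builder and Cleanup closes it, instead of fetching it from a global manager up front and unregistering by job ID afterwards. A self-contained toy sketch of that Init/RunSubtask/Cleanup flow follows; the types are illustrative stand-ins, not the real taskexecutor or ingest interfaces.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Toy stand-ins; the real types live in pkg/ddl/ingest and the disttask
// task executor framework.
type backendCtx struct{}

func (*backendCtx) Close() { fmt.Println("backend ctx closed") }

type cloudImportExecutor struct {
	backendCtx *backendCtx
}

// Init builds the backend context lazily, mirroring the new
// ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build() call above.
func (m *cloudImportExecutor) Init(_ context.Context) error {
	m.backendCtx = &backendCtx{}
	return nil
}

func (m *cloudImportExecutor) RunSubtask(_ context.Context) error {
	if m.backendCtx == nil {
		return errors.New("backend ctx not initialized")
	}
	fmt.Println("writing and ingesting one subtask")
	return nil
}

// Cleanup closes the backend context if it was built; this replaces the old
// ingest.LitBackCtxMgr.Unregister(m.job.ID) call.
func (m *cloudImportExecutor) Cleanup(_ context.Context) error {
	if m.backendCtx != nil {
		m.backendCtx.Close()
	}
	return nil
}

func main() {
	ctx := context.Background()
	exec := &cloudImportExecutor{}
	if err := exec.Init(ctx); err != nil {
		panic(err)
	}
	defer exec.Cleanup(ctx)
	if err := exec.RunSubtask(ctx); err != nil {
		panic(err)
	}
}
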