diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 4f1e9c4aacc21..2376492c43ecd 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -200,7 +200,6 @@ go_library( "@com_github_tikv_pd_client//clients/router", "@com_github_tikv_pd_client//http", "@com_github_tikv_pd_client//opt", - "@com_github_tikv_pd_client//servicediscovery", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", @@ -282,6 +281,7 @@ go_test( "//pkg/config", "//pkg/ddl/copr", "//pkg/ddl/ingest", + "//pkg/ddl/ingest/testutil", "//pkg/ddl/logutil", "//pkg/ddl/mock", "//pkg/ddl/placement", diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index c6d14004ca15b..a43fcdfb3b614 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -735,34 +735,13 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( hasUnique = hasUnique || indexInfo.Unique } - //nolint: forcetypeassert - discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) - maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault() - bcCtx, err := ingest.LitBackCtxMgr.Register( - ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS) + bcCtx, err := ingest.NewBackendCtxBuilder(ctx, dc.store, job). + WithCheckpointManagerParam(sessPool, reorgInfo.PhysicalTableID). + Build() if err != nil { return errors.Trace(err) } - defer ingest.LitBackCtxMgr.Unregister(job.ID) - - cpMgr, err := ingest.NewCheckpointManager( - ctx, - sessPool, - reorgInfo.PhysicalTableID, - job.ID, - indexIDs, - ingest.LitBackCtxMgr.EncodeJobSortPath(job.ID), - dc.store.(kv.StorageWithPD).GetPDClient(), - ) - if err != nil { - logutil.DDLIngestLogger().Warn("create checkpoint manager failed", - zap.Int64("jobID", job.ID), - zap.Error(err)) - } else { - defer cpMgr.Close() - bcCtx.AttachCheckpointManager(cpMgr) - } + defer bcCtx.Close() reorgCtx := dc.getReorgCtx(job.ID) rowCntListener := &localRowCntListener{ @@ -786,6 +765,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( zap.Int64s("index IDs", indexIDs)) return errors.Trace(err) } + importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) pipe, err := NewAddIndexIngestPipeline( opCtx, dc.store, @@ -800,7 +780,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( job.ReorgMeta, avgRowSize, importConc, - cpMgr, rowCntListener, ) if err != nil { @@ -817,9 +796,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( } return err } - if cpMgr != nil { - cpMgr.AdvanceWatermark(true, true) - } return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) } diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 065db33570bd8..7911908fd0668 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -19,7 +19,6 @@ import ( "encoding/json" "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" @@ -27,9 +26,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" - "github.com/tikv/client-go/v2/tikv" 
"go.uber.org/zap" ) @@ -59,7 +56,9 @@ type BackfillSubTaskMeta struct { RangeSplitKeys [][]byte `json:"range_split_keys,omitempty"` DataFiles []string `json:"data-files,omitempty"` StatFiles []string `json:"stat-files,omitempty"` - TS uint64 `json:"ts,omitempty"` + // TS is used to make sure subtasks are idempotent. + // TODO(tangenta): support local sort. + TS uint64 `json:"ts,omitempty"` // Each group of MetaGroups represents a different index kvs meta. MetaGroups []*external.SortedKVMeta `json:"meta_groups,omitempty"` // EleIDs stands for the index/column IDs to backfill with distributed framework. @@ -124,55 +123,20 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor( jc := ddlObj.jobContext(jobMeta.ID, jobMeta.ReorgMeta) ddlObj.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) ddlObj.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) - return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, s.getBackendCtx, cloudStorageURI, estRowSize) + return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, cloudStorageURI, estRowSize) case proto.BackfillStepMergeSort: return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI) case proto.BackfillStepWriteAndIngest: if len(cloudStorageURI) == 0 { return nil, errors.Errorf("local import does not have write & ingest step") } - return newCloudImportExecutor(jobMeta, indexInfos, tbl, s.getBackendCtx, cloudStorageURI) + return newCloudImportExecutor(jobMeta, ddlObj.store, indexInfos, tbl, cloudStorageURI) default: // should not happen, caller has checked the stage return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID) } } -func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { - job := &s.taskMeta.Job - hasUnique, err := hasUniqueIndex(job) - if err != nil { - return nil, err - } - ddlObj := s.d - discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - - return ingest.LitBackCtxMgr.Register( - s.BaseTaskExecutor.Ctx(), - job.ID, hasUnique, - ddlObj.etcdCli, - discovery, - job.ReorgMeta.ResourceGroupName, - job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), - job.ReorgMeta.GetMaxWriteSpeedOrDefault(), - job.RealStartTS, - ) -} - -func hasUniqueIndex(job *model.Job) (bool, error) { - args, err := model.GetModifyIndexArgs(job) - if err != nil { - return false, errors.Trace(err) - } - - for _, a := range args.IndexArgs { - if a.Unique { - return true, nil - } - } - return false, nil -} - type backfillDistExecutor struct { *taskexecutor.BaseTaskExecutor d *ddl diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index d7c2ba365e8dc..ea3f4ce8d5276 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -93,12 +93,12 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch( return nil, err } logger.Info("on next subtasks batch") - + storeWithPD := sch.d.store.(kv.StorageWithPD) // TODO: use planner. 
switch nextStep { case proto.BackfillStepReadIndex: if tblInfo.Partition != nil { - return generatePartitionPlan(tblInfo) + return generatePartitionPlan(ctx, storeWithPD, tblInfo) } return generateNonPartitionPlan(ctx, sch.d, tblInfo, job, sch.GlobalSort, len(execIDs)) case proto.BackfillStepMergeSort: @@ -116,7 +116,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch( }) return generateGlobalSortIngestPlan( ctx, - sch.d.store.(kv.StorageWithPD), + storeWithPD, taskHandle, task, backfillMeta.CloudStorageURI, @@ -212,7 +212,11 @@ func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.Tab return tblInfo, nil } -func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) { +func generatePartitionPlan( + ctx context.Context, + store kv.StorageWithPD, + tblInfo *model.TableInfo, +) (metas [][]byte, err error) { defs := tblInfo.Partition.Definitions physicalIDs := make([]int64, len(defs)) for i := range defs { @@ -221,8 +225,14 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error) subTaskMetas := make([][]byte, 0, len(physicalIDs)) for _, physicalID := range physicalIDs { + // It should be different for each subtask to determine if there are duplicate entries. + importTS, err := allocNewTS(ctx, store) + if err != nil { + return nil, err + } subTaskMeta := &BackfillSubTaskMeta{ PhysicalTableID: physicalID, + TS: importTS, } metaBytes, err := json.Marshal(subTaskMeta) @@ -297,6 +307,11 @@ func generateNonPartitionPlan( regionBatch := CalculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud) for i := 0; i < len(recordRegionMetas); i += regionBatch { + // It should be different for each subtask to determine if there are duplicate entries. + importTS, err := allocNewTS(ctx, d.store.(kv.StorageWithPD)) + if err != nil { + return true, nil + } end := i + regionBatch if end > len(recordRegionMetas) { end = len(recordRegionMetas) @@ -305,6 +320,7 @@ func generateNonPartitionPlan( subTaskMeta := &BackfillSubTaskMeta{ RowStart: batch[0].StartKey(), RowEnd: batch[len(batch)-1].EndKey(), + TS: importTS, } if i == 0 { subTaskMeta.RowStart = startKey @@ -409,6 +425,16 @@ func generateGlobalSortIngestPlan( return metaArr, nil } +func allocNewTS(ctx context.Context, store kv.StorageWithPD) (uint64, error) { + pdCli := store.GetPDClient() + p, l, err := pdCli.GetTS(ctx) + if err != nil { + return 0, err + } + ts := oracle.ComposeTS(p, l) + return ts, nil +} + func splitSubtaskMetaForOneKVMetaGroup( ctx context.Context, store kv.StorageWithPD, @@ -422,15 +448,13 @@ func splitSubtaskMetaForOneKVMetaGroup( // Skip global sort for empty table. 
return nil, nil } - pdCli := store.GetPDClient() - p, l, err := pdCli.GetTS(ctx) + importTS, err := allocNewTS(ctx, store) if err != nil { return nil, err } - ts := oracle.ComposeTS(p, l) failpoint.Inject("mockTSForGlobalSort", func(val failpoint.Value) { i := val.(int) - ts = uint64(i) + importTS = uint64(i) }) splitter, err := getRangeSplitter( ctx, store, cloudStorageURI, int64(kvMeta.TotalKVSize), instanceCnt, kvMeta.MultipleFilesStats, logger) @@ -482,7 +506,7 @@ func splitSubtaskMetaForOneKVMetaGroup( StatFiles: statFiles, RangeJobKeys: rangeJobKeys, RangeSplitKeys: regionSplitKeys, - TS: ts, + TS: importTS, } if eleID > 0 { m.EleIDs = []int64{eleID} diff --git a/pkg/ddl/backfilling_import_cloud.go b/pkg/ddl/backfilling_import_cloud.go index a4bdcd09c1000..7fc7baa43e8f7 100644 --- a/pkg/ddl/backfilling_import_cloud.go +++ b/pkg/ddl/backfilling_import_cloud.go @@ -35,34 +35,36 @@ import ( type cloudImportExecutor struct { taskexecutor.BaseStepExecutor job *model.Job + store kv.Storage indexes []*model.IndexInfo ptbl table.PhysicalTable - bc ingest.BackendCtx cloudStoreURI string + backendCtx ingest.BackendCtx } func newCloudImportExecutor( job *model.Job, + store kv.Storage, indexes []*model.IndexInfo, ptbl table.PhysicalTable, - bcGetter func() (ingest.BackendCtx, error), cloudStoreURI string, ) (*cloudImportExecutor, error) { - bc, err := bcGetter() - if err != nil { - return nil, err - } return &cloudImportExecutor{ job: job, + store: store, indexes: indexes, ptbl: ptbl, - bc: bc, cloudStoreURI: cloudStoreURI, }, nil } -func (*cloudImportExecutor) Init(ctx context.Context) error { +func (m *cloudImportExecutor) Init(ctx context.Context) error { logutil.Logger(ctx).Info("cloud import executor init subtask exec env") + bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build() + if err != nil { + return err + } + m.backendCtx = bCtx return nil } @@ -73,8 +75,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub if err != nil { return err } - - local := m.bc.GetLocalBackend() + local := m.backendCtx.GetLocalBackend() if local == nil { return errors.Errorf("local backend not found") } @@ -155,7 +156,8 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub func (m *cloudImportExecutor) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("cloud import executor clean up subtask env") - // cleanup backend context - ingest.LitBackCtxMgr.Unregister(m.job.ID) + if m.backendCtx != nil { + m.backendCtx.Close() + } return nil } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 0d5f324c917ec..fd71382a4f159 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -159,7 +159,6 @@ func NewAddIndexIngestPipeline( reorgMeta *model.DDLReorgMeta, avgRowSize int, concurrency int, - cpMgr *ingest.CheckpointManager, rowCntListener RowCountListener, ) (*operator.AsyncPipeline, error) { indexes := make([]table.Index, 0, len(idxInfos)) @@ -180,12 +179,12 @@ func NewAddIndexIngestPipeline( rm = nil } - srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, - reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), rm) + srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, backendCtx) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, + 
reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), rm, backendCtx) ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, - tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener) - sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener) + tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, rowCntListener) + sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, rowCntListener) operator.Compose[TableScanTask](srcOp, scanOp) operator.Compose[IndexRecordChunk](scanOp, ingestOp) @@ -247,14 +246,14 @@ func NewWriteIndexToExternalStoragePipeline( }) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, - reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), nil) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, + reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), nil, nil) writeOp := NewWriteExternalStoreOperator( ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta, ) - sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, nil, rowCntListener) + sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, rowCntListener) operator.Compose[TableScanTask](srcOp, scanOp) operator.Compose[IndexRecordChunk](scanOp, writeOp) @@ -332,8 +331,7 @@ type TableScanTaskSource struct { startKey kv.Key endKey kv.Key - // only used in local ingest - cpMgr *ingest.CheckpointManager + cpOp ingest.CheckpointOperator } // NewTableScanTaskSource creates a new TableScanTaskSource. @@ -343,7 +341,7 @@ func NewTableScanTaskSource( physicalTable table.PhysicalTable, startKey kv.Key, endKey kv.Key, - cpMgr *ingest.CheckpointManager, + cpOp ingest.CheckpointOperator, ) *TableScanTaskSource { return &TableScanTaskSource{ ctx: ctx, @@ -352,7 +350,7 @@ func NewTableScanTaskSource( store: store, startKey: startKey, endKey: endKey, - cpMgr: cpMgr, + cpOp: cpOp, } } @@ -370,10 +368,10 @@ func (src *TableScanTaskSource) Open() error { // adjustStartKey adjusts the start key so that we can skip the ranges that have been processed // according to the information of checkpoint manager. func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.Key, done bool) { - if src.cpMgr == nil { + if src.cpOp == nil { return start, false } - cpKey := src.cpMgr.NextKeyToProcess() + cpKey := src.cpOp.NextStartKey() if len(cpKey) == 0 { return start, false } @@ -488,9 +486,9 @@ func NewTableScanOperator( copCtx copr.CopContext, srcChkPool *sync.Pool, concurrency int, - cpMgr *ingest.CheckpointManager, hintBatchSize int, reorgMeta *model.DDLReorgMeta, + cpOp ingest.CheckpointOperator, ) *TableScanOperator { totalCount := new(atomic.Int64) pool := workerpool.NewWorkerPool( @@ -504,7 +502,7 @@ func NewTableScanOperator( sessPool: sessPool, se: nil, srcChkPool: srcChkPool, - cpMgr: cpMgr, + cpOp: cpOp, hintBatchSize: hintBatchSize, totalCount: totalCount, reorgMeta: reorgMeta, @@ -519,7 +517,9 @@ func NewTableScanOperator( // Close implements operator.Operator interface. 
func (o *TableScanOperator) Close() error { - o.logger.Info("table scan operator total count", zap.Int64("count", o.totalCount.Load())) + defer func() { + o.logger.Info("table scan operator total count", zap.Int64("count", o.totalCount.Load())) + }() return o.AsyncOperator.Close() } @@ -530,7 +530,7 @@ type tableScanWorker struct { se *session.Session srcChkPool *sync.Pool - cpMgr *ingest.CheckpointManager + cpOp ingest.CheckpointOperator reorgMeta *model.DDLReorgMeta hintBatchSize int totalCount *atomic.Int64 @@ -572,8 +572,8 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor if err != nil { return err } - if w.cpMgr != nil { - w.cpMgr.Register(task.ID, task.End) + if w.cpOp != nil { + w.cpOp.AddChunk(task.ID, task.End) } var done bool for !done { @@ -585,8 +585,8 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor return err } idxResult = IndexRecordChunk{ID: task.ID, Chunk: srcChk, Done: done, ctx: w.ctx} - if w.cpMgr != nil { - w.cpMgr.UpdateTotalKeys(task.ID, srcChk.NumRows(), done) + if w.cpOp != nil { + w.cpOp.UpdateChunk(task.ID, srcChk.NumRows(), done) } w.totalCount.Add(int64(srcChk.NumRows())) sender(idxResult) @@ -702,7 +702,6 @@ type IndexWriteResult struct { ID int Added int Total int - Next kv.Key } // IndexIngestOperator writes index records to ingest engine. @@ -722,7 +721,6 @@ func NewIndexIngestOperator( srcChunkPool *sync.Pool, concurrency int, reorgMeta *model.DDLReorgMeta, - cpMgr *ingest.CheckpointManager, rowCntListener RowCountListener, ) *IndexIngestOperator { writerCfg := getLocalWriterConfig(len(indexes), concurrency) @@ -760,7 +758,6 @@ func NewIndexIngestOperator( }, backendCtx: backendCtx, rowCntListener: rowCntListener, - cpMgr: cpMgr, } }) return &IndexIngestOperator{ @@ -790,7 +787,6 @@ type indexIngestLocalWorker struct { indexIngestBaseWorker backendCtx ingest.BackendCtx rowCntListener RowCountListener - cpMgr *ingest.CheckpointManager } func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) { @@ -808,18 +804,12 @@ func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(Index return } w.rowCntListener.Written(rs.Added) - flushed, imported, err := w.backendCtx.Flush(w.ctx, ingest.FlushModeAuto) + err = w.backendCtx.IngestIfQuotaExceeded(w.ctx, ck.ID, rs.Added) if err != nil { w.ctx.onError(err) return } - if w.cpMgr != nil { - totalCnt, nextKey := w.cpMgr.Status() - rs.Total = totalCnt - rs.Next = nextKey - w.cpMgr.UpdateWrittenKeys(ck.ID, rs.Added) - w.cpMgr.AdvanceWatermark(flushed, imported) - } + rs.Total = w.backendCtx.TotalKeyCount() send(rs) } @@ -850,7 +840,7 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResul ID: rs.ID, } w.initSessCtx() - count, nextKey, err := w.WriteChunk(&rs) + count, _, err := w.WriteChunk(&rs) if err != nil { w.ctx.onError(err) return result, err @@ -863,7 +853,6 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResul w.totalCount.Add(int64(count)) } result.Added = count - result.Next = nextKey if ResultCounterForTest != nil { ResultCounterForTest.Add(1) } @@ -930,7 +919,6 @@ type indexWriteResultSink struct { tbl table.PhysicalTable indexes []table.Index - cpMgr *ingest.CheckpointManager rowCntListener RowCountListener errGroup errgroup.Group @@ -942,7 +930,6 @@ func newIndexWriteResultSink( backendCtx ingest.BackendCtx, tbl table.PhysicalTable, indexes []table.Index, - cpMgr *ingest.CheckpointManager, rowCntListener RowCountListener, ) 
*indexWriteResultSink { return &indexWriteResultSink{ @@ -951,7 +938,6 @@ func newIndexWriteResultSink( tbl: tbl, indexes: indexes, errGroup: errgroup.Group{}, - cpMgr: cpMgr, rowCntListener: rowCntListener, } } @@ -976,9 +962,11 @@ func (s *indexWriteResultSink) collectResult() error { if err != nil { s.ctx.onError(err) } - if s.cpMgr != nil { - total, _ := s.cpMgr.Status() - s.rowCntListener.SetTotal(total) + if s.backendCtx != nil { + total := s.backendCtx.TotalKeyCount() + if total > 0 { + s.rowCntListener.SetTotal(total) + } } return err } @@ -993,20 +981,7 @@ func (s *indexWriteResultSink) flush() error { failpoint.Inject("mockFlushError", func(_ failpoint.Value) { failpoint.Return(errors.New("mock flush error")) }) - flushed, imported, err := s.backendCtx.Flush(s.ctx, ingest.FlushModeForceFlushAndImport) - if s.cpMgr != nil { - // Try to advance watermark even if there is an error. - s.cpMgr.AdvanceWatermark(flushed, imported) - } - if err != nil { - msg := "flush error" - if flushed { - msg = "import error" - } - logutil.Logger(s.ctx).Error(msg, zap.String("category", "ddl"), zap.Error(err)) - return err - } - return nil + return s.backendCtx.Ingest(s.ctx) } func (s *indexWriteResultSink) Close() error { diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 9efb032204c12..96b35fe528f5f 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -51,7 +51,6 @@ type readIndexExecutor struct { avgRowSize int cloudStorageURI string - bc ingest.BackendCtx curRowCount *atomic.Int64 subtaskSummary sync.Map // subtaskID => readIndexSummary @@ -68,21 +67,15 @@ func newReadIndexExecutor( indexes []*model.IndexInfo, ptbl table.PhysicalTable, jc *ReorgContext, - bcGetter func() (ingest.BackendCtx, error), cloudStorageURI string, avgRowSize int, ) (*readIndexExecutor, error) { - bc, err := bcGetter() - if err != nil { - return nil, err - } return &readIndexExecutor{ d: d, job: job, indexes: indexes, ptbl: ptbl, jc: jc, - bc: bc, cloudStorageURI: cloudStorageURI, avgRowSize: avgRowSize, curRowCount: &atomic.Int64{}, @@ -123,7 +116,15 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return r.onFinished(ctx, subtask) } - pipe, err := r.buildLocalStorePipeline(opCtx, sm, concurrency) + // TODO(tangenta): support checkpoint manager that interact with subtask table. + bCtx, err := ingest.NewBackendCtxBuilder(ctx, r.d.store, r.job). + WithImportDistributedLock(r.d.etcdCli, sm.TS). + Build() + if err != nil { + return err + } + defer bCtx.Close() + pipe, err := r.buildLocalStorePipeline(opCtx, bCtx, sm, concurrency) if err != nil { return err } @@ -131,13 +132,13 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta if err != nil { // For dist task local based ingest, checkpoint is unsupported. // If there is an error we should keep local sort dir clean. 
- err1 := r.bc.FinishAndUnregisterEngines(ingest.OptCleanData) + err1 := bCtx.FinishAndUnregisterEngines(ingest.OptCleanData) if err1 != nil { logutil.DDLLogger().Warn("read index executor unregister engine failed", zap.Error(err1)) } return err } - err = r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) + err = bCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) if err != nil { return errors.Trace(err) } @@ -150,10 +151,8 @@ func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary { } } -func (r *readIndexExecutor) Cleanup(ctx context.Context) error { +func (*readIndexExecutor) Cleanup(ctx context.Context) error { tidblogutil.Logger(ctx).Info("read index executor cleanup subtask exec env") - // cleanup backend context - ingest.LitBackCtxMgr.Unregister(r.job.ID) return nil } @@ -217,6 +216,7 @@ func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) ( func (r *readIndexExecutor) buildLocalStorePipeline( opCtx *OperatorCtx, + backendCtx ingest.BackendCtx, sm *BackfillSubTaskMeta, concurrency int, ) (*operator.AsyncPipeline, error) { @@ -236,7 +236,7 @@ func (r *readIndexExecutor) buildLocalStorePipeline( } idxNames.WriteString(index.Name.O) } - engines, err := r.bc.Register(indexIDs, uniques, r.ptbl) + engines, err := backendCtx.Register(indexIDs, uniques, r.ptbl) if err != nil { tidblogutil.Logger(opCtx).Error("cannot register new engine", zap.Error(err), @@ -249,7 +249,7 @@ func (r *readIndexExecutor) buildLocalStorePipeline( opCtx, d.store, d.sessPool, - r.bc, + backendCtx, engines, r.job.ID, tbl, @@ -259,7 +259,6 @@ func (r *readIndexExecutor) buildLocalStorePipeline( r.job.ReorgMeta, r.avgRowSize, concurrency, - nil, rowCntListener, ) } diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index d67fac8443331..aedf19e7dd814 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -17,6 +17,7 @@ package ddl import ( "bytes" "context" + "math" "testing" "time" @@ -62,17 +63,8 @@ func TestDoneTaskKeeper(t *testing.T) { } func TestPickBackfillType(t *testing.T) { - originMgr := ingest.LitBackCtxMgr - originInit := ingest.LitInitialized - defer func() { - ingest.LitBackCtxMgr = originMgr - ingest.LitInitialized = originInit - }() - mockMgr := ingest.NewMockBackendCtxMgr( - func() sessionctx.Context { - return nil - }) - ingest.LitBackCtxMgr = mockMgr + ingest.LitDiskRoot = ingest.NewDiskRootImpl(t.TempDir()) + ingest.LitMemRoot = ingest.NewMemRootImpl(math.MaxInt64) mockJob := &model.Job{ ID: 1, ReorgMeta: &model.DDLReorgMeta{ @@ -95,6 +87,7 @@ func TestPickBackfillType(t *testing.T) { tp, err = pickBackfillType(mockJob) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeLitMerge) + ingest.LitInitialized = false } func assertStaticExprContextEqual(t *testing.T, sctx sessionctx.Context, exprCtx *exprstatic.ExprContext, warnHandler contextutil.WarnHandler) { diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 4585bcf297d6f..c6726d8280d6d 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -51,7 +51,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, opCtx, cancel := ddl.NewLocalOperatorCtx(context.Background(), 1) defer cancel() src := testutil.NewOperatorTestSource(ddl.TableScanTask{ID: 1, Start: startKey, End: endKey}) - scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0, nil) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, 0, nil, nil) sink := 
testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp) diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 56e6021f86e76..b3bd6b8c0da4e 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -77,7 +77,6 @@ import ( "github.com/tikv/client-go/v2/tikv" kvutil "github.com/tikv/client-go/v2/util" pdHttp "github.com/tikv/pd/client/http" - sd "github.com/tikv/pd/client/servicediscovery" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -1115,9 +1114,6 @@ SwitchIndexState: // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) - if !job.ReorgMeta.IsDistReorg && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { - ingest.LitBackCtxMgr.Unregister(job.ID) - } logutil.DDLLogger().Info("run add index job done", zap.String("charset", job.Charset), zap.String("collation", job.Collate)) @@ -1290,14 +1286,12 @@ func pickBackfillType(job *model.Job) (model.ReorgType, error) { job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge return model.ReorgTypeLitMerge, nil } - available, err := ingest.LitBackCtxMgr.CheckMoreTasksAvailable() - if err != nil { + if err := ingest.LitDiskRoot.PreCheckUsage(); err != nil { + logutil.DDLIngestLogger().Info("ingest backfill is not available", zap.Error(err)) return model.ReorgTypeNone, err } - if available { - job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge - return model.ReorgTypeLitMerge, nil - } + job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge + return model.ReorgTypeLitMerge, nil } // The lightning environment is unavailable, but we can still use the txn-merge backfill. logutil.DDLLogger().Info("fallback to txn-merge backfill process", @@ -1397,9 +1391,6 @@ func doReorgWorkForCreateIndex( for _, indexInfo := range allIndexInfos { indexInfo.BackfillState = model.BackfillStateMerging } - if reorgTp == model.ReorgTypeLitMerge { - ingest.LitBackCtxMgr.Unregister(job.ID) - } job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg. ver, err = updateVersionAndTableInfo(jobCtx, job, tbl.Meta(), true) return false, ver, errors.Trace(err) @@ -2404,9 +2395,7 @@ func (w *worker) addTableIndex( if err != nil { return err } - //nolint:forcetypeassert - discovery := w.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - return checkDuplicateForUniqueIndex(ctx, t, reorgInfo, discovery) + return checkDuplicateForUniqueIndex(ctx, t, reorgInfo, w.store) } } @@ -2449,15 +2438,8 @@ func (w *worker) addTableIndex( return errors.Trace(err) } -func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, discovery sd.ServiceDiscovery) error { +func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, store kv.Storage) (err error) { var bc ingest.BackendCtx - var err error - defer func() { - if bc != nil { - ingest.LitBackCtxMgr.Unregister(reorgInfo.ID) - } - }() - for _, elem := range reorgInfo.elements { indexInfo := model.FindIndexInfoByID(t.Meta().Indices, elem.ID) if indexInfo == nil { @@ -2466,11 +2448,14 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo if indexInfo.Unique { ctx := tidblogutil.WithCategory(ctx, "ddl-ingest") if bc == nil { - bc, err = ingest.LitBackCtxMgr.Register( - ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS) + bc, err = ingest.NewBackendCtxBuilder(ctx, store, reorgInfo.Job). + ForDuplicateCheck(). 
+ Build() if err != nil { return err } + //nolint:revive,all_revive + defer bc.Close() } err = bc.CollectRemoteDuplicateRows(indexInfo.ID, t) if err != nil { diff --git a/pkg/ddl/index_presplit.go b/pkg/ddl/index_presplit.go index 78d2412d1c32d..05445a96416ce 100644 --- a/pkg/ddl/index_presplit.go +++ b/pkg/ddl/index_presplit.go @@ -412,18 +412,11 @@ func waitScatterRegionFinish( if err == nil { finishScatterNum++ } else { - if len(indexName) == 0 { - logutil.DDLLogger().Warn("wait scatter region failed", - zap.Uint64("regionID", regionID), - zap.String("table", tableName), - zap.Error(err)) - } else { - logutil.DDLLogger().Warn("wait scatter region failed", - zap.Uint64("regionID", regionID), - zap.String("table", tableName), - zap.String("index", indexName), - zap.Error(err)) - } + logutil.DDLLogger().Warn("wait scatter region failed", + zap.Uint64("regionID", regionID), + zap.String("table", tableName), + zap.String("index", indexName), + zap.Error(err)) } } return finishScatterNum diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index e17710a88847b..9c7b28ce6de6e 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -39,10 +39,12 @@ go_library( "//pkg/parser/terror", "//pkg/sessionctx", "//pkg/sessionctx/variable", + "//pkg/sessiontxn", "//pkg/table", "//pkg/util", "//pkg/util/dbterror", "//pkg/util/generic", + "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/memory", "//pkg/util/size", @@ -50,6 +52,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//servicediscovery", @@ -70,11 +73,11 @@ go_test( "main_test.go", "mem_root_test.go", ], - embed = [":ingest"], flaky = True, race = "on", shard_count = 23, deps = [ + ":ingest", "//pkg/config", "//pkg/ddl/ingest/testutil", "//pkg/ddl/session", diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 89c3795da3048..280c235f9c6b8 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -17,6 +17,7 @@ package ingest import ( "context" "fmt" + "math" "sync" "sync/atomic" "time" @@ -59,43 +60,52 @@ type BackendCtx interface { // FinishAndUnregisterEngines is only used in local disk based ingest. FinishAndUnregisterEngines(opt UnregisterOpt) error - FlushController + // IngestIfQuotaExceeded updates the task and count to checkpoint manager, and try to ingest them to disk or TiKV + // according to the last ingest time or the usage of local disk. + IngestIfQuotaExceeded(ctx context.Context, taskID int, count int) error - AttachCheckpointManager(*CheckpointManager) - GetCheckpointManager() *CheckpointManager + // Ingest checks if all engines need to be flushed and imported. It's concurrent safe. + Ingest(ctx context.Context) (err error) + + CheckpointOperator // GetLocalBackend exposes local.Backend. It's only used in global sort based // ingest. GetLocalBackend() *local.Backend // CollectRemoteDuplicateRows collects duplicate entry error for given index as - // the supplement of FlushController.Flush. + // the supplement of Ingest. // // CollectRemoteDuplicateRows is only used in global sort based ingest. CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error + + GetDiskUsage() uint64 + Close() } -// FlushMode is used to control how to flush. -type FlushMode byte +// CheckpointOperator contains the operations to checkpoints. 
+type CheckpointOperator interface { + NextStartKey() tikv.Key + TotalKeyCount() int -const ( - // FlushModeAuto means caller does not enforce any flush, the implementation can - // decide it. - FlushModeAuto FlushMode = iota - // FlushModeForceFlushAndImport means flush and import all data to TiKV. - FlushModeForceFlushAndImport -) + AddChunk(id int, endKey tikv.Key) + UpdateChunk(id int, count int, done bool) + FinishChunk(id int, count int) + + AdvanceWatermark(imported bool) error + + GetImportTS() uint64 +} // litBackendCtx implements BackendCtx. type litBackendCtx struct { - engines map[int64]*engineInfo - memRoot MemRoot - diskRoot DiskRoot - jobID int64 - tbl table.Table - backend *local.Backend - ctx context.Context - cfg *local.BackendConfig - sysVars map[string]string + engines map[int64]*engineInfo + memRoot MemRoot + jobID int64 + tbl table.Table + backend *local.Backend + ctx context.Context + cfg *local.BackendConfig + sysVars map[string]string flushing atomic.Bool timeOfLastFlush atomicutil.Time @@ -103,6 +113,7 @@ type litBackendCtx struct { checkpointMgr *CheckpointManager etcdClient *clientv3.Client initTS uint64 + importTS uint64 // unregisterMu prevents concurrent calls of `FinishAndUnregisterEngines`. // For details, see https://github.com/pingcap/tidb/issues/53843. @@ -158,41 +169,54 @@ func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Tab return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe) } -// Flush implements FlushController. -func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, imported bool, err error) { - shouldFlush, shouldImport := bc.checkFlush(mode) +func (bc *litBackendCtx) IngestIfQuotaExceeded(ctx context.Context, taskID int, count int) error { + bc.FinishChunk(taskID, count) + shouldFlush, shouldImport := bc.checkFlush() if !shouldFlush { - return false, false, nil + return nil } if !bc.flushing.CompareAndSwap(false, true) { - return false, false, nil + return nil } defer bc.flushing.Store(false) - - for _, ei := range bc.engines { - ei.flushLock.Lock() - //nolint: all_revive,revive - defer ei.flushLock.Unlock() - - if err = ei.Flush(); err != nil { - return false, false, err - } + err := bc.flushEngines(ctx) + if err != nil { + return err } bc.timeOfLastFlush.Store(time.Now()) if !shouldImport { - return true, false, nil + return bc.AdvanceWatermark(false) } - if bc.etcdClient != nil { - key := fmt.Sprintf("/tidb/distributeLock/%d", bc.jobID) - release, err := owner.AcquireDistributedLock(bc.ctx, bc.etcdClient, key, 10) - if err != nil { - return true, false, err - } - if release != nil { - defer release() - } + release, err := bc.tryAcquireDistLock() + if err != nil { + return err + } + if release != nil { + defer release() + } + + err = bc.unsafeImportAndResetAllEngines(ctx) + if err != nil { + return err + } + return bc.AdvanceWatermark(true) +} + +// Ingest implements BackendContext. 
+func (bc *litBackendCtx) Ingest(ctx context.Context) error { + err := bc.flushEngines(ctx) + if err != nil { + return err + } + + release, err := bc.tryAcquireDistLock() + if err != nil { + return err + } + if release != nil { + defer release() } failpoint.Inject("mockDMLExecutionStateBeforeImport", func(_ failpoint.Value) { @@ -201,8 +225,38 @@ func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, im } }) + err = bc.unsafeImportAndResetAllEngines(ctx) + if err != nil { + return err + } + + return bc.AdvanceWatermark(true) +} + +func (bc *litBackendCtx) flushEngines(ctx context.Context) error { + for _, ei := range bc.engines { + ei.flushLock.Lock() + if err := ei.Flush(); err != nil { + logutil.Logger(ctx).Error("flush error", zap.Error(err)) + ei.flushLock.Unlock() + return err + } + ei.flushLock.Unlock() + } + return nil +} + +func (bc *litBackendCtx) tryAcquireDistLock() (func(), error) { + if bc.etcdClient == nil { + return nil, nil + } + key := fmt.Sprintf("/tidb/distributeLock/%d", bc.jobID) + return owner.AcquireDistributedLock(bc.ctx, bc.etcdClient, key, distributedKeyTTLInSec) +} + +func (bc *litBackendCtx) unsafeImportAndResetAllEngines(ctx context.Context) error { for indexID, ei := range bc.engines { - if err = bc.unsafeImportAndReset(ctx, ei); err != nil { + if err := bc.unsafeImportAndReset(ctx, ei); err != nil { if common.ErrFoundDuplicateKeys.Equal(err) { idxInfo := model.FindIndexInfoByID(bc.tbl.Meta().Indices, indexID) if idxInfo == nil { @@ -214,30 +268,11 @@ func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, im err = TryConvertToKeyExistsErr(err, idxInfo, bc.tbl.Meta()) } } - return true, false, err + logutil.Logger(ctx).Error("import error", zap.Error(err)) + return err } } - - var newTS uint64 - if mgr := bc.GetCheckpointManager(); mgr != nil { - // for local disk case, we need to refresh TS because duplicate detection - // requires each ingest to have a unique TS. - // - // TODO(lance6716): there's still a chance that data is imported but because of - // checkpoint is low-watermark, the data will still be imported again with - // another TS after failover. Need to refine the checkpoint mechanism. 
- newTS, err = mgr.refreshTSAndUpdateCP() - if err == nil { - for _, ei := range bc.engines { - err = bc.backend.SetTSAfterResetEngine(ei.uuid, newTS) - if err != nil { - return false, false, err - } - } - } - } - - return true, true, err + return nil } func (bc *litBackendCtx) unsafeImportAndReset(ctx context.Context, ei *engineInfo) error { @@ -246,29 +281,27 @@ func (bc *litBackendCtx) unsafeImportAndReset(ctx context.Context, ei *engineInf ) logger.Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID), - zap.String("usage info", bc.diskRoot.UsageInfo())) + zap.String("usage info", LitDiskRoot.UsageInfo())) closedEngine := backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0) + ingestTS := bc.GetImportTS() + logger.Info("set ingest ts before import", zap.Int64("jobID", bc.jobID), zap.Uint64("ts", ingestTS)) + err := bc.backend.SetTSBeforeImportEngine(ctx, ei.uuid, ingestTS) + if err != nil { + logger.Error("set TS failed", zap.Int64("index ID", ei.indexID)) + return err + } regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio) regionSplitKeys := int64(lightning.SplitRegionKeys) if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil { logger.Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID), - zap.String("usage info", bc.diskRoot.UsageInfo())) + zap.String("usage info", LitDiskRoot.UsageInfo())) return err } - resetFn := bc.backend.ResetEngineSkipAllocTS - mgr := bc.GetCheckpointManager() - if mgr == nil { - // disttask case, no need to refresh TS. - // - // TODO(lance6716): for disttask local sort case, we need to use a fixed TS. But - // it doesn't have checkpoint, so we need to find a way to save TS. - resetFn = bc.backend.ResetEngine - } - - err := resetFn(ctx, ei.uuid) + // TS will be set before local backend import. We don't need to alloc a new one when reset. + err = bc.backend.ResetEngineSkipAllocTS(ctx, ei.uuid) failpoint.Inject("mockResetEngineFailed", func() { err = fmt.Errorf("mock reset engine failed") }) @@ -288,16 +321,16 @@ func (bc *litBackendCtx) unsafeImportAndReset(ctx context.Context, ei *engineInf // ForceSyncFlagForTest is a flag to force sync only for test. var ForceSyncFlagForTest = false -func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport bool) { +func (bc *litBackendCtx) checkFlush() (shouldFlush bool, shouldImport bool) { failpoint.Inject("forceSyncFlagForTest", func() { // used in a manual test ForceSyncFlagForTest = true }) - if mode == FlushModeForceFlushAndImport || ForceSyncFlagForTest { + if ForceSyncFlagForTest { return true, true } - bc.diskRoot.UpdateUsage() - shouldImport = bc.diskRoot.ShouldImport() + LitDiskRoot.UpdateUsage() + shouldImport = LitDiskRoot.ShouldImport() interval := bc.updateInterval // This failpoint will be manually set through HTTP status port. failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) { @@ -310,17 +343,76 @@ func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImp return shouldFlush, shouldImport } -// AttachCheckpointManager attaches a checkpoint manager to the backend context. -func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) { - bc.checkpointMgr = mgr +// GetLocalBackend returns the local backend. +func (bc *litBackendCtx) GetLocalBackend() *local.Backend { + return bc.backend } -// GetCheckpointManager returns the checkpoint manager attached to the backend context. 
-func (bc *litBackendCtx) GetCheckpointManager() *CheckpointManager { - return bc.checkpointMgr +// GetDiskUsage returns current disk usage of underlying backend. +func (bc *litBackendCtx) GetDiskUsage() uint64 { + _, _, bcDiskUsed, _ := local.CheckDiskQuota(bc.backend, math.MaxInt64) + return uint64(bcDiskUsed) } -// GetLocalBackend returns the local backend. -func (bc *litBackendCtx) GetLocalBackend() *local.Backend { - return bc.backend +// Close closes underlying backend and remove it from disk root. +func (bc *litBackendCtx) Close() { + logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("jobID", bc.jobID), + zap.Int64("current memory usage", LitMemRoot.CurrentUsage()), + zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota())) + bc.backend.Close() + LitDiskRoot.Remove(bc.jobID) + BackendCounterForTest.Dec() +} + +// NextStartKey implements CheckpointOperator interface. +func (bc *litBackendCtx) NextStartKey() tikv.Key { + if bc.checkpointMgr != nil { + return bc.checkpointMgr.NextKeyToProcess() + } + return nil +} + +// TotalKeyCount implements CheckpointOperator interface. +func (bc *litBackendCtx) TotalKeyCount() int { + if bc.checkpointMgr != nil { + return bc.checkpointMgr.TotalKeyCount() + } + return 0 +} + +// AddChunk implements CheckpointOperator interface. +func (bc *litBackendCtx) AddChunk(id int, endKey tikv.Key) { + if bc.checkpointMgr != nil { + bc.checkpointMgr.Register(id, endKey) + } +} + +// UpdateChunk implements CheckpointOperator interface. +func (bc *litBackendCtx) UpdateChunk(id int, count int, done bool) { + if bc.checkpointMgr != nil { + bc.checkpointMgr.UpdateTotalKeys(id, count, done) + } +} + +// FinishChunk implements CheckpointOperator interface. +func (bc *litBackendCtx) FinishChunk(id int, count int) { + if bc.checkpointMgr != nil { + bc.checkpointMgr.UpdateWrittenKeys(id, count) + } +} + +// GetImportTS implements CheckpointOperator interface. +func (bc *litBackendCtx) GetImportTS() uint64 { + if bc.checkpointMgr != nil { + return bc.checkpointMgr.GetTS() + } + return bc.importTS +} + +// AdvanceWatermark implements CheckpointOperator interface. +func (bc *litBackendCtx) AdvanceWatermark(imported bool) error { + if bc.checkpointMgr != nil { + return bc.checkpointMgr.AdvanceWatermark(imported) + } + return nil } diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 609e0a1e1a22e..391a5aa157c9c 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -16,162 +16,164 @@ package ingest import ( "context" - "math" + "fmt" "net" - "os" "path/filepath" "strconv" - "sync" "time" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil" + sess "github.com/pingcap/tidb/pkg/ddl/session" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/backend/local" "github.com/pingcap/tidb/pkg/lightning/common" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/tikv/client-go/v2/tikv" sd "github.com/tikv/pd/client/servicediscovery" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" ) -// BackendCtxMgr is used to manage the BackendCtx. -type BackendCtxMgr interface { - // CheckMoreTasksAvailable checks if it can run more ingest backfill tasks. - CheckMoreTasksAvailable() (bool, error) - // Register uses jobID to identify the BackendCtx. 
If there's already a - // BackendCtx with the same jobID, it will be returned. Otherwise, a new - // BackendCtx will be created and returned. - Register( - ctx context.Context, - jobID int64, - hasUnique bool, - etcdClient *clientv3.Client, - pdSvcDiscovery sd.ServiceDiscovery, - resourceGroupName string, - importConc int, - maxWriteSpeed int, - initTS uint64, - ) (BackendCtx, error) - Unregister(jobID int64) - // EncodeJobSortPath encodes the job ID to the local disk sort path. - EncodeJobSortPath(jobID int64) string - // Load returns the registered BackendCtx with the given jobID. - Load(jobID int64) (BackendCtx, bool) -} +// ResignOwnerForTest is only used for test. +var ResignOwnerForTest = atomic.NewBool(false) -// litBackendCtxMgr manages multiple litBackendCtx for each DDL job. Each -// litBackendCtx can use some local disk space and memory resource which are -// controlled by litBackendCtxMgr. -type litBackendCtxMgr struct { - // the lifetime of entries in backends should cover all other resources so it can - // be used as a lightweight indicator when interacts with other resources. - // Currently, the entry must be created not after disk folder is created and - // memory usage is tracked, and vice versa when considering deletion. - backends struct { - mu sync.RWMutex - m map[int64]*litBackendCtx +// NewBackendCtxBuilder creates a BackendCtxBuilder. +func NewBackendCtxBuilder(ctx context.Context, store kv.Storage, job *model.Job) *BackendCtxBuilder { + return &BackendCtxBuilder{ + ctx: ctx, + store: store, + job: job, } - // all disk resources of litBackendCtx should be used under path. Currently the - // hierarchy is ${path}/${jobID} for each litBackendCtx. - path string - memRoot MemRoot - diskRoot DiskRoot } -// NewLitBackendCtxMgr creates a new litBackendCtxMgr. -func NewLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr { - mgr := &litBackendCtxMgr{ - path: path, - } - mgr.backends.m = make(map[int64]*litBackendCtx, 4) - mgr.memRoot = NewMemRootImpl(int64(memQuota), mgr) - mgr.diskRoot = NewDiskRootImpl(path, mgr) - LitMemRoot = mgr.memRoot - litDiskRoot = mgr.diskRoot - litDiskRoot.UpdateUsage() - err := litDiskRoot.StartupCheck() - if err != nil { - ddllogutil.DDLIngestLogger().Warn("ingest backfill may not be available", zap.Error(err)) - } - return mgr +// BackendCtxBuilder is the builder of BackendCtx. +type BackendCtxBuilder struct { + ctx context.Context + store kv.Storage + job *model.Job + + etcdClient *clientv3.Client + importTS uint64 + + sessPool *sess.Pool + physicalID int64 + checkDup bool } -// CheckMoreTasksAvailable implements BackendCtxMgr.CheckMoreTaskAvailable interface. -func (m *litBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { - if err := m.diskRoot.PreCheckUsage(); err != nil { - ddllogutil.DDLIngestLogger().Info("ingest backfill is not available", zap.Error(err)) - return false, err - } - return true, nil +// WithImportDistributedLock needs a etcd client to maintain a distributed lock during partial import. +func (b *BackendCtxBuilder) WithImportDistributedLock(etcdCli *clientv3.Client, importTS uint64) *BackendCtxBuilder { + b.etcdClient = etcdCli + b.importTS = importTS + return b } -// ResignOwnerForTest is only used for test. -var ResignOwnerForTest = atomic.NewBool(false) +// WithCheckpointManagerParam only is used by non-DXF local ingest mode. 
+func (b *BackendCtxBuilder) WithCheckpointManagerParam( + sessPool *sess.Pool, + physicalID int64, +) *BackendCtxBuilder { + b.sessPool = sessPool + b.physicalID = physicalID + return b +} -// Register creates a new backend and registers it to the backend context. -func (m *litBackendCtxMgr) Register( - ctx context.Context, - jobID int64, - hasUnique bool, - etcdClient *clientv3.Client, - pdSvcDiscovery sd.ServiceDiscovery, - resourceGroupName string, - concurrency int, - maxWriteSpeed int, - initTS uint64, -) (BackendCtx, error) { - bc, exist := m.Load(jobID) - if exist { - return bc, nil - } +// ForDuplicateCheck marks this backend context is only used for duplicate check. +// TODO(tangenta): remove this after we don't rely on the backend to do duplicate check. +func (b *BackendCtxBuilder) ForDuplicateCheck() *BackendCtxBuilder { + b.checkDup = true + return b +} - m.memRoot.RefreshConsumption() - ok := m.memRoot.CheckConsume(structSizeBackendCtx) - if !ok { - return nil, genBackendAllocMemFailedErr(ctx, m.memRoot, jobID) +// BackendCounterForTest is only used in test. +var BackendCounterForTest = atomic.Int64{} + +// Build builds a BackendCtx. +func (b *BackendCtxBuilder) Build() (BackendCtx, error) { + ctx, store, job := b.ctx, b.store, b.job + sortPath, err := GenIngestTempDataDir() + if err != nil { + return nil, err } - sortPath := m.EncodeJobSortPath(jobID) - err := os.MkdirAll(sortPath, 0700) + jobSortPath := filepath.Join(sortPath, encodeBackendTag(job.ID, b.checkDup)) + intest.Assert(job.Type == model.ActionAddPrimaryKey || + job.Type == model.ActionAddIndex) + intest.Assert(job.ReorgMeta != nil) + + resGroupName := job.ReorgMeta.ResourceGroupName + concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault() + hasUnique, err := hasUniqueIndex(job) if err != nil { - logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err)) return nil, err } - cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed) + cfg, err := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, concurrency, maxWriteSpeed) if err != nil { - logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) + logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", job.ID), zap.Error(err)) return nil, err } failpoint.Inject("beforeCreateLocalBackend", func() { ResignOwnerForTest.Store(true) }) - // lock backends because createLocalBackend will let lightning create the sort - // folder, which may cause cleanupSortPath wrongly delete the sort folder if only - // checking the existence of the entry in backends. 
- m.backends.mu.Lock() - bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery) + + //nolint: forcetypeassert + pdCli := store.(tikv.Storage).GetRegionCache().PDClient() + var cpMgr *CheckpointManager + if b.sessPool != nil { + cpMgr, err = NewCheckpointManager(ctx, b.sessPool, b.physicalID, job.ID, jobSortPath, pdCli) + if err != nil { + logutil.Logger(ctx).Warn("create checkpoint manager failed", + zap.Int64("jobID", job.ID), + zap.Error(err)) + return nil, err + } + } + + var mockBackend BackendCtx + failpoint.InjectCall("mockNewBackendContext", b.job, cpMgr, &mockBackend) + if mockBackend != nil { + BackendCounterForTest.Inc() + return mockBackend, nil + } + + discovery := pdCli.GetServiceDiscovery() + bd, err := createLocalBackend(ctx, cfg, discovery) if err != nil { - m.backends.mu.Unlock() - logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err)) + logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", job.ID), zap.Error(err)) return nil, err } - bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient, initTS) - m.backends.m[jobID] = bcCtx - m.memRoot.Consume(structSizeBackendCtx) - m.backends.mu.Unlock() + bCtx := newBackendContext(ctx, job.ID, bd, cfg, + defaultImportantVariables, LitMemRoot, b.etcdClient, job.RealStartTS, b.importTS, cpMgr) - logutil.Logger(ctx).Info(LitInfoCreateBackend, zap.Int64("job ID", jobID), - zap.Int64("current memory usage", m.memRoot.CurrentUsage()), - zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota()), + logutil.Logger(ctx).Info(LitInfoCreateBackend, zap.Int64("job ID", job.ID), + zap.Int64("current memory usage", LitMemRoot.CurrentUsage()), + zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()), zap.Bool("has unique index", hasUnique)) - return bcCtx, nil + + LitDiskRoot.Add(job.ID, bCtx) + BackendCounterForTest.Add(1) + return bCtx, nil } -// EncodeJobSortPath implements BackendCtxMgr. -func (m *litBackendCtxMgr) EncodeJobSortPath(jobID int64) string { - return filepath.Join(m.path, encodeBackendTag(jobID)) +func hasUniqueIndex(job *model.Job) (bool, error) { + args, err := model.GetModifyIndexArgs(job) + if err != nil { + return false, errors.Trace(err) + } + + for _, a := range args.IndexArgs { + if a.Unique { + return true, nil + } + } + return false, nil } func createLocalBackend( @@ -207,14 +209,13 @@ func newBackendContext( cfg *local.BackendConfig, vars map[string]string, memRoot MemRoot, - diskRoot DiskRoot, etcdClient *clientv3.Client, - initTS uint64, + initTS, importTS uint64, + cpMgr *CheckpointManager, ) *litBackendCtx { bCtx := &litBackendCtx{ engines: make(map[int64]*engineInfo, 10), memRoot: memRoot, - diskRoot: diskRoot, jobID: jobID, backend: be, ctx: ctx, @@ -223,71 +224,19 @@ func newBackendContext( updateInterval: checkpointUpdateInterval, etcdClient: etcdClient, initTS: initTS, + importTS: importTS, + checkpointMgr: cpMgr, } bCtx.timeOfLastFlush.Store(time.Now()) return bCtx } -// Unregister removes a backend context from the backend context manager. 
-func (m *litBackendCtxMgr) Unregister(jobID int64) { - m.backends.mu.RLock() - _, exist := m.backends.m[jobID] - m.backends.mu.RUnlock() - if !exist { - return - } - - m.backends.mu.Lock() - defer m.backends.mu.Unlock() - bc, exist := m.backends.m[jobID] - if !exist { - return - } - _ = bc.FinishAndUnregisterEngines(OptCloseEngines) - bc.backend.Close() - m.memRoot.Release(structSizeBackendCtx) - m.memRoot.ReleaseWithTag(encodeBackendTag(jobID)) - logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("job ID", jobID), - zap.Int64("current memory usage", m.memRoot.CurrentUsage()), - zap.Int64("max memory quota", m.memRoot.MaxMemoryQuota())) - delete(m.backends.m, jobID) -} - -func (m *litBackendCtxMgr) Load(jobID int64) (BackendCtx, bool) { - m.backends.mu.RLock() - defer m.backends.mu.RUnlock() - ret, ok := m.backends.m[jobID] - return ret, ok -} - -// TotalDiskUsage returns the total disk usage of all backends. -func (m *litBackendCtxMgr) TotalDiskUsage() uint64 { - var totalDiskUsed uint64 - m.backends.mu.RLock() - defer m.backends.mu.RUnlock() - - for _, bc := range m.backends.m { - _, _, bcDiskUsed, _ := local.CheckDiskQuota(bc.backend, math.MaxInt64) - totalDiskUsed += uint64(bcDiskUsed) - } - return totalDiskUsed -} - -// UpdateMemoryUsage collects the memory usages from all the backend and updates it to the memRoot. -func (m *litBackendCtxMgr) UpdateMemoryUsage() { - m.backends.mu.RLock() - defer m.backends.mu.RUnlock() - - for _, bc := range m.backends.m { - curSize := bc.backend.TotalMemoryConsume() - m.memRoot.ReleaseWithTag(encodeBackendTag(bc.jobID)) - m.memRoot.ConsumeWithTag(encodeBackendTag(bc.jobID), curSize) - } -} - // encodeBackendTag encodes the job ID to backend tag. // The backend tag is also used as the file name of the local index data files. -func encodeBackendTag(jobID int64) string { +func encodeBackendTag(jobID int64, checkDup bool) string { + if checkDup { + return fmt.Sprintf("%d-dup", jobID) + } return strconv.FormatInt(jobID, 10) } diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 0d89a859ca3a7..ffba061a4263b 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -85,35 +86,24 @@ type taskCheckpoint struct { lastBatchRead bool } -// FlushController is an interface to control the flush of data so after it -// returns caller can save checkpoint. -type FlushController interface { - // Flush checks if al engines need to be flushed and imported based on given - // FlushMode. It's concurrent safe. - Flush(ctx context.Context, mode FlushMode) (flushed, imported bool, err error) -} - // NewCheckpointManager creates a new checkpoint manager. 
func NewCheckpointManager( ctx context.Context, sessPool *sess.Pool, physicalID int64, jobID int64, - indexIDs []int64, localStoreDir string, pdCli pd.Client, ) (*CheckpointManager, error) { instanceAddr := InstanceAddr() ctx2, cancel := context.WithCancel(ctx) - logger := logutil.DDLIngestLogger().With( - zap.Int64("jobID", jobID), zap.Int64s("indexIDs", indexIDs)) + logger := logutil.DDLIngestLogger().With(zap.Int64("jobID", jobID)) cm := &CheckpointManager{ ctx: ctx2, cancel: cancel, sessPool: sessPool, jobID: jobID, - indexIDs: indexIDs, localStoreDir: localStoreDir, pdCli: pdCli, logger: logger, @@ -171,16 +161,16 @@ func (s *CheckpointManager) NextKeyToProcess() kv.Key { return nil } -// Status returns the status of the checkpoint. -func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) { +// TotalKeyCount returns the key counts that have processed. +// It contains the keys that is not sync to checkpoint. +func (s *CheckpointManager) TotalKeyCount() int { s.mu.Lock() defer s.mu.Unlock() total := 0 for _, cp := range s.checkpoints { total += cp.writtenKeys } - // TODO(lance6716): ??? - return s.flushedKeyCnt + total, s.importedKeyLowWatermark + return s.flushedKeyCnt + total } // Register registers a new task. taskID MUST be continuous ascending and start @@ -216,9 +206,9 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) { } // AdvanceWatermark advances the watermark according to flushed or imported status. -func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool) { - if !flushed { - return +func (s *CheckpointManager) AdvanceWatermark(imported bool) error { + if s.noUpdate() { + return nil } failpoint.Inject("resignAfterFlush", func() { @@ -230,17 +220,26 @@ func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool) { } }) - s.mu.Lock() - defer s.mu.Unlock() s.afterFlush() if imported { - s.afterImport() + err := s.afterImport() + if err != nil { + return err + } + err = s.updateCheckpoint() + if err != nil { + return err + } + return nil } + return nil } // afterFlush should be called after all engine is flushed. func (s *CheckpointManager) afterFlush() { + s.mu.Lock() + defer s.mu.Unlock() for { cp := s.checkpoints[s.minTaskIDFinished] if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys { @@ -254,17 +253,41 @@ func (s *CheckpointManager) afterFlush() { } } -func (s *CheckpointManager) afterImport() { +func (s *CheckpointManager) afterImport() error { + p, l, err := s.pdCli.GetTS(s.ctx) + failpoint.Inject("mockAfterImportAllocTSFailed", func(_ failpoint.Value) { + err = errors.Errorf("mock err") + }) + if err != nil { + s.logger.Warn("advance watermark get ts failed", zap.Error(err)) + return err + } + newTS := oracle.ComposeTS(p, l) + + s.mu.Lock() + defer s.mu.Unlock() + if s.importedKeyLowWatermark.Cmp(s.flushedKeyLowWatermark) > 0 { s.logger.Warn("lower watermark of flushed key is less than imported key", zap.String("flushed", hex.EncodeToString(s.flushedKeyLowWatermark)), zap.String("imported", hex.EncodeToString(s.importedKeyLowWatermark)), ) - return + return errors.Errorf("flushed key is less than imported key") } s.importedKeyLowWatermark = s.flushedKeyLowWatermark s.importedKeyCnt = s.flushedKeyCnt + intest.Assert(s.ts < newTS) + if s.ts < newTS { + s.ts = newTS + } s.dirty = true + return nil +} + +func (s *CheckpointManager) noUpdate() bool { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.checkpoints) == 0 && s.minTaskIDFinished == 0 } // Close closes the checkpoint manager. 
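As a quick illustration of the reworked watermark logic above: afterImport now fetches a fresh physical/logical pair from PD, composes it with oracle.ComposeTS, and only ever moves the stored ts forward. A minimal standalone sketch of that monotonic-advance rule follows; the watermark type and advance helper are illustrative stand-ins, not the real CheckpointManager.

package main

import (
	"fmt"

	"github.com/tikv/client-go/v2/oracle"
)

// watermark is an illustrative stand-in for the ts field of CheckpointManager.
type watermark struct {
	ts uint64
}

// advance mirrors the guarded update in afterImport: compose a TSO from the
// physical/logical pair returned by PD and never move the stored ts backward.
func (w *watermark) advance(physical, logical int64) uint64 {
	newTS := oracle.ComposeTS(physical, logical)
	if w.ts < newTS {
		w.ts = newTS
	}
	return w.ts
}

func main() {
	var w watermark
	fmt.Println(w.advance(12, 34)) // first allocation
	fmt.Println(w.advance(13, 35)) // moves forward
	fmt.Println(w.advance(11, 0))  // a stale TS is ignored
}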
@@ -495,15 +518,3 @@ func (s *CheckpointManager) updateCheckpoint() error { } return nil } - -func (s *CheckpointManager) refreshTSAndUpdateCP() (uint64, error) { - p, l, err := s.pdCli.GetTS(s.ctx) - if err != nil { - return 0, errors.Trace(err) - } - newTS := oracle.ComposeTS(p, l) - s.mu.Lock() - s.ts = newTS - s.mu.Unlock() - return newTS, s.updateCheckpoint() -} diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go index 7a39d3e319246..a313886ecbd99 100644 --- a/pkg/ddl/ingest/checkpoint_test.go +++ b/pkg/ddl/ingest/checkpoint_test.go @@ -36,18 +36,18 @@ func createDummyFile(t *testing.T, folder string) { require.NoError(t, f.Close()) } -var ( - mockPTS int64 = 12 - mockLTS int64 = 34 - expectedTS = oracle.ComposeTS(mockPTS, mockLTS) -) - type mockGetTSClient struct { pd.Client + + pts int64 + lts int64 } -func (m mockGetTSClient) GetTS(context.Context) (int64, int64, error) { - return mockPTS, mockLTS, nil +func (m *mockGetTSClient) GetTS(context.Context) (int64, int64, error) { + p, l := m.pts, m.lts + m.pts++ + m.lts++ + return p, l, nil } func TestCheckpointManager(t *testing.T) { @@ -63,7 +63,7 @@ func TestCheckpointManager(t *testing.T) { sessPool := session.NewSessionPool(rs) tmpFolder := t.TempDir() createDummyFile(t, tmpFolder) - mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, []int64{1}, tmpFolder, mockGetTSClient{}) + mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, tmpFolder, &mockGetTSClient{pts: 12, lts: 34}) require.NoError(t, err) defer mgr.Close() @@ -72,26 +72,26 @@ func TestCheckpointManager(t *testing.T) { mgr.UpdateTotalKeys(0, 100, false) require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'})) mgr.UpdateWrittenKeys(0, 100) - mgr.AdvanceWatermark(true, false) + require.NoError(t, mgr.AdvanceWatermark(false)) require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'})) mgr.UpdateTotalKeys(0, 100, true) mgr.UpdateWrittenKeys(0, 100) - mgr.AdvanceWatermark(true, false) + require.NoError(t, mgr.AdvanceWatermark(false)) // The data is not imported to the storage yet. require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'})) mgr.UpdateWrittenKeys(1, 0) - mgr.AdvanceWatermark(true, true) // Mock the data is imported to the storage. + require.NoError(t, mgr.AdvanceWatermark(true)) // Mock the data is imported to the storage. require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'})) // Only when the last batch is completed, the job can be completed. 
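The test changes above swap the fixed mockPTS/mockLTS globals for a stateful client because AdvanceWatermark(true) now allocates a new TS on every import and afterImport asserts strict monotonicity. A rough standalone sketch of such a mock, using a small tsClient interface in place of the full pd.Client (names are illustrative):

package main

import (
	"context"
	"fmt"
)

// tsClient is the minimal slice of the PD client a checkpoint manager needs.
type tsClient interface {
	GetTS(ctx context.Context) (physical, logical int64, err error)
}

// mockTSClient hands out a strictly increasing physical/logical pair per call,
// so repeated watermark advances never observe a stale timestamp.
type mockTSClient struct {
	pts, lts int64
}

func (m *mockTSClient) GetTS(context.Context) (int64, int64, error) {
	p, l := m.pts, m.lts
	m.pts++
	m.lts++
	return p, l, nil
}

func main() {
	var c tsClient = &mockTSClient{pts: 12, lts: 34}
	for i := 0; i < 3; i++ {
		p, l, _ := c.GetTS(context.Background())
		fmt.Println(p, l) // 12 34, then 13 35, then 14 36
	}
}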
mgr.UpdateTotalKeys(1, 50, false) mgr.UpdateTotalKeys(1, 50, true) mgr.UpdateWrittenKeys(1, 50) - mgr.AdvanceWatermark(true, true) + require.NoError(t, mgr.AdvanceWatermark(true)) require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'})) require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) mgr.UpdateWrittenKeys(1, 50) - mgr.AdvanceWatermark(true, true) + require.NoError(t, mgr.AdvanceWatermark(true)) require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'})) require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) @@ -103,9 +103,9 @@ func TestCheckpointManager(t *testing.T) { mgr.UpdateTotalKeys(3, 100, true) mgr.UpdateTotalKeys(4, 100, true) mgr.UpdateWrittenKeys(4, 100) - mgr.AdvanceWatermark(true, true) + require.NoError(t, mgr.AdvanceWatermark(true)) mgr.UpdateWrittenKeys(3, 100) - mgr.AdvanceWatermark(true, true) + require.NoError(t, mgr.AdvanceWatermark(true)) require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'})) } @@ -123,13 +123,14 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) { sessPool := session.NewSessionPool(rs) tmpFolder := t.TempDir() createDummyFile(t, tmpFolder) - mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, []int64{1}, tmpFolder, mockGetTSClient{}) + expectedTS := oracle.ComposeTS(13, 35) + mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, tmpFolder, &mockGetTSClient{pts: 12, lts: 34}) require.NoError(t, err) mgr.Register(0, []byte{'1', '9'}) mgr.UpdateTotalKeys(0, 100, true) mgr.UpdateWrittenKeys(0, 100) - mgr.AdvanceWatermark(true, true) + require.NoError(t, mgr.AdvanceWatermark(true)) mgr.Close() // Wait the global checkpoint to be updated to the reorg table. r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;") require.NoError(t, err) @@ -176,24 +177,24 @@ func TestCheckpointManagerResumeReorg(t *testing.T) { sessPool := session.NewSessionPool(rs) tmpFolder := t.TempDir() // checkpoint manager should not use local checkpoint if the folder is empty - mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, []int64{1}, tmpFolder, nil) + mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, tmpFolder, nil) require.NoError(t, err) defer mgr.Close() require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) - localCnt, globalNextKey := mgr.Status() + localCnt, globalNextKey := mgr.TotalKeyCount(), mgr.NextKeyToProcess() require.Equal(t, 0, localCnt) require.EqualValues(t, []byte{'1', '9'}, globalNextKey) require.EqualValues(t, 123456, mgr.GetTS()) createDummyFile(t, tmpFolder) - mgr2, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, []int64{1}, tmpFolder, nil) + mgr2, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, tmpFolder, nil) require.NoError(t, err) defer mgr2.Close() require.True(t, mgr2.IsKeyProcessed([]byte{'1', '9'})) require.True(t, mgr2.IsKeyProcessed([]byte{'2', '9'})) - localCnt, globalNextKey = mgr2.Status() + localCnt, globalNextKey = mgr2.TotalKeyCount(), mgr2.NextKeyToProcess() require.Equal(t, 100, localCnt) - require.EqualValues(t, []byte{'1', '9'}, globalNextKey) - require.EqualValues(t, 123456, mgr.GetTS()) + require.EqualValues(t, []byte{'2', '9'}, globalNextKey) + require.EqualValues(t, 123456, mgr2.GetTS()) } diff --git a/pkg/ddl/ingest/disk_root.go b/pkg/ddl/ingest/disk_root.go index 90e3fa4f62922..620bb7f66f91c 100644 --- a/pkg/ddl/ingest/disk_root.go +++ b/pkg/ddl/ingest/disk_root.go @@ -29,8 +29,17 @@ import ( "go.uber.org/zap" ) +// 
ResourceTracker has the method of GetUsage. +type ResourceTracker interface { + GetDiskUsage() uint64 +} + // DiskRoot is used to track the disk usage for the lightning backfill process. type DiskRoot interface { + Add(id int64, tracker ResourceTracker) + Remove(id int64) + Count() int + UpdateUsage() ShouldImport() bool UsageInfo() string @@ -46,25 +55,50 @@ type diskRootImpl struct { capacity uint64 used uint64 bcUsed uint64 - bcCtx *litBackendCtxMgr mu sync.RWMutex + items map[int64]ResourceTracker updating atomic.Bool } // NewDiskRootImpl creates a new DiskRoot. -func NewDiskRootImpl(path string, bcCtx *litBackendCtxMgr) DiskRoot { +func NewDiskRootImpl(path string) DiskRoot { return &diskRootImpl{ path: path, - bcCtx: bcCtx, + items: make(map[int64]ResourceTracker), } } +// TrackerCountForTest is only used for test. +var TrackerCountForTest = atomic.Int64{} + +// Add adds a tracker to disk root. +func (d *diskRootImpl) Add(id int64, tracker ResourceTracker) { + d.mu.Lock() + defer d.mu.Unlock() + d.items[id] = tracker + TrackerCountForTest.Add(1) +} + +// Remove removes a tracker from disk root. +func (d *diskRootImpl) Remove(id int64) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.items, id) + TrackerCountForTest.Add(-1) +} + +// Count is only used for test. +func (d *diskRootImpl) Count() int { + d.mu.Lock() + defer d.mu.Unlock() + return len(d.items) +} + // UpdateUsage implements DiskRoot interface. func (d *diskRootImpl) UpdateUsage() { if !d.updating.CompareAndSwap(false, true) { return } - bcUsed := d.bcCtx.TotalDiskUsage() var capacity, used uint64 sz, err := lcom.GetStorageSize(d.path) if err != nil { @@ -74,7 +108,11 @@ func (d *diskRootImpl) UpdateUsage() { } d.updating.Store(false) d.mu.Lock() - d.bcUsed = bcUsed + var totalUsage uint64 + for _, tracker := range d.items { + totalUsage += tracker.GetDiskUsage() + } + d.bcUsed = totalUsage d.capacity = capacity d.used = used d.mu.Unlock() diff --git a/pkg/ddl/ingest/engine.go b/pkg/ddl/ingest/engine.go index 7bcdebce101f8..6723b829a85c6 100644 --- a/pkg/ddl/ingest/engine.go +++ b/pkg/ddl/ingest/engine.go @@ -151,7 +151,6 @@ func (ei *engineInfo) CreateWriter(id int, writerCfg *backend.LocalWriterConfig) return nil, err } - ei.memRoot.Consume(structSizeWriterCtx) logutil.Logger(ei.ctx).Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id), zap.Int64("allocate memory", structSizeWriterCtx+writerCfg.Local.MemCacheSize), @@ -178,7 +177,6 @@ func (ei *engineInfo) newWriterContext(workerID int, writerCfg *backend.LocalWri } // Cache the local writer. 
ei.writerCache.Store(workerID, lWrite) - ei.memRoot.ConsumeWithTag(encodeBackendTag(ei.jobID), writerCfg.Local.MemCacheSize) } wc := &writerContext{ ctx: ei.ctx, @@ -200,7 +198,6 @@ func (ei *engineInfo) closeWriters() error { } } ei.writerCache.Delete(wid) - ei.memRoot.Release(structSizeWriterCtx) } return firstErr } diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index a36dad357d74e..275b1b2059b03 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -15,7 +15,10 @@ package ingest import ( + "context" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/lightning/backend" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/logutil" @@ -51,11 +54,7 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tbl table.Ta } mgr := backend.MakeEngineManager(bc.backend) - ts := uint64(0) - if c := bc.checkpointMgr; c != nil { - ts = c.GetTS() - } - cfg := generateLocalEngineConfig(ts) + cfg := generateLocalEngineConfig(bc.GetImportTS()) openedEngines := make(map[int64]*engineInfo, numIdx) @@ -90,7 +89,6 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tbl table.Ta ret = append(ret, ei) bc.engines[indexID] = ei } - bc.memRoot.Consume(numIdx * structSizeEngineInfo) bc.tbl = tbl logutil.Logger(bc.ctx).Info(LitInfoOpenEngine, zap.Int64("job ID", bc.jobID), @@ -120,7 +118,6 @@ func (bc *litBackendCtx) FinishAndUnregisterEngines(opt UnregisterOpt) error { if len(bc.engines) == 0 { return nil } - numIdx := int64(len(bc.engines)) for _, ei := range bc.engines { ei.Close(opt&OptCleanData != 0) } @@ -132,13 +129,14 @@ func (bc *litBackendCtx) FinishAndUnregisterEngines(opt UnregisterOpt) error { if err != nil { return errors.Trace(err) } + failpoint.Inject("mockCollectRemoteDuplicateRowsFailed", func(_ failpoint.Value) { + failpoint.Return(context.DeadlineExceeded) + }) } } } bc.engines = make(map[int64]*engineInfo, 10) - bc.memRoot.Release(numIdx * structSizeEngineInfo) - return nil } diff --git a/pkg/ddl/ingest/env.go b/pkg/ddl/ingest/env.go index ff259dc099399..f2ba096094e98 100644 --- a/pkg/ddl/ingest/env.go +++ b/pkg/ddl/ingest/env.go @@ -38,19 +38,20 @@ import ( ) var ( - // LitBackCtxMgr is the entry for the lightning backfill process. - LitBackCtxMgr BackendCtxMgr // LitMemRoot is used to track the memory usage of the lightning backfill process. LitMemRoot MemRoot - // litDiskRoot is used to track the disk usage of the lightning backfill process. - litDiskRoot DiskRoot + // LitDiskRoot is used to track the disk usage of the lightning backfill process. + LitDiskRoot DiskRoot // litRLimit is the max open file number of the lightning backfill process. litRLimit uint64 // LitInitialized is the flag indicates whether the lightning backfill process is initialized. LitInitialized bool ) -const defaultMemoryQuota = 2 * size.GB +const ( + defaultMemoryQuota = 2 * size.GB + distributedKeyTTLInSec = 10 // seconds +) // InitGlobalLightningEnv initialize Lightning backfill environment. 
func InitGlobalLightningEnv(path string) (ok bool) { @@ -75,12 +76,18 @@ func InitGlobalLightningEnv(path string) (ok bool) { i := val.(int) memTotal = uint64(i) * size.MB }) - LitBackCtxMgr = NewLitBackendCtxMgr(path, memTotal) + LitMemRoot = NewMemRootImpl(int64(memTotal)) + LitDiskRoot = NewDiskRootImpl(path) + LitDiskRoot.UpdateUsage() + err = LitDiskRoot.StartupCheck() + if err != nil { + logutil.DDLIngestLogger().Warn("ingest backfill may not be available", zap.Error(err)) + } litRLimit = util.GenRLimit("ddl-ingest") LitInitialized = true logutil.DDLIngestLogger().Info(LitInfoEnvInitSucc, zap.Uint64("memory limitation", memTotal), - zap.String("disk usage info", litDiskRoot.UsageInfo()), + zap.String("disk usage info", LitDiskRoot.UsageInfo()), zap.Uint64("max open file number", litRLimit), zap.Bool("lightning is initialized", LitInitialized)) return true @@ -158,7 +165,12 @@ func CleanUpTempDir(ctx context.Context, se sessionctx.Context, path string) { for id := range toCheckJobIDs { logutil.DDLIngestLogger().Info("remove stale temp index data", zap.Int64("jobID", id)) - p := filepath.Join(path, encodeBackendTag(id)) + p := filepath.Join(path, encodeBackendTag(id, false)) + err = os.RemoveAll(p) + if err != nil { + logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err)) + } + p = filepath.Join(path, encodeBackendTag(id, true)) err = os.RemoveAll(p) if err != nil { logutil.DDLIngestLogger().Error(LitErrCleanSortPath, zap.Error(err)) diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go index e96eaae5ef585..362b1e8c5dbe8 100644 --- a/pkg/ddl/ingest/integration_test.go +++ b/pkg/ddl/ingest/integration_test.go @@ -17,6 +17,7 @@ package ingest_test import ( "fmt" "strings" + "sync" "sync/atomic" "testing" @@ -36,7 +37,7 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() assertLastNDDLUseIngest := func(n int) { tk.MustExec("admin check table t;") @@ -84,7 +85,7 @@ func TestIngestError(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") tk.MustExec("set global tidb_enable_dist_task = 0") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") tk.MustExec("create table t (a int primary key, b int);") @@ -123,7 +124,7 @@ func TestAddIndexIngestPanic(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("set global tidb_enable_dist_task = 0") @@ -150,7 +151,7 @@ func TestAddIndexSetInternalSessions(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("set global tidb_enable_dist_task = 0;") tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") @@ -176,7 +177,7 @@ func TestAddIndexIngestCancel(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("create table 
t (a int, b int);") tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);") @@ -202,16 +203,15 @@ func TestAddIndexIngestCancel(t *testing.T) { tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob) require.True(t, cancelled) testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep") - ok, err := ingest.LitBackCtxMgr.CheckMoreTasksAvailable() - require.NoError(t, err) - require.True(t, ok) + cnt := ingest.LitDiskRoot.Count() + require.Equal(t, 0, cnt) } func TestIngestPartitionRowCount(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec(`create table t (a int, b int, c int as (b+10), d int as (b+c), primary key (a) clustered) partition by range (a) ( @@ -232,7 +232,7 @@ func TestAddIndexIngestClientError(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("CREATE TABLE t1 (f1 json);") tk.MustExec(`insert into t1(f1) values (cast("null" as json));`) @@ -243,7 +243,7 @@ func TestAddIndexCancelOnNoneState(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tkCancel := testkit.NewTestKit(t, store) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("use test") tk.MustExec(`create table t (c1 int, c2 int, c3 int)`) @@ -258,16 +258,15 @@ func TestAddIndexCancelOnNoneState(t *testing.T) { } }) tk.MustGetErrCode("alter table t add index idx1(c1)", errno.ErrCancelledDDLJob) - available, err := ingest.LitBackCtxMgr.CheckMoreTasksAvailable() - require.NoError(t, err) - require.True(t, available) + cnt := ingest.LitDiskRoot.Count() + require.Equal(t, 0, cnt) } func TestAddIndexIngestTimezone(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("SET time_zone = '-06:00';") tk.MustExec("create table t (`src` varchar(48),`t` timestamp,`timezone` varchar(100));") @@ -289,7 +288,7 @@ func TestAddIndexIngestMultiSchemaChange(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("create table t (a int, b int);") tk.MustExec("insert into t values(1, 1), (2, 2);") @@ -320,7 +319,7 @@ func TestAddIndexDuplicateMessage(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk.MustExec("create table t(id int primary key, b int, k int);") tk.MustExec("insert into t values (1, 1, 1);") @@ -328,28 +327,23 @@ func TestAddIndexDuplicateMessage(t *testing.T) { tk1 := testkit.NewTestKit(t, store) tk1.MustExec("use test") - var runDML bool var errDML error - - ingest.MockExecAfterWriteRow = func() { - if runDML { - return - } - _, errDML = tk1.Exec("insert into t values (2, 1, 2);") - runDML = true - } - + var once sync.Once + 
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/ingest/afterMockWriterWriteRow", func() { + once.Do(func() { + _, errDML = tk1.Exec("insert into t values (2, 1, 2);") + }) + }) tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'") require.NoError(t, errDML) - require.True(t, runDML) tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2")) } func TestMultiSchemaAddIndexMerge(t *testing.T) { store := testkit.CreateMockStore(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk2 := testkit.NewTestKit(t, store) @@ -381,7 +375,7 @@ func TestMultiSchemaAddIndexMerge(t *testing.T) { func TestAddIndexIngestJobWriteConflict(t *testing.T) { store := testkit.CreateMockStore(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t (a int primary key, b int);") @@ -414,7 +408,7 @@ func TestAddIndexIngestJobWriteConflict(t *testing.T) { func TestAddIndexIngestPartitionCheckpoint(t *testing.T) { store := testkit.CreateMockStore(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("set global tidb_enable_dist_task = off;") @@ -452,7 +446,7 @@ func TestAddIndexIngestPartitionCheckpoint(t *testing.T) { func TestAddGlobalIndexInIngest(t *testing.T) { store := testkit.CreateMockStore(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -507,7 +501,7 @@ func TestAddGlobalIndexInIngest(t *testing.T) { func TestAddGlobalIndexInIngestWithUpdate(t *testing.T) { store := testkit.CreateMockStore(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/pkg/ddl/ingest/main_test.go b/pkg/ddl/ingest/main_test.go index 8c6fe9dac60ca..bd895e58df2b3 100644 --- a/pkg/ddl/ingest/main_test.go +++ b/pkg/ddl/ingest/main_test.go @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ingest +package ingest_test import ( "testing" + "github.com/pingcap/tidb/pkg/ddl/ingest/testutil" "go.uber.org/goleak" ) @@ -30,6 +31,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.Cleanup(testutil.CheckIngestLeakageForTest), } goleak.VerifyTestMain(m, opts...) } diff --git a/pkg/ddl/ingest/mem_root.go b/pkg/ddl/ingest/mem_root.go index 5fa69eda5a916..8607b1864c7c3 100644 --- a/pkg/ddl/ingest/mem_root.go +++ b/pkg/ddl/ingest/mem_root.go @@ -55,20 +55,18 @@ func init() { // memRootImpl is an implementation of MemRoot. type memRootImpl struct { - maxLimit int64 - currUsage int64 - structSize map[string]int64 - backendCtxMgr *litBackendCtxMgr - mu sync.RWMutex + maxLimit int64 + currUsage int64 + structSize map[string]int64 + mu sync.RWMutex } // NewMemRootImpl creates a new memRootImpl. 
-func NewMemRootImpl(maxQuota int64, bcCtxMgr *litBackendCtxMgr) *memRootImpl { +func NewMemRootImpl(maxQuota int64) *memRootImpl { return &memRootImpl{ - maxLimit: maxQuota, - currUsage: 0, - structSize: make(map[string]int64, 10), - backendCtxMgr: bcCtxMgr, + maxLimit: maxQuota, + currUsage: 0, + structSize: make(map[string]int64, 10), } } @@ -142,6 +140,6 @@ func (m *memRootImpl) ReleaseWithTag(tag string) { } // RefreshConsumption implements MemRoot. -func (m *memRootImpl) RefreshConsumption() { - m.backendCtxMgr.UpdateMemoryUsage() +func (*memRootImpl) RefreshConsumption() { + // TODO(tagnenta): find a better solution that don't rely on backendCtxMgr. } diff --git a/pkg/ddl/ingest/mem_root_test.go b/pkg/ddl/ingest/mem_root_test.go index 70fc072b63b94..92a7d779316dd 100644 --- a/pkg/ddl/ingest/mem_root_test.go +++ b/pkg/ddl/ingest/mem_root_test.go @@ -22,7 +22,7 @@ import ( ) func TestMemoryRoot(t *testing.T) { - memRoot := ingest.MemRoot(ingest.NewMemRootImpl(1024, nil)) + memRoot := ingest.MemRoot(ingest.NewMemRootImpl(1024)) require.Equal(t, int64(1024), memRoot.MaxMemoryQuota()) require.Equal(t, int64(0), memRoot.CurrentUsage()) diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index b00547812064a..6774692cb01b9 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -27,80 +27,23 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/lightning/backend" "github.com/pingcap/tidb/pkg/lightning/backend/local" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/table" - sd "github.com/tikv/pd/client/servicediscovery" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) -// MockBackendCtxMgr is a mock backend context manager. -type MockBackendCtxMgr struct { - sessCtxProvider func() sessionctx.Context - runningJobs map[int64]*MockBackendCtx -} - -var _ BackendCtxMgr = (*MockBackendCtxMgr)(nil) - -// NewMockBackendCtxMgr creates a new mock backend context manager. -func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBackendCtxMgr { - return &MockBackendCtxMgr{ - sessCtxProvider: sessCtxProvider, - runningJobs: make(map[int64]*MockBackendCtx), - } -} - -// CheckMoreTasksAvailable implements BackendCtxMgr.CheckMoreTaskAvailable interface. -func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { - return len(m.runningJobs) == 0, nil -} - -// Register implements BackendCtxMgr.Register interface. -func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, - pdSvcDiscovery sd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) { - logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) - if mockCtx, ok := m.runningJobs[jobID]; ok { - return mockCtx, nil - } - sessCtx := m.sessCtxProvider() +// NewMockBackendCtx creates a MockBackendCtx. +func NewMockBackendCtx(job *model.Job, sessCtx sessionctx.Context, cpMgr *CheckpointManager) BackendCtx { + logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", job.ID)) mockCtx := &MockBackendCtx{ - mu: sync.Mutex{}, - sessCtx: sessCtx, - jobID: jobID, - } - m.runningJobs[jobID] = mockCtx - return mockCtx, nil -} - -// Unregister implements BackendCtxMgr.Unregister interface. 
-func (m *MockBackendCtxMgr) Unregister(jobID int64) { - if mCtx, ok := m.runningJobs[jobID]; ok { - mCtx.sessCtx.StmtCommit(context.Background()) - err := mCtx.sessCtx.CommitTxn(context.Background()) - logutil.DDLIngestLogger().Info("mock backend mgr unregister", zap.Int64("jobID", jobID), zap.Error(err)) - delete(m.runningJobs, jobID) - } -} - -// EncodeJobSortPath implements BackendCtxMgr interface. -func (m *MockBackendCtxMgr) EncodeJobSortPath(int64) string { - return "" -} - -// Load implements BackendCtxMgr.Load interface. -func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool) { - logutil.DDLIngestLogger().Info("mock backend mgr load", zap.Int64("jobID", jobID)) - if mockCtx, ok := m.runningJobs[jobID]; ok { - return mockCtx, true - } - return nil, false -} - -// ResetSessCtx is only used for mocking test. -func (m *MockBackendCtxMgr) ResetSessCtx() { - for _, mockCtx := range m.runningJobs { - mockCtx.sessCtx = m.sessCtxProvider() + mu: sync.Mutex{}, + sessCtx: sessCtx, + jobID: job.ID, + checkpointMgr: cpMgr, } + return mockCtx } // MockBackendCtx is a mock backend context. @@ -118,12 +61,19 @@ func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ table.Table) ([] for range indexIDs { ret = append(ret, &MockEngineInfo{sessCtx: m.sessCtx, mu: &m.mu}) } + err := sessiontxn.NewTxn(context.Background(), m.sessCtx) + if err != nil { + return nil, err + } + m.sessCtx.GetSessionVars().SetInTxn(true) return ret, nil } // FinishAndUnregisterEngines implements BackendCtx interface. -func (*MockBackendCtx) FinishAndUnregisterEngines(_ UnregisterOpt) error { - logutil.DDLIngestLogger().Info("mock backend ctx unregister") +func (m *MockBackendCtx) FinishAndUnregisterEngines(_ UnregisterOpt) error { + m.sessCtx.StmtCommit(context.Background()) + err := m.sessCtx.CommitTxn(context.Background()) + logutil.DDLIngestLogger().Info("mock backend ctx unregister", zap.Error(err)) return nil } @@ -133,19 +83,73 @@ func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) return nil } -// Flush implements BackendCtx.Flush interface. -func (*MockBackendCtx) Flush(context.Context, FlushMode) (flushed, imported bool, err error) { - return false, false, nil +// IngestIfQuotaExceeded implements BackendCtx.IngestIfQuotaExceeded interface. +func (m *MockBackendCtx) IngestIfQuotaExceeded(_ context.Context, taskID, cnt int) error { + if m.checkpointMgr != nil { + m.checkpointMgr.UpdateWrittenKeys(taskID, cnt) + } + return nil +} + +// Ingest implements BackendCtx.Ingest interface. +func (m *MockBackendCtx) Ingest(_ context.Context) error { + if m.checkpointMgr != nil { + return m.checkpointMgr.AdvanceWatermark(true) + } + return nil +} + +// NextStartKey implements CheckpointOperator interface. +func (m *MockBackendCtx) NextStartKey() kv.Key { + if m.checkpointMgr != nil { + return m.checkpointMgr.NextKeyToProcess() + } + return nil +} + +// TotalKeyCount implements CheckpointOperator interface. +func (m *MockBackendCtx) TotalKeyCount() int { + if m.checkpointMgr != nil { + return m.checkpointMgr.TotalKeyCount() + } + return 0 +} + +// AddChunk implements CheckpointOperator interface. +func (m *MockBackendCtx) AddChunk(id int, endKey kv.Key) { + if m.checkpointMgr != nil { + m.checkpointMgr.Register(id, endKey) + } +} + +// UpdateChunk implements CheckpointOperator interface. 
+func (m *MockBackendCtx) UpdateChunk(id int, count int, done bool) { + if m.checkpointMgr != nil { + m.checkpointMgr.UpdateTotalKeys(id, count, done) + } } -// AttachCheckpointManager attaches a checkpoint manager to the backend context. -func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) { - m.checkpointMgr = mgr +// FinishChunk implements CheckpointOperator interface. +func (m *MockBackendCtx) FinishChunk(id int, count int) { + if m.checkpointMgr != nil { + m.checkpointMgr.UpdateWrittenKeys(id, count) + } } -// GetCheckpointManager returns the checkpoint manager attached to the backend context. -func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager { - return m.checkpointMgr +// GetImportTS implements CheckpointOperator interface. +func (m *MockBackendCtx) GetImportTS() uint64 { + if m.checkpointMgr != nil { + return m.checkpointMgr.GetTS() + } + return 0 +} + +// AdvanceWatermark implements CheckpointOperator interface. +func (m *MockBackendCtx) AdvanceWatermark(imported bool) error { + if m.checkpointMgr != nil { + return m.checkpointMgr.AdvanceWatermark(imported) + } + return nil } // GetLocalBackend returns the local backend. @@ -155,6 +159,17 @@ func (m *MockBackendCtx) GetLocalBackend() *local.Backend { return b } +// Close implements BackendCtx. +func (m *MockBackendCtx) Close() { + logutil.DDLIngestLogger().Info("mock backend context close", zap.Int64("jobID", m.jobID)) + BackendCounterForTest.Dec() +} + +// GetDiskUsage returns current disk usage of underlying backend. +func (bc *MockBackendCtx) GetDiskUsage() uint64 { + return 0 +} + // MockWriteHook the hook for write in mock engine. type MockWriteHook func(key, val []byte) @@ -225,6 +240,7 @@ func (m *MockWriter) WriteRow(_ context.Context, key, idxVal []byte, _ kv.Handle if MockExecAfterWriteRow != nil { MockExecAfterWriteRow() } + failpoint.InjectCall("afterMockWriterWriteRow") return nil } diff --git a/pkg/ddl/ingest/testutil/BUILD.bazel b/pkg/ddl/ingest/testutil/BUILD.bazel index 55c9c927aaa34..502de26217439 100644 --- a/pkg/ddl/ingest/testutil/BUILD.bazel +++ b/pkg/ddl/ingest/testutil/BUILD.bazel @@ -8,7 +8,8 @@ go_library( deps = [ "//pkg/ddl/ingest", "//pkg/kv", - "//pkg/sessionctx", + "//pkg/meta/model", "//pkg/testkit", + "//pkg/testkit/testfailpoint", ], ) diff --git a/pkg/ddl/ingest/testutil/testutil.go b/pkg/ddl/ingest/testutil/testutil.go index a95b40821003b..2e52b34d12074 100644 --- a/pkg/ddl/ingest/testutil/testutil.go +++ b/pkg/ddl/ingest/testutil/testutil.go @@ -15,29 +15,52 @@ package testutil import ( + "fmt" + "math" + "os" "testing" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" ) -// InjectMockBackendMgr mock LitBackCtxMgr. -func InjectMockBackendMgr(t *testing.T, store kv.Storage) (restore func()) { - tk := testkit.NewTestKit(t, store) - oldLitBackendMgr := ingest.LitBackCtxMgr - oldInitialized := ingest.LitInitialized +// InjectMockBackendCtx mock LitBackCtxMgr. 
+func InjectMockBackendCtx(t *testing.T, store kv.Storage) (restore func()) { + oldLitDiskRoot := ingest.LitDiskRoot + oldLitMemRoot := ingest.LitMemRoot - ingest.LitBackCtxMgr = ingest.NewMockBackendCtxMgr(func() sessionctx.Context { - tk.MustExec("rollback;") - tk.MustExec("begin;") - return tk.Session() - }) + tk := testkit.NewTestKit(t, store) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockNewBackendContext", + func(job *model.Job, cpMgr *ingest.CheckpointManager, mockBackendCtx *ingest.BackendCtx) { + *mockBackendCtx = ingest.NewMockBackendCtx(job, tk.Session(), cpMgr) + }) ingest.LitInitialized = true + ingest.LitDiskRoot = ingest.NewDiskRootImpl(t.TempDir()) + ingest.LitMemRoot = ingest.NewMemRootImpl(math.MaxInt64) return func() { - ingest.LitBackCtxMgr = oldLitBackendMgr - ingest.LitInitialized = oldInitialized + ingest.LitInitialized = false + ingest.LitDiskRoot = oldLitDiskRoot + ingest.LitMemRoot = oldLitMemRoot + } +} + +// CheckIngestLeakageForTest is only used in test. +func CheckIngestLeakageForTest(exitCode int) { + if exitCode == 0 { + leakObj := "" + if ingest.TrackerCountForTest.Load() != 0 { + leakObj = "disk usage tracker" + } else if ingest.BackendCounterForTest.Load() != 0 { + leakObj = "backend context" + } + if len(leakObj) > 0 { + fmt.Fprintf(os.Stderr, "add index leakage check failed: %s leak\n", leakObj) + os.Exit(1) + } } + os.Exit(exitCode) } diff --git a/pkg/ddl/main_test.go b/pkg/ddl/main_test.go index e741f53bf78a6..b0875732f2dfd 100644 --- a/pkg/ddl/main_test.go +++ b/pkg/ddl/main_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/ddl/ingest/testutil" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/keyspace" @@ -68,6 +69,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.Cleanup(testutil.CheckIngestLeakageForTest), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 2ed1ac5811efe..c8efcd8342764 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -612,7 +612,7 @@ func (r *reorgInfo) NewJobContext() *ReorgContext { func (r *reorgInfo) String() string { var isEnabled bool if ingest.LitInitialized { - _, isEnabled = ingest.LitBackCtxMgr.Load(r.Job.ID) + isEnabled = r.ReorgMeta != nil && r.ReorgMeta.IsFastReorg } return "CurrElementType:" + string(r.currElement.TypeKey) + "," + "CurrElementID:" + strconv.FormatInt(r.currElement.ID, 10) + "," + diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 9bb45a2a1239f..25ce4dd8ce539 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" @@ -94,20 +93,12 @@ func convertAddIdxJob2RollbackJob( job.State = model.JobStateRollingback // TODO(tangenta): get duplicate column and match index. 
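To sketch the mock-injection pattern used by InjectMockBackendCtx above: the MockBackendCtxMgr type is gone, tests register a callback on the mockNewBackendContext failpoint, and the production path returns whatever backend the callback fills in. Below is a simplified stand-in that uses a plain function registry instead of the real failpoint package; all names are illustrative.

package main

import (
	"fmt"
	"sync"
)

// Backend is an illustrative stand-in for ingest.BackendCtx.
type Backend interface{ Name() string }

type realBackend struct{}

func (realBackend) Name() string { return "real backend" }

type mockBackend struct{}

func (mockBackend) Name() string { return "mock backend" }

// hooks emulates enabling a call-style failpoint from a test.
var hooks sync.Map // name -> func(jobID int64, out *Backend)

// newBackend gives a registered test hook the chance to supply a mock before
// the real backend would be constructed, mirroring the mockNewBackendContext
// injection point above.
func newBackend(jobID int64) Backend {
	var mock Backend
	if h, ok := hooks.Load("mockNewBackendContext"); ok {
		h.(func(int64, *Backend))(jobID, &mock)
	}
	if mock != nil {
		return mock
	}
	return realBackend{}
}

func main() {
	fmt.Println(newBackend(1).Name()) // real backend
	hooks.Store("mockNewBackendContext", func(_ int64, out *Backend) { *out = mockBackend{} })
	fmt.Println(newBackend(2).Name()) // mock backend
}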
err = completeErr(err, allIndexInfos[0]) - if ingest.LitBackCtxMgr != nil { - ingest.LitBackCtxMgr.Unregister(job.ID) - } return ver, errors.Trace(err) } // convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, // to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, job *model.Job, occuredErr error) (ver int64, err error) { - defer func() { - if ingest.LitBackCtxMgr != nil { - ingest.LitBackCtxMgr.Unregister(job.ID) - } - }() schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, schemaID) if err != nil { diff --git a/pkg/ddl/tests/metadatalock/mdl_test.go b/pkg/ddl/tests/metadatalock/mdl_test.go index 5a041e3b16bba..f60fc61a096f0 100644 --- a/pkg/ddl/tests/metadatalock/mdl_test.go +++ b/pkg/ddl/tests/metadatalock/mdl_test.go @@ -902,7 +902,7 @@ func TestMDLPreparePlanCacheInvalid(t *testing.T) { func TestMDLPreparePlanCacheExecute(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() sv := server.CreateMockServer(t, store) @@ -958,7 +958,7 @@ func TestMDLPreparePlanCacheExecute(t *testing.T) { func TestMDLPreparePlanCacheExecute2(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() sv := server.CreateMockServer(t, store) @@ -1004,7 +1004,7 @@ func TestMDLPreparePlanCacheExecute2(t *testing.T) { // TestMDLPreparePlanCacheExecuteInsert makes sure the insert statement handle the schema correctly in plan cache. func TestMDLPreparePlanCacheExecuteInsert(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - defer ingesttestutil.InjectMockBackendMgr(t, store)() + defer ingesttestutil.InjectMockBackendCtx(t, store)() sv := server.CreateMockServer(t, store) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 28b824c39bd3d..5d5d87db9f110 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -57,6 +57,7 @@ import ( "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/engine" "github.com/pingcap/tidb/pkg/util/intest" + "github.com/tikv/client-go/v2/oracle" tikvclient "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" @@ -1605,21 +1606,31 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err } // ResetEngineSkipAllocTS is like ResetEngine but the inner TS of the engine is -// invalid. Caller must use SetTSAfterResetEngine to set a valid TS before import +// invalid. Caller must use SetTSBeforeImportEngine to set a valid TS before import // the engine. func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error { return local.engineMgr.resetEngine(ctx, engineUUID, true) } -// SetTSAfterResetEngine allocates a new TS for the engine after it's reset. +// SetTSBeforeImportEngine allocates a new TS for the engine before it is imported. // This is typically called after persisting the chosen TS of the engine to make // sure TS is not changed after task failover. 
-func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error { +func (local *Backend) SetTSBeforeImportEngine(ctx context.Context, engineUUID uuid.UUID, ts uint64) error { e := local.engineMgr.lockEngine(engineUUID, importMutexStateClose) if e == nil { - return errors.Errorf("engine %s not found in SetTSAfterResetEngine", engineUUID.String()) + return errors.Errorf("engine %s not found in SetTSBeforeImportEngine", engineUUID.String()) } defer e.unlock() + if ts == 0 { + p, l, err := local.pdCli.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + failpoint.Inject("afterSetTSBeforeImportEngine", func(_ failpoint.Value) { + failpoint.Return(errors.Errorf("mock err")) + }) + ts = oracle.ComposeTS(p, l) + } e.engineMeta.TS = ts return e.saveEngineMeta() } diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 67b0e83b42378..24b8ac1643f9a 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -1184,6 +1184,9 @@ func (b *storeBalancer) runSendToWorker(workerCtx context.Context) { select { case <-workerCtx.Done(): j.done(b.jobWg) + if j.region != nil && j.region.Region != nil { + b.releaseStoreLoad(j.region.Region.Peers) + } return case b.innerJobToWorkerCh <- j: } diff --git a/tests/realtikvtest/BUILD.bazel b/tests/realtikvtest/BUILD.bazel index 6d7f294c9fd8f..cf7046233daba 100644 --- a/tests/realtikvtest/BUILD.bazel +++ b/tests/realtikvtest/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/config", "//pkg/ddl", + "//pkg/ddl/ingest/testutil", "//pkg/domain", "//pkg/kv", "//pkg/session", diff --git a/tests/realtikvtest/addindextest3/BUILD.bazel b/tests/realtikvtest/addindextest3/BUILD.bazel index 6af6262846d00..df54d061be734 100644 --- a/tests/realtikvtest/addindextest3/BUILD.bazel +++ b/tests/realtikvtest/addindextest3/BUILD.bazel @@ -36,7 +36,6 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//util", ], ) diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go index ab1295499fd58..788a1553968f3 100644 --- a/tests/realtikvtest/addindextest3/functional_test.go +++ b/tests/realtikvtest/addindextest3/functional_test.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/util" ) @@ -86,8 +85,19 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) { func TestBackendCtxConcurrentUnregister(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) - discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0, 0) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t (a int);") + var realJob *model.Job + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterWaitSchemaSynced", func(job *model.Job) { + if job.Type == model.ActionAddIndex { + realJob = job.Clone() + } + }) + tk.MustExec("alter table t add index idx(a);") + require.NotNil(t, realJob) + + bCtx, err := ingest.NewBackendCtxBuilder(context.Background(), store, realJob).Build() require.NoError(t, err) idxIDs := []int64{1, 2, 3, 
4, 5, 6, 7} uniques := make([]bool, 0, len(idxIDs)) @@ -107,10 +117,11 @@ func TestBackendCtxConcurrentUnregister(t *testing.T) { }() } wg.Wait() - ingest.LitBackCtxMgr.Unregister(1) + bCtx.Close() } func TestMockMemoryUsedUp(t *testing.T) { + t.Skip("TODO(tangenta): support memory tracking later") testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/setMemTotalInMB", "return(100)") store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) @@ -253,6 +264,7 @@ func TestAddIndexPresplitFunctional(t *testing.T) { "Split index region num exceeded the limit 1000") testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockSplitIndexRegionAndWaitErr", "2*return") tk.MustExec("alter table t add index idx(b) pre_split_regions = (between (0) and (10 * 10000) regions 3);") + testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/mockSplitIndexRegionAndWaitErr") tk.MustExec("drop table t;") tk.MustExec("create table t (a bigint primary key, b int);") diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index 7c4efd1fa3575..cb1433ace2380 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -453,12 +453,12 @@ func TestAddIndexDiskQuotaTS(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("set @@global.tidb_enable_dist_task = 0;") - testAddIndexDiskQuotaTS(t, tk) + testAddIndexDiskQuotaTS(tk) tk.MustExec("set @@global.tidb_enable_dist_task = 1;") - testAddIndexDiskQuotaTS(t, tk) + testAddIndexDiskQuotaTS(tk) } -func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) { +func testAddIndexDiskQuotaTS(tk *testkit.TestKit) { tk.MustExec("drop database if exists addindexlit;") tk.MustExec("create database addindexlit;") tk.MustExec("use addindexlit;") @@ -476,6 +476,41 @@ func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) { tk.MustExec("update t set b = b + 1;") } +func TestAddIndexAdvanceWatermarkFailed(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;") + tk.MustExec("set @@global.tidb_enable_dist_task = 0;") + + tk.MustExec("create table t(id int primary key, b int, k int);") + tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1")) + tk.MustExec("insert into t values(1, 1, 1);") + tk.MustExec("insert into t values(100000, 1, 2);") + ingest.ForceSyncFlagForTest = true + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockAfterImportAllocTSFailed", "2*return") + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("admin check table t;") + tk.MustExec("update t set b = b + 1;") + + //// TODO(tangenta): add scan ts, import ts and key range to the checkpoint information, so that + //// we can re-ingest the same task idempotently. 
+ // tk.MustExec("alter table t drop index idx;") + // testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockAfterImportAllocTSFailed", "2*return") + // tk.MustExec("alter table t add unique index idx(k);") + // tk.MustExec("admin check table t;") + // tk.MustExec("update t set k = k + 10;") + + tk.MustExec("alter table t drop index idx;") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockAfterImportAllocTSFailed", "2*return") + tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrDupEntry) + + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockAfterImportAllocTSFailed", "1*return") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/lightning/backend/local/afterSetTSBeforeImportEngine", "1*return") + tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrDupEntry) +} + func TestAddIndexRemoteDuplicateCheck(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) @@ -496,6 +531,19 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) { ingest.ForceSyncFlagForTest = false } +func TestAddIndexRecoverOnDuplicateCheck(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = on;") + tk.MustExec("set @@global.tidb_enable_dist_task = on;") + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1), (2), (3);") + + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/mockCollectRemoteDuplicateRowsFailed", "1*return") + tk.MustExec("alter table t add unique index idx(a);") +} + func TestAddIndexBackfillLostUpdate(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) @@ -710,6 +758,9 @@ func TestIssue55808(t *testing.T) { tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) tk.MustExec("set global tidb_enable_dist_task = off;") tk.MustExec("set global tidb_ddl_error_count_limit = 0") + defer func() { + tk.MustExec("set global tidb_ddl_error_count_limit = default;") + }() backup := local.MaxWriteAndIngestRetryTimes local.MaxWriteAndIngestRetryTimes = 1 diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 9ff1b034549bb..8ffcfef186243 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" @@ -49,9 +50,24 @@ func init() { }) } +func getRealAddIndexJob(t *testing.T, tk *testkit.TestKit) *model.Job { + tk.MustExec("use test;") + tk.MustExec("create table t (a int);") + var realJob *model.Job + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterWaitSchemaSynced", func(job *model.Job) { + if job.Type == model.ActionAddIndex { + realJob = job.Clone() + } + }) + tk.MustExec("alter table t add index idx(a);") + require.NotNil(t, realJob) + return realJob +} + func TestBackfillOperators(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) + realJob := getRealAddIndexJob(t, tk) regionCnt := 10 tbl, idxInfo, startKey, endKey, copCtx := prepare(t, tk, dom, 
regionCnt) sessPool := newSessPoolForTest(t, store) @@ -98,7 +114,7 @@ func TestBackfillOperators(t *testing.T) { ctx := context.Background() opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) src := testutil.NewOperatorTestSource(opTasks...) - scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil, 0, nil) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, 0, nil, nil) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp) @@ -143,22 +159,24 @@ func TestBackfillOperators(t *testing.T) { } pTbl := tbl.(table.PhysicalTable) index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo) - mockBackendCtx := &ingest.MockBackendCtx{} + bcCtx, err := ingest.NewBackendCtxBuilder(ctx, store, realJob).Build() + require.NoError(t, err) + defer bcCtx.Close() mockEngine := ingest.NewMockEngineInfo(nil) mockEngine.SetHook(onWrite) src := testutil.NewOperatorTestSource(chunkResults...) reorgMeta := ddl.NewDDLReorgMeta(tk.Session()) ingestOp := ddl.NewIndexIngestOperator( - opCtx, copCtx, mockBackendCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, - srcChkPool, 3, reorgMeta, nil, &ddl.EmptyRowCntListener{}) + opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, + srcChkPool, 3, reorgMeta, &ddl.EmptyRowCntListener{}) sink := testutil.NewOperatorTestSink[ddl.IndexWriteResult]() operator.Compose[ddl.IndexRecordChunk](src, ingestOp) operator.Compose[ddl.IndexWriteResult](ingestOp, sink) pipeline := operator.NewAsyncPipeline(src, ingestOp, sink) - err := pipeline.Execute() + err = pipeline.Execute() require.NoError(t, err) err = pipeline.Close() require.NoError(t, err) @@ -180,6 +198,7 @@ func TestBackfillOperators(t *testing.T) { func TestBackfillOperatorPipeline(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) + realJob := getRealAddIndexJob(t, tk) regionCnt := 10 tbl, idxInfo, startKey, endKey, _ := prepare(t, tk, dom, regionCnt) sessPool := newSessPoolForTest(t, store) @@ -187,14 +206,16 @@ func TestBackfillOperatorPipeline(t *testing.T) { ctx := context.Background() opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) defer cancel() - mockBackendCtx := &ingest.MockBackendCtx{} + bcCtx, err := ingest.NewBackendCtxBuilder(ctx, store, realJob).Build() + require.NoError(t, err) + defer bcCtx.Close() mockEngine := ingest.NewMockEngineInfo(nil) mockEngine.SetHook(func(key, val []byte) {}) pipeline, err := ddl.NewAddIndexIngestPipeline( opCtx, store, sessPool, - mockBackendCtx, + bcCtx, []ingest.Engine{mockEngine}, 1, // job id tbl.(table.PhysicalTable), @@ -204,7 +225,6 @@ func TestBackfillOperatorPipeline(t *testing.T) { ddl.NewDDLReorgMeta(tk.Session()), 0, 2, - nil, &ddl.EmptyRowCntListener{}, ) require.NoError(t, err) @@ -219,10 +239,13 @@ func TestBackfillOperatorPipeline(t *testing.T) { func TestBackfillOperatorPipelineException(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) + realJob := getRealAddIndexJob(t, tk) regionCnt := 10 tbl, idxInfo, startKey, endKey, _ := prepare(t, tk, dom, regionCnt) sessPool := newSessPoolForTest(t, store) - mockBackendCtx := &ingest.MockBackendCtx{} + bcCtx, err := ingest.NewBackendCtxBuilder(context.Background(), store, realJob).Build() + require.NoError(t, err) + defer bcCtx.Close() mockEngine := ingest.NewMockEngineInfo(nil) mockEngine.SetHook(func(_, _ []byte) {}) @@ 
-289,7 +312,7 @@ func TestBackfillOperatorPipelineException(t *testing.T) { pipeline, err := ddl.NewAddIndexIngestPipeline( opCtx, store, sessPool, - mockBackendCtx, + bcCtx, []ingest.Engine{mockEngine}, 1, // job id tbl.(table.PhysicalTable), @@ -299,7 +322,6 @@ func TestBackfillOperatorPipelineException(t *testing.T) { ddl.NewDDLReorgMeta(tk.Session()), 0, 2, - nil, &ddl.EmptyRowCntListener{}, ) require.NoError(t, err) @@ -378,6 +400,7 @@ func (p *sessPoolForTest) Put(sctx sessionctx.Context) { func TestTuneWorkerPoolSize(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) + realJob := getRealAddIndexJob(t, tk) tbl, idxInfo, _, _, copCtx := prepare(t, tk, dom, 10) sessPool := newSessPoolForTest(t, store) @@ -385,7 +408,7 @@ func TestTuneWorkerPoolSize(t *testing.T) { { ctx := context.Background() opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) - scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, nil, 2, nil, 0, nil) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, nil, 2, 0, nil, nil) scanOp.Open() require.Equal(t, scanOp.GetWorkerPoolSize(), int32(2)) @@ -404,11 +427,13 @@ func TestTuneWorkerPoolSize(t *testing.T) { opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) pTbl := tbl.(table.PhysicalTable) index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo) - mockBackendCtx := &ingest.MockBackendCtx{} + bcCtx, err := ingest.NewBackendCtxBuilder(context.Background(), store, realJob).Build() + require.NoError(t, err) + defer bcCtx.Close() mockEngine := ingest.NewMockEngineInfo(nil) - ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, mockBackendCtx, sessPool, pTbl, []table.Index{index}, + ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, nil, 2, nil, - nil, &ddl.EmptyRowCntListener{}) + &ddl.EmptyRowCntListener{}) ingestOp.Open() require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(2)) diff --git a/tests/realtikvtest/testkit.go b/tests/realtikvtest/testkit.go index 02940ef6d900a..802205e94795e 100644 --- a/tests/realtikvtest/testkit.go +++ b/tests/realtikvtest/testkit.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/ddl/ingest/testutil" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/session" @@ -93,6 +94,7 @@ func RunTestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), // the resolveFlushedLocks goroutine runs in the background to commit or rollback locks. goleak.IgnoreAnyFunction("github.com/tikv/client-go/v2/txnkv/transaction.(*twoPhaseCommitter).resolveFlushedLocks.func1"), + goleak.Cleanup(testutil.CheckIngestLeakageForTest), } callback := func(i int) int { // wait for MVCCLevelDB to close, MVCCLevelDB will be closed in one second
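For reference, a minimal sketch of wiring a leak counter into TestMain through goleak.Cleanup, in the same spirit as the CheckIngestLeakageForTest hook registered in the TestMain/RunTestMain hunks above. The openResources counter and package name are invented for the example; note the cleanup must call os.Exit itself because the option replaces goleak's default exit handling.

package mypkg_test

import (
	"fmt"
	"os"
	"sync/atomic"
	"testing"

	"go.uber.org/goleak"
)

// openResources counts resources that every test is expected to release.
var openResources atomic.Int64

func TestMain(m *testing.M) {
	opts := []goleak.Option{
		goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
		goleak.Cleanup(func(exitCode int) {
			if exitCode == 0 && openResources.Load() != 0 {
				fmt.Fprintln(os.Stderr, "resource leak detected")
				exitCode = 1
			}
			os.Exit(exitCode)
		}),
	}
	goleak.VerifyTestMain(m, opts...)
}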