diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index 8dbc32851acc2..a3cf89b3e0c2b 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/require" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -360,7 +361,7 @@ func writeExternalOneFile(s *writeTestSuite) { } writer := builder.BuildOneFile( s.store, filePath, "writerID") - _ = writer.Init(ctx, 20*1024*1024) + _ = writer.Init(ctx, 20*1024*1024, 20) key, val, _ := s.source.next() for key != nil { err := writer.WriteRow(ctx, key, val) @@ -537,7 +538,8 @@ func readFileConcurrently(t *testing.T, s *readTestSuite) { file := files[i] eg.Go(func() error { buf := make([]byte, s.memoryLimit/conc) - reader, err := s.store.Open(ctx, file, nil) + st, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil) + reader, err := st.Open(ctx, file, nil) intest.AssertNoError(err) var size int for { @@ -721,7 +723,7 @@ func createAscendingFiles( store storage.ExternalStorage, fileSize, fileCount int, subDir string, -) int { +) (int, kv.Key, kv.Key) { ctx := context.Background() cleanOldFiles(ctx, store, "/"+subDir) @@ -729,6 +731,7 @@ func createAscendingFiles( keyIdx := 0 value := make([]byte, 100) kvCnt := 0 + var minKey, maxKey kv.Key for i := 0; i < fileCount; i++ { builder := NewWriterBuilder(). SetMemorySizeLimit(uint64(float64(fileSize) * 1.1)) @@ -739,18 +742,26 @@ func createAscendingFiles( ) totalSize := 0 + + var key string for totalSize < fileSize { - key := fmt.Sprintf("key_%09d", keyIdx) + key = fmt.Sprintf("key_%09d", keyIdx) + if i == 0 && totalSize == 0 { + minKey = []byte(key) + } err := writer.WriteRow(ctx, []byte(key), value, nil) intest.AssertNoError(err) keyIdx++ totalSize += len(key) + len(value) kvCnt++ } + if i == fileCount-1 { + maxKey = []byte(key) + } err := writer.Close(ctx) intest.AssertNoError(err) } - return kvCnt + return kvCnt, minKey, maxKey } var ( @@ -785,7 +796,7 @@ func testCompareReaderAscendingContent(t *testing.T, fn func(t *testing.T, suite store := openTestingStorage(t) kvCnt := 0 if !*skipCreate { - kvCnt = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix) + kvCnt, _, _ = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix) } fileIdx := 0 var ( @@ -896,3 +907,200 @@ func TestPrepareLargeData(t *testing.T) { t.Logf("total %d data files, first file size: %.2f MB, last file size: %.2f MB", len(dataFiles), float64(firstFileSize)/1024/1024, float64(lastFileSize)/1024/1024) } + +type mergeTestSuite struct { + store storage.ExternalStorage + subDir string + totalKVCnt int + concurrency int + memoryLimit int + mergeIterHotspot bool + minKey kv.Key + maxKey kv.Key + beforeMerge func() + afterMerge func() +} + +func newMergeStep(t *testing.T, s *mergeTestSuite) { + ctx := context.Background() + datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir) + intest.AssertNoError(err) + + mergeOutput := "merge_output" + totalSize := atomic.NewUint64(0) + onClose := func(s *WriterSummary) { + totalSize.Add(s.TotalSize) + } + if s.beforeMerge != nil { + s.beforeMerge() + } + + now := time.Now() + err = MergeOverlappingFiles( + ctx, + datas, + stats, + s.store, + s.minKey, kv.Key(s.maxKey).Next(), + int64(5*size.MB), + mergeOutput, + "mergeID", + 
DefaultBlockSize, + 8*1024, + 1*size.MB, + 8*1024, + onClose, + s.concurrency, + s.mergeIterHotspot, + ) + + intest.AssertNoError(err) + if s.afterMerge != nil { + s.afterMerge() + } + elapsed := time.Since(now) + t.Logf( + "merge speed for %d bytes in %s, speed: %.2f MB/s", + totalSize.Load(), + elapsed, + float64(totalSize.Load())/elapsed.Seconds()/1024/1024, + ) +} + +func newMergeStepOpt(t *testing.T, s *mergeTestSuite) { + ctx := context.Background() + datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir) + intest.AssertNoError(err) + + mergeOutput := "merge_output" + totalSize := atomic.NewUint64(0) + onClose := func(s *WriterSummary) { + totalSize.Add(s.TotalSize) + } + if s.beforeMerge != nil { + s.beforeMerge() + } + + now := time.Now() + err = MergeOverlappingFilesOpt( + ctx, + datas, + stats, + s.store, + s.minKey, kv.Key(s.maxKey).Next(), + int64(5*size.MB), + mergeOutput, + "mergeID", + DefaultBlockSize, + 8*1024, + 1*size.MB, + 8*1024, + onClose, + s.concurrency, + s.mergeIterHotspot, + ) + + intest.AssertNoError(err) + if s.afterMerge != nil { + s.afterMerge() + } + elapsed := time.Since(now) + t.Logf( + "merge speed with %d concurrency for %d bytes in %s, speed: %.2f MB/s", + s.concurrency, + totalSize.Load(), + elapsed, + float64(totalSize.Load())/elapsed.Seconds()/1024/1024, + ) +} + +func oldMergeStep(t *testing.T, s *mergeTestSuite) { + ctx := context.Background() + datas, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir) + intest.AssertNoError(err) + + if s.beforeMerge != nil { + s.beforeMerge() + } + mergeOutput := "merge_output" + totalSize := atomic.NewUint64(0) + onClose := func(s *WriterSummary) { + totalSize.Add(s.TotalSize) + } + now := time.Now() + err = MergeOverlappingFilesV2( + ctx, + datas, + s.store, + 16*1024*1024, + 64*1024, + mergeOutput, + DefaultBlockSize, + 8*1024, + 1024*1024, + 8*1024, + onClose, + s.concurrency, + s.mergeIterHotspot) + intest.AssertNoError(err) + + if s.afterMerge != nil { + s.afterMerge() + } + elapsed := time.Since(now) + t.Logf("merge prev implementation with %d concurrency %d bytes in %s, speed: %.2f MB/s", + s.concurrency, totalSize.Load(), elapsed, float64(totalSize.Load())/elapsed.Seconds()/1024/1024) +} + +func testCompareMergeAscendingContent(t *testing.T, fn func(t *testing.T, suite *mergeTestSuite)) { + store := openTestingStorage(t) + kvCnt := 0 + var minKey, maxKey kv.Key + if !*skipCreate { + kvCnt, minKey, maxKey = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix) + } + + fileIdx := 0 + var ( + file *os.File + err error + ) + beforeTest := func() { + fileIdx++ + file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx)) + intest.AssertNoError(err) + err = pprof.StartCPUProfile(file) + intest.AssertNoError(err) + } + + afterTest := func() { + pprof.StopCPUProfile() + } + + suite := &mergeTestSuite{ + store: store, + totalKVCnt: kvCnt, + concurrency: *concurrency, + memoryLimit: *memoryLimit, + beforeMerge: beforeTest, + afterMerge: afterTest, + subDir: *objectPrefix, + minKey: minKey, + maxKey: maxKey, + mergeIterHotspot: true, + } + + fn(t, suite) +} + +func TestNewMerge(t *testing.T) { + testCompareMergeAscendingContent(t, newMergeStep) +} + +func TestOldMerge(t *testing.T) { + testCompareMergeAscendingContent(t, oldMergeStep) +} + +func TestNewMergeOpt(t *testing.T) { + testCompareMergeAscendingContent(t, newMergeStepOpt) +} diff --git a/br/pkg/lightning/backend/external/byte_reader.go b/br/pkg/lightning/backend/external/byte_reader.go index 711a3f7defb5c..9d2a6edbbb913 
100644 --- a/br/pkg/lightning/backend/external/byte_reader.go +++ b/br/pkg/lightning/backend/external/byte_reader.go @@ -271,7 +271,7 @@ func (r *byteReader) reload() error { now := r.concurrentReader.now // in read only false -> true is possible if !now && to { - r.logger.Info("switch reader mode", zap.Bool("use concurrent mode", true)) + // r.logger.Info("switch reader mode", zap.Bool("use concurrent mode", true)) err := r.switchToConcurrentReader() if err != nil { return err @@ -311,10 +311,10 @@ func (r *byteReader) reload() error { } func (r *byteReader) closeConcurrentReader() (reloadCnt, offsetInOldBuffer int) { - r.logger.Info("drop data in closeConcurrentReader", - zap.Int("reloadCnt", r.concurrentReader.reloadCnt), - zap.Int("dropBytes", len(r.curBuf)-r.curBufOffset), - ) + // r.logger.Info("drop data in closeConcurrentReader", + // zap.Int("reloadCnt", r.concurrentReader.reloadCnt), + // zap.Int("dropBytes", len(r.curBuf)-r.curBufOffset), + // ) r.concurrentReader.largeBufferPool.Destroy() r.concurrentReader.largeBuf = nil r.concurrentReader.now = false diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 9734393a51ee1..dcf9750b4cb57 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -798,5 +798,4 @@ func (m *MemoryIngestData) DecRef() { func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) { m.importedKVSize.Add(totalBytes) m.importedKVCount.Add(totalCount) - } diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go index 3a931d55a0000..46bf29ceda130 100644 --- a/br/pkg/lightning/backend/external/merge.go +++ b/br/pkg/lightning/backend/external/merge.go @@ -1,21 +1,354 @@ package external import ( + "bytes" "context" + "fmt" + "math" + "time" "github.com/google/uuid" + "github.com/jfcg/sorty/v2" "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/membuf" + "golang.org/x/sync/errgroup" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/pkg/kv" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) // MergeOverlappingFiles reads from given files whose key range may overlap // and writes to new sorted, nonoverlapping files. func MergeOverlappingFiles( + ctx context.Context, + dataFiles []string, + statFiles []string, + store storage.ExternalStorage, + startKey []byte, + endKey []byte, + partSize int64, + newFilePrefix string, + writerID string, + blockSize int, + writeBatchCount uint64, + propSizeDist uint64, + propKeysDist uint64, + onClose OnCloseFunc, + concurrency int, + checkHotspot bool, +) error { + logutil.Logger(ctx).Info("enter MergeOverlappingFiles", + zap.Int("data-file-count", len(dataFiles)), + zap.Int("stat-file-count", len(statFiles)), + zap.Binary("start-key", startKey), + zap.Binary("end-key", endKey), + zap.String("new-file-prefix", newFilePrefix), + zap.Int("concurrency", concurrency), + ) + + splitter, err := NewRangeSplitter( + ctx, + dataFiles, + statFiles, + store, + 4*1024*1024*1024, + math.MaxInt64, + 4*1024*1024*1024, + math.MaxInt64, + checkHotspot, + ) + if err != nil { + return err + } + + writer := NewWriterBuilder(). + SetMemorySizeLimit(DefaultMemSizeLimit). + SetBlockSize(blockSize). + SetPropKeysDistance(propKeysDist). + SetPropSizeDistance(propSizeDist). + SetOnCloseFunc(onClose). 
+ BuildOneFile( + store, + newFilePrefix, + writerID) + err = writer.Init(ctx, partSize, 40) + if err != nil { + return nil + } + + bufPool := membuf.NewPool() + loaded := &memKVsAndBuffers{} + curStart := startKey + + for { + endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() + if err != nil { + return err + } + curEnd := endKeyOfGroup + if len(endKeyOfGroup) == 0 { + curEnd = endKey + } + now := time.Now() + err = readAllData( + ctx, + store, + dataFilesOfGroup, + statFilesOfGroup, + curStart, + curEnd, + bufPool, + loaded, + ) + if err != nil { + return err + } + logutil.Logger(ctx).Info("reading external storage in MergeOverlappingFiles", + zap.Duration("cost time", time.Since(now))) + now = time.Now() + sorty.MaxGor = uint64(concurrency) + sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { + if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > + if r != s { + loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] + loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] + } + return true + } + return false + }) + logutil.Logger(ctx).Info("sorting in MergeOverlappingFiles", + zap.Duration("cost time", time.Since(now)), + zap.Any("key len", len(loaded.keys))) + now = time.Now() + for i, key := range loaded.keys { + err = writer.WriteRow(ctx, key, loaded.values[i]) + if err != nil { + return err + } + } + logutil.Logger(ctx).Info("writing in MergeOverlappingFiles", + zap.Duration("cost time", time.Since(now)), + zap.Any("key len", len(loaded.keys))) + curStart = curEnd + if len(endKeyOfGroup) == 0 { + break + } + } + + var stat MultipleFilesStat + stat.Filenames = append(stat.Filenames, + [2]string{writer.dataFile, writer.statFile}) + stat.build([]tidbkv.Key{startKey}, []tidbkv.Key{loaded.keys[len(loaded.keys)-1]}) + if onClose != nil { + onClose(&WriterSummary{ + WriterID: writer.writerID, + Seq: 0, + Min: startKey, + Max: loaded.keys[len(loaded.keys)-1], + TotalSize: writer.totalSize, + MultipleFilesStats: []MultipleFilesStat{stat}, + }) + } + err = writer.Close(ctx) + if err != nil { + return err + } + return nil +} + +type readerGroup struct { + dataFiles []string + statFiles []string + startKey kv.Key + endKey kv.Key +} + +func getGroups(ctx context.Context, splitter *RangeSplitter, startKey kv.Key, endKey kv.Key) ([]readerGroup, error) { + readerGroups := make([]readerGroup, 0, 10) + curStart := startKey + + for { + endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup() + if err != nil { + return nil, err + } + curEnd := endKeyOfGroup + if len(endKeyOfGroup) == 0 { + curEnd = endKey + } + readerGroups = append(readerGroups, readerGroup{ + dataFiles: dataFilesOfGroup, + statFiles: statFilesOfGroup, + startKey: curStart, + endKey: curEnd, + }) + + curStart = curEnd + if len(endKeyOfGroup) == 0 { + break + } + } + return readerGroups, nil +} + +func MergeOverlappingFilesOpt( + ctx context.Context, + dataFiles []string, + statFiles []string, + store storage.ExternalStorage, + startKey []byte, + endKey []byte, + partSize int64, + newFilePrefix string, + writerID string, + blockSize int, + writeBatchCount uint64, + propSizeDist uint64, + propKeysDist uint64, + onClose OnCloseFunc, + concurrency int, + checkHotspot bool, +) error { + logutil.Logger(ctx).Info("enter MergeOverlappingFiles opt", + zap.Int("data-file-count", len(dataFiles)), + zap.Int("stat-file-count", len(statFiles)), + zap.Binary("start-key", startKey), + zap.Binary("end-key", 
endKey), + zap.String("new-file-prefix", newFilePrefix), + zap.Int("concurrency", concurrency), + ) + + splitter, err := NewRangeSplitter( + ctx, + dataFiles, + statFiles, + store, + 1*1024*1024*1024, + math.MaxInt64, + 4*1024*1024*1024, + math.MaxInt64, + checkHotspot, + ) + if err != nil { + return err + } + + groups, err := getGroups(ctx, splitter, startKey, endKey) + if err != nil { + return err + } + logutil.Logger(ctx).Info("get groups", zap.Int("len", len(groups))) + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(concurrency) + partSize = max(int64(5*size.MB), partSize+int64(1*size.MB)) + for i, group := range groups { + group := group + i := i + eg.Go(func() error { + return runOneGroup(egCtx, store, group, partSize, newFilePrefix, fmt.Sprintf("%s%d", writerID, i), blockSize, propSizeDist, propKeysDist, onClose) + }) + } + return eg.Wait() +} + +func runOneGroup( + ctx context.Context, + store storage.ExternalStorage, + rdGroup readerGroup, + partSize int64, + newFilePrefix string, + wrireID string, + blockSize int, + propSizeDist uint64, + propKeysDist uint64, + onClose OnCloseFunc) error { + logutil.BgLogger().Info("data files", zap.Int("len", len(rdGroup.dataFiles))) + writer := NewWriterBuilder(). + SetMemorySizeLimit(DefaultMemSizeLimit*2). + SetBlockSize(blockSize). + SetPropKeysDistance(propKeysDist). + SetPropSizeDistance(propSizeDist). + SetOnCloseFunc(onClose). + BuildOneFile(store, newFilePrefix, wrireID) + + err := writer.Init(ctx, partSize, 40) + if err != nil { + return nil + } + + bufPool := membuf.NewPool() + loaded := &memKVsAndBuffers{} + now := time.Now() + err = readAllData( + ctx, + store, + rdGroup.dataFiles, + rdGroup.statFiles, + rdGroup.startKey, + rdGroup.endKey, + bufPool, + loaded, + ) + if err != nil { + return err + } + logutil.Logger(ctx).Info("reading external storage in MergeOverlappingFiles", + zap.Duration("cost time", time.Since(now))) + now = time.Now() + sorty.MaxGor = uint64(8) + sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool { + if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or > + if r != s { + loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r] + loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r] + } + return true + } + return false + }) + logutil.Logger(ctx).Info("sorting in MergeOverlappingFiles", + zap.Duration("cost time", time.Since(now)), + zap.Any("key len", len(loaded.keys))) + now = time.Now() + for i, key := range loaded.keys { + err = writer.WriteRow(ctx, key, loaded.values[i]) + if err != nil { + return err + } + } + logutil.Logger(ctx).Info("writing in MergeOverlappingFiles", + zap.Duration("cost time", time.Since(now)), + zap.Any("key len", len(loaded.keys))) + + var stat MultipleFilesStat + stat.Filenames = append(stat.Filenames, + [2]string{writer.dataFile, writer.statFile}) + stat.build([]tidbkv.Key{rdGroup.startKey}, []tidbkv.Key{loaded.keys[len(loaded.keys)-1]}) + if onClose != nil { + onClose(&WriterSummary{ + WriterID: writer.writerID, + Seq: 0, + Min: rdGroup.startKey, + Max: loaded.keys[len(loaded.keys)-1], + TotalSize: writer.totalSize, + MultipleFilesStats: []MultipleFilesStat{stat}, + }) + } + err = writer.Close(ctx) + if err != nil { + return err + } + return nil +} + +// MergeOverlappingFilesV2 reads from given files whose key range may overlap +// and writes to new sorted, nonoverlapping files. 
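// Illustrative only (not part of the patch): a minimal caller of the new
// MergeOverlappingFiles signature defined above, mirroring the argument values
// used by newMergeStep in bench_test.go. The store, file lists, key range and
// the helper name itself are placeholders; endKey is exclusive.
func exampleMergeOverlappingFiles(
	ctx context.Context,
	store storage.ExternalStorage,
	dataFiles, statFiles []string,
	startKey, endKey kv.Key,
	onClose OnCloseFunc,
) error {
	return MergeOverlappingFiles(
		ctx,
		dataFiles,
		statFiles,
		store,
		startKey,
		endKey,
		int64(5*size.MB), // partSize for the multipart uploader
		"merge_output",   // newFilePrefix
		"mergeID",        // writerID
		DefaultBlockSize,
		8*1024,    // writeBatchCount
		1*size.MB, // propSizeDist
		8*1024,    // propKeysDist
		onClose,
		8,    // concurrency
		true, // checkHotspot
	)
}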
+func MergeOverlappingFilesV2( ctx context.Context, paths []string, store storage.ExternalStorage, @@ -47,6 +380,7 @@ func MergeOverlappingFiles( zap.Int("file-count", len(paths)), zap.Int("file-groups", len(dataFilesSlice)), zap.Int("concurrency", concurrency)) + eg, egCtx := errgroup.WithContext(ctx) eg.SetLimit(concurrency) partSize = max(int64(5*size.MB), partSize+int64(1*size.MB)) @@ -74,65 +408,6 @@ func MergeOverlappingFiles( return eg.Wait() } -// unused for now. -func mergeOverlappingFilesImpl(ctx context.Context, - paths []string, - store storage.ExternalStorage, - readBufferSize int, - newFilePrefix string, - writerID string, - memSizeLimit uint64, - blockSize int, - writeBatchCount uint64, - propSizeDist uint64, - propKeysDist uint64, - onClose OnCloseFunc, - checkHotspot bool, -) (err error) { - task := log.BeginTask(logutil.Logger(ctx).With( - zap.String("writer-id", writerID), - zap.Int("file-count", len(paths)), - ), "merge overlapping files") - defer func() { - task.End(zap.ErrorLevel, err) - }() - - zeroOffsets := make([]uint64, len(paths)) - iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot, 0) - if err != nil { - return err - } - defer func() { - err := iter.Close() - if err != nil { - logutil.Logger(ctx).Warn("close iterator failed", zap.Error(err)) - } - }() - - writer := NewWriterBuilder(). - SetMemorySizeLimit(memSizeLimit). - SetBlockSize(blockSize). - SetOnCloseFunc(onClose). - SetWriterBatchCount(writeBatchCount). - SetPropSizeDistance(propSizeDist). - SetPropKeysDistance(propKeysDist). - Build(store, newFilePrefix, writerID) - - // currently use same goroutine to do read and write. The main advantage is - // there's no KV copy and iter can reuse the buffer. - for iter.Next() { - err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil) - if err != nil { - return err - } - } - err = iter.Error() - if err != nil { - return err - } - return writer.Close(ctx) -} - // mergeOverlappingFilesV2 reads from given files whose key range may overlap // and writes to one new sorted, nonoverlapping files. func mergeOverlappingFilesV2( @@ -179,7 +454,7 @@ func mergeOverlappingFilesV2( SetPropSizeDistance(propSizeDist). SetOnCloseFunc(onClose). BuildOneFile(store, newFilePrefix, writerID) - err = writer.Init(ctx, partSize) + err = writer.Init(ctx, partSize, 20) if err != nil { return nil } diff --git a/br/pkg/lightning/backend/external/onefile_writer.go b/br/pkg/lightning/backend/external/onefile_writer.go index 996f336909fbc..9cddb06a3c38d 100644 --- a/br/pkg/lightning/backend/external/onefile_writer.go +++ b/br/pkg/lightning/backend/external/onefile_writer.go @@ -20,7 +20,6 @@ import ( "path/filepath" "github.com/pingcap/errors" - "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" @@ -31,13 +30,15 @@ import ( // with only one file for data and stat. type OneFileWriter struct { // storage related. - store storage.ExternalStorage - kvStore *KeyValueStore - kvBuffer *membuf.Buffer + store storage.ExternalStorage + kvStore *KeyValueStore + memSizeLimit uint64 // Statistic information per writer. totalSize uint64 - rc *rangePropertiesCollector + curSize uint64 + + rc *rangePropertiesCollector // file information. writerID string @@ -54,16 +55,16 @@ type OneFileWriter struct { } // initWriter inits the underlying dataFile/statFile path, dataWriter/statWriter for OneFileWriter. 
-func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) ( +func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64, concurrency int) ( err error, ) { w.dataFile = filepath.Join(w.filenamePrefix, "one-file") - w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{Concurrency: 20, PartSize: partSize}) + w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{Concurrency: concurrency, PartSize: partSize}) if err != nil { return err } w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file") - w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)}) + w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: concurrency, PartSize: int64(5 * size.MB)}) if err != nil { _ = w.dataWriter.Close(ctx) return err @@ -73,9 +74,9 @@ func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) ( } // Init inits the OneFileWriter and its underlying KeyValueStore. -func (w *OneFileWriter) Init(ctx context.Context, partSize int64) (err error) { +func (w *OneFileWriter) Init(ctx context.Context, partSize int64, concurrency int) (err error) { w.logger = logutil.Logger(ctx) - err = w.initWriter(ctx, partSize) + err = w.initWriter(ctx, partSize, concurrency) if err != nil { return err } @@ -85,18 +86,11 @@ func (w *OneFileWriter) Init(ctx context.Context, partSize int64) (err error) { // WriteRow implements ingest.Writer. func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error { - // 1. encode data and write to kvStore. keyLen := len(idxKey) length := len(idxKey) + len(idxVal) + lengthBytes*2 - buf, _ := w.kvBuffer.AllocBytesWithSliceLocation(length) - if buf == nil { - w.kvBuffer.Reset() - buf, _ = w.kvBuffer.AllocBytesWithSliceLocation(length) - // we now don't support KV larger than blockSize - if buf == nil { - return errors.Errorf("failed to allocate kv buffer: %d", length) - } - // 2. write statistics if one kvBuffer is used. 
+ w.curSize += uint64(length) + if w.curSize > w.memSizeLimit { + w.curSize = 0 w.kvStore.Close() encodedStat := w.rc.encode() _, err := w.statWriter.Write(ctx, encodedStat) @@ -105,11 +99,13 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err } w.rc.reset() } - binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen)) - binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal))) - copy(buf[lengthBytes*2:], idxKey) - copy(buf[lengthBytes*2+keyLen:], idxVal) - err := w.kvStore.addEncodedData(buf[:length]) + data := make([]byte, length) + + binary.BigEndian.AppendUint64(data[:0], uint64(keyLen)) + binary.BigEndian.AppendUint64(data[lengthBytes:lengthBytes], uint64(len(idxVal))) + copy(data[lengthBytes*2:], idxKey) + copy(data[lengthBytes*2+keyLen:], idxVal) + err := w.kvStore.addEncodedData(data[:length]) if err != nil { return err } @@ -126,8 +122,7 @@ func (w *OneFileWriter) Close(ctx context.Context) error { if err != nil { return err } - w.logger.Info("close one file writer", - zap.String("writerID", w.writerID)) + w.logger.Info("close one file writer", zap.String("writerID", w.writerID)) w.totalSize = 0 w.closed = true diff --git a/br/pkg/lightning/backend/external/onefile_writer_test.go b/br/pkg/lightning/backend/external/onefile_writer_test.go index 3999543a82ae7..3e99db6f0d342 100644 --- a/br/pkg/lightning/backend/external/onefile_writer_test.go +++ b/br/pkg/lightning/backend/external/onefile_writer_test.go @@ -29,7 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" - dbkv "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" @@ -50,7 +50,7 @@ func TestOnefileWriterBasic(t *testing.T) { SetPropKeysDistance(2). BuildOneFile(memStore, "/test", "0") - err := writer.Init(ctx, 5*1024*1024) + err := writer.Init(ctx, 5*1024*1024, 20) require.NoError(t, err) kvCnt := 100 @@ -124,7 +124,7 @@ func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance ui SetPropKeysDistance(keysDistance). BuildOneFile(memStore, "/"+prefix, "0") - err := writer.Init(ctx, 5*1024*1024) + err := writer.Init(ctx, 5*1024*1024, 20) require.NoError(t, err) kvs := make([]common.KvPair, 0, kvCnt) for i := 0; i < kvCnt; i++ { @@ -177,17 +177,22 @@ func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance ui require.NoError(t, statReader.Close()) } -func TestMergeOverlappingFilesV2(t *testing.T) { +func TestMergeOverlappingFiles(t *testing.T) { // 1. Write to 5 files. // 2. merge 5 files into one file. // 3. read one file and check result. // 4. check duplicate key. ctx := context.Background() memStore := storage.NewMemStorage() + var startKey, endKey kv.Key writer := NewWriterBuilder(). SetPropKeysDistance(2). SetMemorySizeLimit(1000). SetKeyDuplicationEncoding(true). + SetOnCloseFunc(func(summary *WriterSummary) { + startKey = summary.Min + endKey = summary.Max.Next() + }). Build(memStore, "/test", "0") kvCount := 2000000 @@ -197,26 +202,27 @@ func TestMergeOverlappingFilesV2(t *testing.T) { v-- // insert a duplicate key. 
} key, val := []byte{byte(v)}, []byte{byte(v)} - err := writer.WriteRow(ctx, key, val, dbkv.IntHandle(i)) + err := writer.WriteRow(ctx, key, val, kv.IntHandle(i)) require.NoError(t, err) } err := writer.Close(ctx) require.NoError(t, err) - err = mergeOverlappingFilesV2( + err = MergeOverlappingFiles( ctx, []string{"/test/0/0", "/test/0/1", "/test/0/2", "/test/0/3", "/test/0/4"}, + []string{"/test/0_stat/0", "/test/0_stat/1", "/test/0_stat/2", "/test/0_stat/3", "/test/0_stat/4"}, memStore, + startKey, kv.Key(endKey), int64(5*size.MB), - 100, "/test2", "mergeID", - 1000, - 1000, + DefaultBlockSize, 8*1024, 1*size.MB, - 2, + 8*1024, nil, + 8, true, ) require.NoError(t, err) @@ -276,7 +282,7 @@ func TestOnefileWriterManyRows(t *testing.T) { SetMemorySizeLimit(1000). BuildOneFile(memStore, "/test", "0") - err := writer.Init(ctx, 5*1024*1024) + err := writer.Init(ctx, 5*1024*1024, 20) require.NoError(t, err) kvCnt := 100000 @@ -311,22 +317,25 @@ func TestOnefileWriterManyRows(t *testing.T) { onClose := func(summary *WriterSummary) { resSummary = summary } - err = mergeOverlappingFilesV2( + + err = MergeOverlappingFiles( ctx, []string{"/test/0/one-file"}, + []string{"/test/0_stat/one-file"}, memStore, + kvs[0].Key, kv.Key(kvs[len(kvs)-1].Key).Next(), int64(5*size.MB), - 100, "/test2", "mergeID", - 1000, - 1000, + DefaultBlockSize, 8*1024, 1*size.MB, - 2, + 8*1024, onClose, + 8, true, ) + require.NoError(t, err) bufSize := rand.Intn(100) + 1 diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 334cacb9e135a..d864cb60eadff 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -217,7 +217,6 @@ func (b *WriterBuilder) BuildOneFile( writerID string, ) *OneFileWriter { filenamePrefix := filepath.Join(prefix, writerID) - p := membuf.NewPool(membuf.WithBlockNum(0), membuf.WithBlockSize(b.blockSize)) ret := &OneFileWriter{ rc: &rangePropertiesCollector{ @@ -226,7 +225,6 @@ func (b *WriterBuilder) BuildOneFile( propSizeDist: b.propSizeDist, propKeysDist: b.propKeysDist, }, - kvBuffer: p.NewBuffer(membuf.WithBufferMemoryLimit(b.memSizeLimit)), store: store, filenamePrefix: filenamePrefix, writerID: writerID, diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index 7bc853f63dc1c..b285d0f86ae65 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/jfcg/sorty/v2" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" @@ -155,11 +156,16 @@ func TestWriterFlushMultiFileNames(t *testing.T) { func TestWriterDuplicateDetect(t *testing.T) { ctx := context.Background() memStore := storage.NewMemStorage() + var startKey, endKey dbkv.Key writer := NewWriterBuilder(). SetPropKeysDistance(2). SetMemorySizeLimit(1000). SetKeyDuplicationEncoding(true). + SetOnCloseFunc(func(summary *WriterSummary) { + startKey = summary.Min + endKey = summary.Max.Next() + }). Build(memStore, "/test", "0") kvCount := 20 for i := 0; i < kvCount; i++ { @@ -175,27 +181,29 @@ func TestWriterDuplicateDetect(t *testing.T) { require.NoError(t, err) // test MergeOverlappingFiles will not change duplicate detection functionality. 
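	// With the new merge path the output comes from a OneFileWriter, so all
	// merged KVs land in a single data object "/test2/mergeID/one-file" (stats
	// under "/test2/mergeID_stat/one-file"); that is the path read back below.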
- err = mergeOverlappingFilesImpl( + err = MergeOverlappingFiles( ctx, []string{"/test/0/0"}, + []string{"/test/0_stat/0"}, memStore, - 100, + startKey, dbkv.Key(endKey).Next(), + int64(5*size.MB), "/test2", "mergeID", - 1000, - 1000, + DefaultBlockSize, 8*1024, 1*size.MB, - 2, + 8*1024, nil, - false, + 8, + true, ) require.NoError(t, err) keys := make([][]byte, 0, kvCount) values := make([][]byte, 0, kvCount) - kvReader, err := newKVReader(ctx, "/test2/mergeID/0", memStore, 0, 100) + kvReader, err := newKVReader(ctx, "/test2/mergeID/one-file", memStore, 0, 100) require.NoError(t, err) for i := 0; i < kvCount; i++ { key, value, err := kvReader.nextKV() @@ -269,8 +277,11 @@ func TestWriterMultiFileStat(t *testing.T) { ctx := context.Background() memStore := storage.NewMemStorage() var summary *WriterSummary + var startKey, endKey dbkv.Key closeFn := func(s *WriterSummary) { summary = s + startKey = summary.Min + endKey = summary.Max.Next() } writer := NewWriterBuilder(). @@ -371,62 +382,41 @@ func TestWriterMultiFileStat(t *testing.T) { require.EqualValues(t, "key24", summary.Max) allDataFiles := make([]string, 9) + allStatFiles := make([]string, 9) for i := range allDataFiles { allDataFiles[i] = fmt.Sprintf("/test/0/%d", i) + allStatFiles[i] = fmt.Sprintf("/test/0_stat/%d", i) } - err = mergeOverlappingFilesImpl( + // merge to one file. + err = MergeOverlappingFiles( ctx, allDataFiles, + allStatFiles, memStore, - 100, + startKey, endKey, + int64(5*size.MB), "/test2", "mergeID", - 52, - 52, + DefaultBlockSize, 8*1024, 1*size.MB, - 2, + 8*1024, closeFn, + 8, true, ) require.NoError(t, err) - require.Equal(t, 3, len(summary.MultipleFilesStats)) + require.Equal(t, 1, len(summary.MultipleFilesStats)) expected = MultipleFilesStat{ MinKey: []byte("key01"), - MaxKey: []byte("key06"), - Filenames: [][2]string{ - {"/test2/mergeID/0", "/test2/mergeID_stat/0"}, - {"/test2/mergeID/1", "/test2/mergeID_stat/1"}, - {"/test2/mergeID/2", "/test2/mergeID_stat/2"}, - }, - MaxOverlappingNum: 1, - } - require.Equal(t, expected, summary.MultipleFilesStats[0]) - expected = MultipleFilesStat{ - MinKey: []byte("key11"), - MaxKey: []byte("key16"), - Filenames: [][2]string{ - {"/test2/mergeID/3", "/test2/mergeID_stat/3"}, - {"/test2/mergeID/4", "/test2/mergeID_stat/4"}, - {"/test2/mergeID/5", "/test2/mergeID_stat/5"}, - }, - MaxOverlappingNum: 1, - } - require.Equal(t, expected, summary.MultipleFilesStats[1]) - expected = MultipleFilesStat{ - MinKey: []byte("key20"), MaxKey: []byte("key24"), Filenames: [][2]string{ - {"/test2/mergeID/6", "/test2/mergeID_stat/6"}, - {"/test2/mergeID/7", "/test2/mergeID_stat/7"}, - {"/test2/mergeID/8", "/test2/mergeID_stat/8"}, + {"/test2/mergeID/one-file", "/test2/mergeID_stat/one-file"}, }, MaxOverlappingNum: 1, } - require.Equal(t, expected, summary.MultipleFilesStats[2]) - require.EqualValues(t, "key01", summary.Min) - require.EqualValues(t, "key24", summary.Max) + require.Equal(t, expected, summary.MultipleFilesStats[0]) } func TestWriterSort(t *testing.T) { diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index 8637e73ddacb1..98b1b3a73ba2e 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -98,6 +98,7 @@ func (options *GCSBackendOptions) parseFromFlags(flags *pflag.FlagSet) error { type GCSStorage struct { gcs *backuppb.GCS bucket *storage.BucketHandle + cli *storage.Client } // GetBucketHandle gets the handle to the GCS API on the bucket. @@ -271,7 +272,7 @@ func (s *GCSStorage) URI() string { } // Create implements ExternalStorage interface. 
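// Sketch (an assumption, not something this patch wires up): with the
// *storage.Client now kept on GCSStorage and the WriterOption reaching Create,
// a caller could route large writes to the signed-URL multipart GCSWriter from
// gcs_extra.go. Hypothetical, and GCSWriter would still need an adapter to
// satisfy ExternalFileWriter's context-aware Write/Close:
//
//	if wo != nil && wo.Concurrency > 1 {
//		w, err := NewGCSWriter(ctx, s.cli, s.objectName(name), int(wo.PartSize), wo.Concurrency, s.gcs.Bucket)
//		...
//	}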
-func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { +func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (ExternalFileWriter, error) { object := s.objectName(name) wc := s.bucket.Object(object).NewWriter(ctx) wc.StorageClass = s.gcs.StorageClass @@ -333,7 +334,8 @@ func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage httpClient = &http.Client{Transport: transport} // see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455 var err error - httpClient.Transport, err = htransport.NewTransport(ctx, httpClient.Transport, clientOps...) + httpClient.Transport, err = htransport.NewTransport(ctx, httpClient.Transport, + append(clientOps, option.WithScopes("https://www.googleapis.com/auth/cloud-platform"))...) if err != nil { return nil, errors.Trace(err) } @@ -363,7 +365,7 @@ func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage // so we need find sst in slash directory gcs.Prefix += "//" } - return &GCSStorage{gcs: gcs, bucket: bucket}, nil + return &GCSStorage{gcs: gcs, bucket: bucket, cli: client}, nil } func hasSSTFiles(ctx context.Context, bucket *storage.BucketHandle, prefix string) bool { diff --git a/br/pkg/storage/gcs_extra.go b/br/pkg/storage/gcs_extra.go new file mode 100644 index 0000000000000..b4acd659b2d31 --- /dev/null +++ b/br/pkg/storage/gcs_extra.go @@ -0,0 +1,357 @@ +package storage + +import ( + "bytes" + "context" + "encoding/xml" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "cloud.google.com/go/storage" + "github.com/go-resty/resty/v2" + "github.com/pingcap/log" +) + +const ( + uploadPartTryCnt = 3 // if uploading a part fails, we should try 3 times in total. + MPUInitiateQuery = "uploads" + MPUPartNumberQuery = "partNumber" + MPUUploadIDQuery = "uploadId" + + defaultChunkSize = 5 * 1024 * 1024 // 5 MB + defaultRetry = 3 // 建议默认3~5次,不要太大,否则将可能会导致成为僵尸任务。 + defaultSignedURLExpiry = 15 * time.Minute +) + +type XMLMPU struct { + cli *storage.Client + bucket string + uri string + retry int + signedURLExpiry time.Duration + uploadID string +} + +type InitiateMultipartUploadResult struct { + XMLName xml.Name `xml:"InitiateMultipartUploadResult"` + Text string `xml:",chardata"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadId string `xml:"UploadId"` +} + +// GCSWriter is GCSWriter following GCS multipart upload protocol. +type GCSWriter struct { + ctx context.Context + + objURI string + bucketName string + uploadID string + + partSize int // in bytes + parallelCnt int // number of workers + writtenTotalBytes int64 // the total number of bytes uploaded by workers so far + + workers []*uploadWorker + currentWorker *uploadWorker // the worker who is buffering the data. Once the buffer is full, + // the data will be uploaded as a part. 
+ idleWorkers chan *uploadWorker + m *XMLMPU +} + +func (m *XMLMPU) InitiateXMLMPU() error { + opts := &storage.SignedURLOptions{ + Scheme: storage.SigningSchemeV4, + Method: "POST", + Expires: time.Now().Add(m.signedURLExpiry), + QueryParameters: url.Values{MPUInitiateQuery: []string{""}}, + } + u, err := m.cli.Bucket(m.bucket).SignedURL(m.uri, opts) + if err != nil { + return fmt.Errorf("Bucket(%q).SignedURL: %s", m.bucket, err) + } + + client := resty.New() + resp, err := client.R().Post(u) + if err != nil { + return fmt.Errorf("POST request failed: %s", err) + } + + if resp.StatusCode() != http.StatusOK { + return fmt.Errorf("POST request returned non-OK status: %d", resp.StatusCode()) + } + body := resp.Body() + + result := InitiateMultipartUploadResult{} + err = xml.Unmarshal(body, &result) + if err != nil { + return fmt.Errorf("failed to unmarshal response body: %s", err) + } + + uploadID := result.UploadId + m.uploadID = uploadID + return nil +} + +// NewGCSWriter returns a GCSWriter which uses GCS multipart upload API behind the scene. +func NewGCSWriter(ctx context.Context, cli *storage.Client, uri string, partSize int, parallelCnt int, bucketName string) (*GCSWriter, error) { + // multipart upload protocol: go/gcs-pb-multipart-upload#performing-a-multipart-upload + m := XMLMPU{ + cli: cli, + retry: defaultRetry, + uri: uri, + signedURLExpiry: defaultSignedURLExpiry, + bucket: bucketName, + } + err := m.InitiateXMLMPU() + if err != nil { + err = fmt.Errorf("Failed to initiate XMLMPU: %s", err) + return nil, err + } + + w := &GCSWriter{ + ctx: ctx, + objURI: uri, + bucketName: bucketName, + partSize: partSize, + parallelCnt: parallelCnt, + workers: make([]*uploadWorker, parallelCnt), + currentWorker: nil, + idleWorkers: make(chan *uploadWorker, parallelCnt), + m: &m, + } + + for i := 0; i < parallelCnt; i++ { + w.workers[i] = newUploadWorker(cli, uri, bucketName, m.uploadID, i, partSize, w.idleWorkers) + w.idleWorkers <- w.workers[i] + } + + return w, nil +} + +// Write transfer data to GCS by using multipart upload API. +func (w *GCSWriter) Write(p []byte) (n int, err error) { + i := 0 + for i < len(p) { + if w.currentWorker == nil { + // pick a worker from the idle pool + w.currentWorker = <-w.idleWorkers + } + + n := w.currentWorker.bufferData(w.partNum(), p[i:]) // the part number doesn't change while buffering + i += n + w.writtenTotalBytes += int64(n) + + if w.currentWorker.full() { + err := w.currentWorker.uploadPartAsync() + if err != nil { + // The TTL for a uploaded but not completed part is 7 days: https://screenshot.googleplex.com/A6wGvcx5hYRhUd7 + return i, err + } + w.currentWorker = nil // this worker is uploading a part, will find another idle worker. + } + } + return i, nil +} + +// Close waits for the completion of all transfer and generate the final GCS object. 
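// Illustrative only: end-to-end use of the multipart GCSWriter. The client,
// object and bucket names are placeholders; per the worker's own constraint,
// every part except the last must be between 5 MiB and 5 GiB.
func exampleGCSWriterUpload(ctx context.Context, cli *storage.Client, payload []byte) error {
	w, err := NewGCSWriter(ctx, cli, "example-object", 5*1024*1024, 4, "example-bucket")
	if err != nil {
		return err
	}
	if _, err := w.Write(payload); err != nil {
		return err
	}
	// Close uploads the trailing partial part, waits for all workers and then
	// completes the multipart upload.
	return w.Close()
}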
+func (w *GCSWriter) Close() error { + if w.currentWorker != nil { + // uploads the last part, which is not a full part + w.currentWorker.uploadPartAsync() + } + + for i := 0; i < w.parallelCnt; i++ { + <-w.idleWorkers + } + + // merge all parts + parts := make(map[int]objectPart) + for i := 0; i < w.parallelCnt; i++ { + for k, v := range w.workers[i].parts { + parts[k] = v + } + // log.Info(fmt.Sprintf("worker %v uploaded %v parts for %v", i, len(w.workers[i].parts), w.objURI)) + } + + return w.completeMultipartUpload(parts) +} + +func (w *GCSWriter) partNum() int { + return int((w.writtenTotalBytes / int64(w.partSize)) + 1) // Starts from 1, instead of 0 +} + +func (w *GCSWriter) completeMultipartUpload(parts map[int]objectPart) error { + bodyXML, err := buildCompleteMultipartUploadXML(parts) + if err != nil { + return fmt.Errorf("failed to build complete multipart upload XML for %v, err: %w", w.objURI, err) + } + + values := url.Values{} + values.Add(MPUUploadIDQuery, w.m.uploadID) + opts := &storage.SignedURLOptions{ + Scheme: storage.SigningSchemeV4, + Method: "POST", + Expires: time.Now().Add(defaultSignedURLExpiry), + //ContentType: "application/xml", + QueryParameters: values, + } + u, err := w.m.cli.Bucket(w.m.bucket).SignedURL(w.objURI, opts) + if err != nil { + err = fmt.Errorf("Bucket(%q).SignedURL: %s", w.m.bucket, err) + return err + } + + client := resty.New() + resp, err := client.R().SetBody(bodyXML).Post(u) + if err != nil { + err = fmt.Errorf("POST request failed: %s", err) + return err + } + + if resp.StatusCode() != http.StatusOK { + err = fmt.Errorf("POST request returned non-OK status: %d", resp.StatusCode()) + return err + } + + return nil +} + +func buildCompleteMultipartUploadXML(parts map[int]objectPart) (string, error) { + xmlStr := strings.Builder{} + encoder := xml.NewEncoder(&xmlStr) + encoder.Indent("", " ") // Indent with 2 spaces. + + upload := struct { + XMLName xml.Name `xml:"CompleteMultipartUpload"` + Parts []objectPart `xml:"Part"` + }{} + // Order parts in the XML request. 
+ upload.Parts = make([]objectPart, 0, len(parts)) + for partNum := 1; partNum <= len(parts); partNum++ { + part, ok := parts[partNum] + if ok { + upload.Parts = append(upload.Parts, part) + } else { + return "", fmt.Errorf("part %v not contained in parts", partNum) + } + } + + if err := encoder.Encode(upload); err != nil { + return "", err + } + return xmlStr.String(), nil +} + +type objectPart struct { + PartNumber int `xml:"PartNumber"` + ETag string `xml:"ETag"` +} + +type uploadWorker struct { + cli *storage.Client + uri string + index int // the index of the worker + + bucketName, uploadID string + + buffer []byte // buffer capacity is equal to partSize + offset int + partSize int // 5 MiB <= partSize <= 5 GiB + currentPartNumber int + parts map[int]objectPart + + idleWorkerPool chan *uploadWorker + + err error // the error got from the last upload, will retry this part + tryCnt int +} + +func newUploadWorker(cli *storage.Client, uri string, bucketName, uploadID string, index, partSize int, idleWorkerPool chan *uploadWorker) *uploadWorker { + return &uploadWorker{ + uri: uri, + cli: cli, + index: index, + bucketName: bucketName, + uploadID: uploadID, + buffer: make([]byte, partSize), + partSize: partSize, + parts: make(map[int]objectPart), + idleWorkerPool: idleWorkerPool, + } +} + +func (uw *uploadWorker) bufferData(partNumber int, p []byte) int { + uw.currentPartNumber = partNumber + n := copy(uw.buffer[uw.offset:], p) + uw.offset += n + return n +} + +func (uw *uploadWorker) full() bool { + return uw.offset >= uw.partSize // if true, this worker is ready to upload the full part +} + +func (uw *uploadWorker) uploadPartAsync() error { + if uw.err != nil && uw.tryCnt >= uploadPartTryCnt { + return fmt.Errorf("failed to upload part %v too many times, err: %w", uw.index, uw.err) + } + go func() { + err := uw.uploadPart(uw.currentPartNumber, uw.buffer[:uw.offset]) // timeout in few seconds + if err != nil { + uw.err = err + uw.tryCnt++ + log.Error(fmt.Sprintf("upload worker %v failed to upload part %v for object %v, tryCnt: %v, err: %v", uw.index, uw.currentPartNumber, uw.uri, uw.tryCnt, err)) + } else { + // Upload succeeded. Reset + uw.offset = 0 + uw.err = nil + uw.currentPartNumber = -1 + uw.tryCnt = 0 + } + uw.idleWorkerPool <- uw + }() + return nil +} + +// UploadPart upload a part. 
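// For reference, buildCompleteMultipartUploadXML above produces a body of the
// following shape (part numbers and ETags are example values):
//
//	<CompleteMultipartUpload>
//	  <Part>
//	    <PartNumber>1</PartNumber>
//	    <ETag>"etag-of-part-1"</ETag>
//	  </Part>
//	  <Part>
//	    <PartNumber>2</PartNumber>
//	    <ETag>"etag-of-part-2"</ETag>
//	  </Part>
//	</CompleteMultipartUpload>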
+func (uw *uploadWorker) uploadPart(partNumber int, data []byte) error { + if partNumber < 1 { + return fmt.Errorf("Invalid partNumber: %v", partNumber) + } + + opts := &storage.SignedURLOptions{ + Scheme: storage.SigningSchemeV4, + Method: "PUT", + Expires: time.Now().Add(defaultSignedURLExpiry), + QueryParameters: url.Values{ + MPUUploadIDQuery: []string{uw.uploadID}, + MPUPartNumberQuery: []string{strconv.Itoa(partNumber)}, + }, + } + + u, err := uw.cli.Bucket(uw.bucketName).SignedURL(uw.uri, opts) + if err != nil { + return fmt.Errorf("Bucket(%q).SignedURL: %s", uw.bucketName, err) + } + + client := resty.New() + resp, err := client.R().SetBody(bytes.NewReader(data)).Put(u) // Set payload as request body + if err != nil { + return fmt.Errorf("PUT request failed: %s", err) + } + + etag := resp.Header().Get("ETag") + + if resp.StatusCode() != http.StatusOK { + return fmt.Errorf("PUT request returned non-OK status: %d", resp.StatusCode()) + } + + uw.parts[partNumber] = objectPart{PartNumber: partNumber, ETag: etag} + return nil +} diff --git a/go.mod b/go.mod index 6d691268d9b95..e9bfd3e4a4a73 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/docker/go-units v0.5.0 github.com/dolthub/swiss v0.2.1 github.com/emirpasic/gods v1.18.1 + github.com/go-resty/resty/v2 v2.7.0 github.com/fatanugraha/noloopclosure v0.1.1 github.com/fatih/color v1.15.0 github.com/fsouza/fake-gcs-server v1.44.0 diff --git a/go.sum b/go.sum index 140ab0e9d35bf..a0d6b2ac713c3 100644 --- a/go.sum +++ b/go.sum @@ -295,6 +295,8 @@ github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AE github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= +github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= @@ -1091,6 +1093,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220517181318-183a9ca12b87/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/pkg/ddl/backfilling_dispatcher.go b/pkg/ddl/backfilling_dispatcher.go index 092d305158653..7222d6a6c5f49 100644 --- a/pkg/ddl/backfilling_dispatcher.go +++ b/pkg/ddl/backfilling_dispatcher.go @@ -169,10 +169,7 @@ func (dsp *BackfillingDispatcherExt) GetNextStep(task *proto.Task) proto.Step { } func skipMergeSort(stats []external.MultipleFilesStat) bool { - 
failpoint.Inject("forceMergeSort", func() { - failpoint.Return(false) - }) - return external.GetMaxOverlappingTotal(stats) <= external.MergeSortOverlapThreshold + return false } // OnDone implements dispatcher.Extension interface. @@ -349,6 +346,8 @@ func generateGlobalSortIngestPlan( if err != nil { return nil, err } + logutil.BgLogger().Info("ywq test from last step", zap.Binary("startkey", startKeyFromSumm), zap.Binary("endkey", endKeyFromSumm), + zap.Any("startkey", startKeyFromSumm), zap.Any("endkey", endKeyFromSumm)) instanceIDs, err := dispatcher.GenerateTaskExecutorNodes(ctx) if err != nil { return nil, err @@ -379,8 +378,8 @@ func generateGlobalSortIngestPlan( endKey = kv.Key(endKeyOfGroup).Clone() } logger.Info("split subtask range", - zap.String("startKey", hex.EncodeToString(startKey)), - zap.String("endKey", hex.EncodeToString(endKey))) + zap.Binary("startKey", (startKey)), + zap.Binary("endKey", (endKey))) if startKey.Cmp(endKey) >= 0 { return nil, errors.Errorf("invalid range, startKey: %s, endKey: %s", @@ -420,6 +419,8 @@ func generateMergePlan( return nil, err } multiStats := make([]external.MultipleFilesStat, 0, 100) + dataFiles := make([]string, 0, 100) + statFiles := make([]string, 0, 100) for _, bs := range subTaskMetas { var subtask BackfillSubTaskMeta err = json.Unmarshal(bs, &subtask) @@ -427,21 +428,38 @@ func generateMergePlan( return nil, err } multiStats = append(multiStats, subtask.MultipleFilesStats...) + for _, stat := range subtask.MultipleFilesStats { + for _, files := range stat.Filenames { + dataFiles = append(dataFiles, files[0]) + statFiles = append(statFiles, files[1]) + } + } } if skipMergeSort(multiStats) { logger.Info("skip merge sort") return nil, nil } - // generate merge sort plan. - _, _, _, dataFiles, _, err := getSummaryFromLastStep(taskHandle, task.ID, StepReadIndex) - if err != nil { - return nil, err + startKeys := make([]kv.Key, 0, 10) + endKeys := make([]kv.Key, 0, 10) + + i := 0 + for ; i < len(multiStats)-1; i += 2 { + startKey := external.BytesMin(multiStats[i].MinKey, multiStats[i+1].MinKey) + endKey := external.BytesMax(multiStats[i].MaxKey, multiStats[i+1].MaxKey) + endKey = kv.Key(endKey).Next() + startKeys = append(startKeys, startKey) + endKeys = append(endKeys, endKey) + } + if i == len(multiStats)-1 { + startKeys = append(startKeys, multiStats[i].MinKey) + endKeys = append(endKeys, kv.Key(multiStats[i].MaxKey).Next()) } start := 0 step := external.MergeSortFileCountStep metaArr := make([][]byte, 0, 16) + i = 0 for start < len(dataFiles) { end := start + step if end > len(dataFiles) { @@ -449,6 +467,11 @@ func generateMergePlan( } m := &BackfillSubTaskMeta{ DataFiles: dataFiles[start:end], + StatFiles: statFiles[start:end], + SortedKVMeta: external.SortedKVMeta{ + StartKey: startKeys[i], + EndKey: endKeys[i], + }, } metaBytes, err := json.Marshal(m) if err != nil { @@ -457,6 +480,7 @@ func generateMergePlan( metaArr = append(metaArr, metaBytes) start = end + i++ } return metaArr, nil } diff --git a/pkg/ddl/backfilling_import_cloud.go b/pkg/ddl/backfilling_import_cloud.go index b5f0e70749ec7..0baf5f45b283d 100644 --- a/pkg/ddl/backfilling_import_cloud.go +++ b/pkg/ddl/backfilling_import_cloud.go @@ -78,6 +78,11 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub return errors.Errorf("local backend not found") } _, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, m.index.ID) + logutil.BgLogger().Info("ywq test", + zap.Any("datafiles", sm.DataFiles), + zap.Any("statfiles", sm.StatFiles), + 
zap.Any("startkey", sm.StartKey), + zap.Any("endkey", sm.EndKey)) err = local.CloseEngine(ctx, &backend.EngineConfig{ External: &backend.ExternalEngineConfig{ StorageURI: m.cloudStoreURI, diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 6de8ee0d7b756..9e22aab5708f0 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -21,6 +21,7 @@ import ( "strconv" "sync" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/storage" @@ -104,19 +105,23 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta return err } - return external.MergeOverlappingFiles( + return external.MergeOverlappingFilesOpt( ctx, sm.DataFiles, + sm.StatFiles, store, + sm.StartKey, + sm.EndKey, int64(partSize), - 64*1024, prefix, + uuid.New().String(), external.DefaultBlockSize, 8*1024, 1*size.MB, 8*1024, onClose, - int(variable.GetDDLReorgWorkerCounter()), true) + int(variable.GetDDLReorgWorkerCounter()/2), + true) } func (*mergeSortExecutor) Cleanup(ctx context.Context) error { diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index e5b53c0a369eb..d0e78bfa061f6 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -127,7 +127,6 @@ func getWriterMemSize(idxNum int) (uint64, error) { } memAvailable := memTotal - memUsed memSize := (memAvailable / 2) / uint64(writerCnt) / uint64(idxNum) - logutil.BgLogger().Info("build operators that write index to cloud storage", zap.Uint64("memory total", memTotal), zap.Uint64("memory used", memUsed), zap.Uint64("memory size", memSize)) return memSize, nil } diff --git a/pkg/ddl/dist_owner.go b/pkg/ddl/dist_owner.go index 8e0e80514f1cd..90619210d22d2 100644 --- a/pkg/ddl/dist_owner.go +++ b/pkg/ddl/dist_owner.go @@ -26,5 +26,5 @@ var ( ) const ( - distPhysicalTableConcurrency = 16 + distPhysicalTableConcurrency = 2 ) diff --git a/pkg/disttask/importinto/planner.go b/pkg/disttask/importinto/planner.go index 53ffe2e8c3665..c7348bd2db06d 100644 --- a/pkg/disttask/importinto/planner.go +++ b/pkg/disttask/importinto/planner.go @@ -285,13 +285,14 @@ func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.Pipeli } func skipMergeSort(kvGroup string, stats []external.MultipleFilesStat) bool { - failpoint.Inject("forceMergeSort", func(val failpoint.Value) { - in := val.(string) - if in == kvGroup || in == "*" { - failpoint.Return(false) - } - }) - return external.GetMaxOverlappingTotal(stats) <= external.MergeSortOverlapThreshold + return false + // failpoint.Inject("forceMergeSort", func(val failpoint.Value) { + // in := val.(string) + // if in == kvGroup || in == "*" { + // failpoint.Return(false) + // } + // }) + // return external.GetMaxOverlappingTotal(stats) <= external.MergeSortOverlapThreshold } func generateMergeSortSpecs(planCtx planner.PlanCtx) ([]planner.PipelineSpec, error) { @@ -309,7 +310,26 @@ func generateMergeSortSpecs(planCtx planner.PlanCtx) ([]planner.PipelineSpec, er continue } dataFiles := kvMeta.GetDataFiles() + statFiles := kvMeta.GetStatFiles() + + startKeys := make([]tidbkv.Key, 0, 10) + endKeys := make([]tidbkv.Key, 0, 10) + + i := 0 + for ; i < len(kvMeta.MultipleFilesStats)-1; i += 2 { + startKey := external.BytesMin(kvMeta.MultipleFilesStats[i].MinKey, kvMeta.MultipleFilesStats[i+1].MinKey) + endKey := external.BytesMax(kvMeta.MultipleFilesStats[i].MaxKey, kvMeta.MultipleFilesStats[i+1].MaxKey) + 
endKey = tidbkv.Key(endKey).Next() + startKeys = append(startKeys, startKey) + endKeys = append(endKeys, endKey) + } + if i == len(kvMeta.MultipleFilesStats)-1 { + startKeys = append(startKeys, kvMeta.MultipleFilesStats[i].MinKey) + endKeys = append(endKeys, kvMeta.MultipleFilesStats[i].MaxKey.Next()) + } + length := len(dataFiles) + i = 0 for start := 0; start < length; start += step { end := start + step if end > length { @@ -319,8 +339,14 @@ func generateMergeSortSpecs(planCtx planner.PlanCtx) ([]planner.PipelineSpec, er MergeSortStepMeta: &MergeSortStepMeta{ KVGroup: kvGroup, DataFiles: dataFiles[start:end], + StatFiles: statFiles[start:end], + SortedKVMeta: external.SortedKVMeta{ + StartKey: startKeys[i], + EndKey: endKeys[i], + }, }, }) + i++ } } return result, nil diff --git a/pkg/disttask/importinto/proto.go b/pkg/disttask/importinto/proto.go index 158d507be9713..064808b4beb26 100644 --- a/pkg/disttask/importinto/proto.go +++ b/pkg/disttask/importinto/proto.go @@ -99,8 +99,10 @@ const ( // MergeSortStepMeta is the meta of merge sort step. type MergeSortStepMeta struct { // KVGroup is the group name of the sorted kv, either dataKVGroup or index-id. - KVGroup string `json:"kv-group"` - DataFiles []string `json:"data-files"` + KVGroup string `json:"kv-group"` + DataFiles []string `json:"data-files"` + StatFiles []string `json:"stat-files"` + external.SortedKVMeta `json:"sorted-kv-meta"` } diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index de419a8fe540f..151261fcae265 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -21,6 +21,7 @@ import ( "time" "github.com/docker/go-units" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -310,9 +311,24 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize)))) - return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, m.partSize, 64*1024, - prefix, getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024, - onClose, int(m.taskMeta.Plan.ThreadCnt), false) + return external.MergeOverlappingFilesOpt( + ctx, + sm.DataFiles, + sm.StatFiles, + m.controller.GlobalSortStore, + sm.StartKey, + sm.EndKey, + m.partSize, + prefix, + uuid.New().String(), + getKVGroupBlockSize(sm.KVGroup), + 8*1024, + 1*size.MB, + 8*1024, + onClose, + int(m.taskMeta.Plan.ThreadCnt), + true, + ) } func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
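// Illustrative only: the key-range pairing used above by generateMergePlan and
// generateMergeSortSpecs, extracted as a standalone helper (package qualifiers
// as in planner.go). Consecutive MultipleFilesStat entries are grouped in
// pairs; each pair yields one [startKey, endKey) range, with endKey made
// exclusive via Next(), and an odd trailing stat forms its own range.
func examplePairKeyRanges(stats []external.MultipleFilesStat) (startKeys, endKeys []tidbkv.Key) {
	i := 0
	for ; i < len(stats)-1; i += 2 {
		start := external.BytesMin(stats[i].MinKey, stats[i+1].MinKey)
		end := external.BytesMax(stats[i].MaxKey, stats[i+1].MaxKey)
		startKeys = append(startKeys, start)
		endKeys = append(endKeys, tidbkv.Key(end).Next())
	}
	if i == len(stats)-1 {
		startKeys = append(startKeys, stats[i].MinKey)
		endKeys = append(endKeys, stats[i].MaxKey.Next())
	}
	return startKeys, endKeys
}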