[DNM] global sort: new merge step #49212

Closed · wants to merge 30 commits
220 changes: 214 additions & 6 deletions br/pkg/lightning/backend/external/bench_test.go
@@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
@@ -360,7 +361,7 @@ func writeExternalOneFile(s *writeTestSuite) {
}
writer := builder.BuildOneFile(
s.store, filePath, "writerID")
- _ = writer.Init(ctx, 20*1024*1024)
+ _ = writer.Init(ctx, 20*1024*1024, 20)
key, val, _ := s.source.next()
for key != nil {
err := writer.WriteRow(ctx, key, val)
@@ -537,7 +538,8 @@ func readFileConcurrently(t *testing.T, s *readTestSuite) {
file := files[i]
eg.Go(func() error {
buf := make([]byte, s.memoryLimit/conc)
- reader, err := s.store.Open(ctx, file, nil)
+ st, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil)
+ reader, err := st.Open(ctx, file, nil)
intest.AssertNoError(err)
var size int
for {
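Two notes on the hunk above. First, each reader goroutine now opens its own storage client via storage.NewFromURL instead of sharing s.store, presumably to avoid contention on a single client across concurrent readers. Second, the error returned by NewFromURL is overwritten by the following Open call before it is ever checked, and the pre-existing local `var size int` now shadows the freshly imported pkg/util/size package inside this function. A minimal sketch of the unchecked-error fix, keeping the test's assertion style:

```go
st, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil)
intest.AssertNoError(err) // check before use; a failed NewFromURL would panic on st.Open
reader, err := st.Open(ctx, file, nil)
intest.AssertNoError(err)
```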
@@ -721,14 +723,15 @@ func createAscendingFiles(
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
- ) int {
+ ) (int, kv.Key, kv.Key) {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)

keyIdx := 0
value := make([]byte, 100)
kvCnt := 0
+ var minKey, maxKey kv.Key
for i := 0; i < fileCount; i++ {
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(float64(fileSize) * 1.1))
@@ -739,18 +742,26 @@
)

totalSize := 0

+ var key string
for totalSize < fileSize {
- key := fmt.Sprintf("key_%09d", keyIdx)
+ key = fmt.Sprintf("key_%09d", keyIdx)
+ if i == 0 && totalSize == 0 {
+ minKey = []byte(key)
+ }
err := writer.WriteRow(ctx, []byte(key), value, nil)
intest.AssertNoError(err)
keyIdx++
totalSize += len(key) + len(value)
kvCnt++
}
+ if i == fileCount-1 {
+ maxKey = []byte(key)
+ }
err := writer.Close(ctx)
intest.AssertNoError(err)
}
- return kvCnt
+ return kvCnt, minKey, maxKey
}
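createAscendingFiles now reports the overall key range along with the KV count: keys ascend across files, so minKey is the first key written to the first file and maxKey the last key written to the last file. The merge benchmarks below treat the end of the range as exclusive, which is why they pass kv.Key(s.maxKey).Next() rather than maxKey itself. A minimal sketch of the intended usage:

```go
// a minimal sketch mirroring how newMergeStep consumes the returned range
func mergeRange(store storage.ExternalStorage) (start, end kv.Key) {
	_, minKey, maxKey := createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix)
	// maxKey is inclusive; Next() yields the smallest key strictly greater,
	// so [minKey, end) covers every written row.
	return minKey, kv.Key(maxKey).Next()
}
```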

var (
@@ -785,7 +796,7 @@ func testCompareReaderAscendingContent(t *testing.T, fn func(t *testing.T, suite
store := openTestingStorage(t)
kvCnt := 0
if !*skipCreate {
- kvCnt = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix)
+ kvCnt, _, _ = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix)
}
fileIdx := 0
var (
@@ -896,3 +907,200 @@ func TestPrepareLargeData(t *testing.T) {
t.Logf("total %d data files, first file size: %.2f MB, last file size: %.2f MB",
len(dataFiles), float64(firstFileSize)/1024/1024, float64(lastFileSize)/1024/1024)
}

type mergeTestSuite struct {
store storage.ExternalStorage
subDir string
totalKVCnt int
concurrency int
memoryLimit int
mergeIterHotspot bool
minKey kv.Key
maxKey kv.Key
beforeMerge func()
afterMerge func()
}

func newMergeStep(t *testing.T, s *mergeTestSuite) {
ctx := context.Background()
datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)

mergeOutput := "merge_output"
totalSize := atomic.NewUint64(0)
onClose := func(s *WriterSummary) {
totalSize.Add(s.TotalSize)
}
if s.beforeMerge != nil {
s.beforeMerge()
}

now := time.Now()
err = MergeOverlappingFiles(
ctx,
datas,
stats,
s.store,
s.minKey, kv.Key(s.maxKey).Next(),
int64(5*size.MB),
mergeOutput,
"mergeID",
DefaultBlockSize,
8*1024,
1*size.MB,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
)

intest.AssertNoError(err)
if s.afterMerge != nil {
s.afterMerge()
}
elapsed := time.Since(now)
t.Logf(
"merge speed for %d bytes in %s, speed: %.2f MB/s",
totalSize.Load(),
elapsed,
float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
)
}

func newMergeStepOpt(t *testing.T, s *mergeTestSuite) {
ctx := context.Background()
datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)

mergeOutput := "merge_output"
totalSize := atomic.NewUint64(0)
onClose := func(s *WriterSummary) {
totalSize.Add(s.TotalSize)
}
if s.beforeMerge != nil {
s.beforeMerge()
}

now := time.Now()
err = MergeOverlappingFilesOpt(
ctx,
datas,
stats,
s.store,
s.minKey, kv.Key(s.maxKey).Next(),
int64(5*size.MB),
mergeOutput,
"mergeID",
DefaultBlockSize,
8*1024,
1*size.MB,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
)

intest.AssertNoError(err)
if s.afterMerge != nil {
s.afterMerge()
}
elapsed := time.Since(now)
t.Logf(
"merge speed with %d concurrency for %d bytes in %s, speed: %.2f MB/s",
s.concurrency,
totalSize.Load(),
elapsed,
float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
)
}
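newMergeStep and newMergeStepOpt are identical apart from the entry point they call (MergeOverlappingFiles vs. MergeOverlappingFilesOpt) and the log line, so the timing scaffold could be shared. A possible refactor, with runMergeBenchmark as a hypothetical helper that is not part of this PR:

```go
// runMergeBenchmark wraps any merge entry point with the shared
// profile hooks and elapsed-time logging used by both steps above.
func runMergeBenchmark(t *testing.T, s *mergeTestSuite, name string, merge func() error) {
	if s.beforeMerge != nil {
		s.beforeMerge()
	}
	now := time.Now()
	err := merge()
	intest.AssertNoError(err)
	if s.afterMerge != nil {
		s.afterMerge()
	}
	t.Logf("%s with %d concurrency finished in %s", name, s.concurrency, time.Since(now))
}
```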

func oldMergeStep(t *testing.T, s *mergeTestSuite) {
ctx := context.Background()
datas, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)

if s.beforeMerge != nil {
s.beforeMerge()
}
mergeOutput := "merge_output"
totalSize := atomic.NewUint64(0)
onClose := func(s *WriterSummary) {
totalSize.Add(s.TotalSize)
}
now := time.Now()
err = MergeOverlappingFilesV2(
ctx,
datas,
s.store,
16*1024*1024,
64*1024,
mergeOutput,
DefaultBlockSize,
8*1024,
1024*1024,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot)
intest.AssertNoError(err)

if s.afterMerge != nil {
s.afterMerge()
}
elapsed := time.Since(now)
t.Logf("merge prev implementation with %d concurrency %d bytes in %s, speed: %.2f MB/s",
s.concurrency, totalSize.Load(), elapsed, float64(totalSize.Load())/elapsed.Seconds()/1024/1024)
}
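Note the shape difference between the two implementations: MergeOverlappingFilesV2 consumes only the data files, while the new step also takes the statistics files from GetAllFileNames plus an explicit [minKey, maxKey.Next()) range, presumably so it can split the merge work by key range up front instead of pushing everything through one merge iterator.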

func testCompareMergeAscendingContent(t *testing.T, fn func(t *testing.T, suite *mergeTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
var minKey, maxKey kv.Key
if !*skipCreate {
kvCnt, minKey, maxKey = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix)
}

fileIdx := 0
var (
file *os.File
err error
)
beforeTest := func() {
fileIdx++
file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
intest.AssertNoError(err)
err = pprof.StartCPUProfile(file)
intest.AssertNoError(err)
}

afterTest := func() {
pprof.StopCPUProfile()
}

suite := &mergeTestSuite{
store: store,
totalKVCnt: kvCnt,
concurrency: *concurrency,
memoryLimit: *memoryLimit,
beforeMerge: beforeTest,
afterMerge: afterTest,
subDir: *objectPrefix,
minKey: minKey,
maxKey: maxKey,
mergeIterHotspot: true,
}

fn(t, suite)
}

func TestNewMerge(t *testing.T) {
testCompareMergeAscendingContent(t, newMergeStep)
}

func TestOldMerge(t *testing.T) {
testCompareMergeAscendingContent(t, oldMergeStep)
}

func TestNewMergeOpt(t *testing.T) {
testCompareMergeAscendingContent(t, newMergeStepOpt)
}
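For reference, a typical way to drive these comparative benchmarks; the flag names below are guesses inferred from the *fileSize, *fileCount, *objectPrefix, and *testingStorageURI variables used above, so check the flag definitions in this package before copying:

```
go test ./br/pkg/lightning/backend/external/ -v -run 'TestNewMerge$|TestOldMerge$' \
    -testing-storage-uri 's3://bucket/prefix'
go tool pprof cpu-profile-1.prof   # inspect the CPU profile written by beforeTest
```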
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/external/byte_reader.go
@@ -271,7 +271,7 @@ func (r *byteReader) reload() error {
now := r.concurrentReader.now
// in read only false -> true is possible
if !now && to {
r.logger.Info("switch reader mode", zap.Bool("use concurrent mode", true))
// r.logger.Info("switch reader mode", zap.Bool("use concurrent mode", true))
Contributor review comment on the line above: change to debug?

err := r.switchToConcurrentReader()
if err != nil {
return err
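Rather than commenting the log out, the review suggestion could be taken literally here and in the closeConcurrentReader hunk below: keep the message but drop it to debug level, which zap supports with the same call shape:

```go
r.logger.Debug("switch reader mode", zap.Bool("use concurrent mode", true))
```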
@@ -311,10 +311,10 @@ func (r *byteReader) reload() error {
}

func (r *byteReader) closeConcurrentReader() (reloadCnt, offsetInOldBuffer int) {
r.logger.Info("drop data in closeConcurrentReader",
zap.Int("reloadCnt", r.concurrentReader.reloadCnt),
zap.Int("dropBytes", len(r.curBuf)-r.curBufOffset),
)
// r.logger.Info("drop data in closeConcurrentReader",
// zap.Int("reloadCnt", r.concurrentReader.reloadCnt),
// zap.Int("dropBytes", len(r.curBuf)-r.curBufOffset),
// )
r.concurrentReader.largeBufferPool.Destroy()
r.concurrentReader.largeBuf = nil
r.concurrentReader.now = false
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/external/engine.go
@@ -798,5 +798,4 @@ func (m *MemoryIngestData) DecRef() {
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
m.importedKVSize.Add(totalBytes)
m.importedKVCount.Add(totalCount)
-
}