Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: in-txn statement read from MemBuffer's snapshot #59219

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7037,13 +7037,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "e59695d9b1dd460126e9c7dbc60ab44224e56d6a9c6fea04c88127fb0b82694c",
strip_prefix = "github.com/tikv/client-go/[email protected].8-0.20250117034919-61e09c6539bd",
sha256 = "c50f7420eaf607f4b6f616895dac07d26ebb1a17f95e2ef1f500079c426582a4",
strip_prefix = "github.com/you06/client-go/[email protected].0-alpha.0.20250127074828-f9e16c235697",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250117034919-61e09c6539bd.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250117034919-61e09c6539bd.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250117034919-61e09c6539bd.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250117034919-61e09c6539bd.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/you06/client-go/v2/com_github_you06_client_go_v2-v2.0.0-alpha.0.20250127074828-f9e16c235697.zip",
"http://ats.apps.svc/gomod/github.com/you06/client-go/v2/com_github_you06_client_go_v2-v2.0.0-alpha.0.20250127074828-f9e16c235697.zip",
"https://cache.hawkingrei.com/gomod/github.com/you06/client-go/v2/com_github_you06_client_go_v2-v2.0.0-alpha.0.20250127074828-f9e16c235697.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/you06/client-go/v2/com_github_you06_client_go_v2-v2.0.0-alpha.0.20250127074828-f9e16c235697.zip",
],
)
go_repository(
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,5 @@ replace (
sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
sourcegraph.com/sourcegraph/appdash-data => github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)

replace github.com/tikv/client-go/v2 => github.com/you06/client-go/v2 v2.0.0-alpha.0.20250127074828-f9e16c235697
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,6 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20250117034919-61e09c6539bd h1:87RbBpXCsDIecDTIyXLF1kQR+WKMS6lL5lFCqyEaWEs=
github.com/tikv/client-go/v2 v2.0.8-0.20250117034919-61e09c6539bd/go.mod h1:lpSPdaBRBj3/1odUCZ94xq2TB9rMwbpx7HpX4HAyHck=
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de h1:fefo6U56UMca1CfwY/FVhfVcinyDgXgffQQp9lfMeLg=
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de/go.mod h1:6xcCIcECSowarZ9+AqdURngHzS+t5w2x3qhwg1moj4o=
github.com/timakin/bodyclose v0.0.0-20241017074812-ed6a65f985e3 h1:y4mJRFlM6fUyPhoXuFg/Yu02fg/nIPFMOY8tOqppoFg=
Expand Down Expand Up @@ -864,6 +862,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k=
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/you06/client-go/v2 v2.0.0-alpha.0.20250127074828-f9e16c235697 h1:BwFRvCyZDtqRuVkzNJzxX+cNB9kCLj1rhTNMUcinZpE=
github.com/you06/client-go/v2 v2.0.0-alpha.0.20250127074828-f9e16c235697/go.mod h1:lpSPdaBRBj3/1odUCZ94xq2TB9rMwbpx7HpX4HAyHck=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ func (e *BatchPointGetExec) Open(context.Context) error {
var batchGetter kv.BatchGetter = e.snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
snapshot := e.Ctx().GetSessionVars().TxnCtx.MemBufferSnapshot
if e.lock {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, e.snapshot)
batchGetter = driver.NewBufferSnapshotBatchGetter(snapshot, &PessimisticLockCacheGetter{txnCtx: txnCtx}, e.snapshot)
} else if lock != nil && (lock.Tp == ast.TableLockRead || lock.Tp == ast.TableLockReadOnly) && e.Ctx().GetSessionVars().EnablePointGetCache {
batchGetter = newCacheBatchGetter(e.Ctx(), e.tblInfo.ID, e.snapshot)
} else {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), nil, e.snapshot)
} else if snapshot != nil {
batchGetter = driver.NewBufferSnapshotBatchGetter(snapshot, nil, e.snapshot)
}
}
e.batchGetter = batchGetter
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func (iter *memRowsIterForIndex) Next() ([]types.Datum, error) {
continue
}

// filter key/value by partitition id
// filter key/value by partition id
if iter.index.Global {
_, pid, err := codec.DecodeInt(tablecodec.SplitIndexValue(value).PartitionID)
if err != nil {
Expand Down
21 changes: 11 additions & 10 deletions pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,25 +645,26 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
err error
)

if e.txn.Valid() && !e.txn.IsReadOnly() {
if snapshot := e.Ctx().GetSessionVars().TxnCtx.MemBufferSnapshot; snapshot != nil {
// We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be
// different for pessimistic transaction.
val, err = e.txn.GetMemBuffer().Get(ctx, key)
val, err = snapshot.Get(ctx, key)
if err == nil {
return val, err
}
if !kv.IsErrNotFound(err) {
return nil, err
}
// key does not exist in mem buffer, check the lock cache
if e.lock {
var ok bool
val, ok = e.Ctx().GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key)
if ok {
return val, nil
}
// fallthrough to pessimistic lock cache & snapshot get.
}

// key does not exist in mem buffer, check the lock cache
if e.lock {
var ok bool
val, ok = e.Ctx().GetSessionVars().TxnCtx.GetKeyInPessimisticLockCache(key)
if ok {
return val, nil
}
// fallthrough to snapshot get.
}

lock := e.tblInfo.Lock
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ type MemBuffer interface {

// BatchGet gets values from the memory buffer.
BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error)

// GetSnapshot returns a snapshot of the MemBuffer, used in in-txn read.
GetSnapshot() MemBufferSnapshot
}

// FindKeysInStage returns all keys in the given stage that satisfies the given condition.
Expand Down Expand Up @@ -851,3 +854,6 @@ func decodeTableID(key Key) int64 {
}
return 0
}

// MemBufferSnapshot is a snapshot of MemBuffer, used for in-txn read.
type MemBufferSnapshot = tikv.MemBufferSnapshot
2 changes: 1 addition & 1 deletion pkg/session/test/txn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//pkg/config",
"//pkg/kv",
Expand Down
47 changes: 47 additions & 0 deletions pkg/session/test/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,3 +579,50 @@ func TestMemBufferCleanupMemoryLeak(t *testing.T) {
}
tk.MustExec("commit")
}

func TestMemDBRaceInUnionExec(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tables := 5
for i := 0; i < tables; i++ {
tk.MustExec(fmt.Sprintf("create table t%d(id int primary key, v int)", i))
}

tk.MustExec("insert into t0 values(1, 1), (2, 2), (3, 3), (4, 4)")
for i := 1; i < tables; i++ {
tk.MustExec(fmt.Sprintf("insert into t%d select * from t0", i))
}

tk.MustExec("set tidb_pessimistic_txn_fair_locking=0")

// point get
for i := 0; i < 1001; i++ {
tk.MustExec("begin pessimistic")
dirty := i%2 == 0
if dirty {
tk.MustExec("insert into t0 values(5, 5)")
}
tk.MustQuery(`select * from t0 where id = 1 for update union
select * from t1 where id = 1 for update union
select * from t2 where id = 1 for update union
select * from t3 where id = 1 for update union
select * from t4 where id = 1 for update`)
tk.MustExec("rollback")
}

// batch point get
for i := 0; i < 1001; i++ {
tk.MustExec("begin pessimistic")
dirty := i%2 == 0
if dirty {
tk.MustExec("insert into t0 values(5, 5)")
}
tk.MustQuery(`select * from t0 where id in (1, 2, 3) for update union
select * from t1 where id in (1, 2, 3) for update union
select * from t2 where id in (1, 2, 3) for update union
select * from t3 where id in (1, 2, 3) for update union
select * from t4 where id in (1, 2, 3) for update`)
tk.MustExec("rollback")
}
}
4 changes: 4 additions & 0 deletions pkg/session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St
failpoint.Return(errors.New("occur an error after finishStmt"))
})
sessVars := se.sessionVars
if sessVars.TxnCtx.MemBufferSnapshot != nil {
sessVars.TxnCtx.MemBufferSnapshot.Close()
sessVars.TxnCtx.MemBufferSnapshot = nil
}
if !sql.IsReadOnly(sessVars) {
// All the history should be added here.
if meetsErr == nil && sessVars.TxnCtx.CouldRetry {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ type TxnCtxNoNeedToRestore struct {
// Read results cannot be directly written into pessimisticLockCache because failed statement need to rollback
// its pessimistic locks.
CurrentStmtPessimisticLockCache map[string][]byte

// MemBufferSnapshot is a snapshot of the memory buffer, every in-txn read of the buffer should use the snapshot to bypass the mutations of current statement.
MemBufferSnapshot kv.MemBufferSnapshot
}

// SavepointRecord indicates a transaction's savepoint record.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
// OnStmtStart is the hook that should be called when a new statement started
func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context, _ ast.StmtNode) error {
p.ctx = ctx
txnCtx := p.sctx.GetSessionVars().TxnCtx
if p.txn != nil && !p.txn.IsReadOnly() {
txnCtx.MemBufferSnapshot = p.txn.GetMemBuffer().GetSnapshot()
}
return nil
}

Expand Down
82 changes: 82 additions & 0 deletions pkg/store/driver/txn/batch_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,85 @@ func (b *BufferBatchGetter) BatchGet(ctx context.Context, keys []kv.Key) (map[st

return storageValues, err
}

// tikvBatchBufferGetter is the BatchBufferGetter struct for tikv
// In order to directly call NewBufferBatchGetter in client-go
// We need to implement the interface (transaction.BatchBufferGetter) in client-go for tikvBatchBufferGetter
type tikvBatchBufferSnapshotGetter struct {
tidbMiddleCache Getter
tidbSnapshotBuffer kv.MemBufferSnapshot
}

func (b tikvBatchBufferSnapshotGetter) Get(ctx context.Context, k []byte) ([]byte, error) {
// Get from buffer
val, err := b.tidbSnapshotBuffer.Get(ctx, k)
if err == nil || !kv.IsErrNotFound(err) || b.tidbMiddleCache == nil {
if kv.IsErrNotFound(err) {
err = tikverr.ErrNotExist
}
return val, err
}
// Get from middle cache
val, err = b.tidbMiddleCache.Get(ctx, k)
if err == nil {
return val, err
}
// TiDB err NotExist to TiKV err NotExist
// The BatchGet method in client-go will call this method
// Therefore, the error needs to convert to TiKV's type, otherwise the error will not be handled properly in client-go
err = tikverr.ErrNotExist
return val, err
}

func (b tikvBatchBufferSnapshotGetter) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
bufferValues := make(map[string][]byte, len(keys))
if b.tidbSnapshotBuffer != nil {
for _, key := range keys {
val, err := b.tidbSnapshotBuffer.Get(ctx, key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return nil, err
}
bufferValues[string(key)] = val
}
}

if b.tidbMiddleCache == nil {
return bufferValues, nil
}
for _, key := range keys {
if _, ok := bufferValues[string(key)]; !ok {
val, err := b.tidbMiddleCache.Get(ctx, key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return nil, err
}
bufferValues[string(key)] = val
}
}
return bufferValues, nil
}

// NewBufferSnapshotBatchGetter creates a new BufferBatchGetter.
func NewBufferSnapshotBatchGetter(bufferSnapshot kv.MemBufferSnapshot, middleCache Getter, snapshot BatchGetter) *BufferSnapshotBatchGetter {
tikvBuffer := tikvBatchBufferSnapshotGetter{tidbMiddleCache: middleCache, tidbSnapshotBuffer: bufferSnapshot}
tikvSnapshot := tikvBatchGetter{snapshot}
return &BufferSnapshotBatchGetter{tikvBufferBatchGetter: *transaction.NewBufferSnapshotBatchGetter(tikvBuffer, tikvSnapshot)}
}

// BufferSnapshotBatchGetter is the type for BatchGet with MemBuffer.
type BufferSnapshotBatchGetter struct {
tikvBufferBatchGetter transaction.BufferSnapshotBatchGetter
}

// BatchGet implements the BatchGetter interface.
func (b *BufferSnapshotBatchGetter) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
tikvKeys := toTiKVKeys(keys)
storageValues, err := b.tikvBufferBatchGetter.BatchGet(ctx, tikvKeys)

return storageValues, err
}
32 changes: 32 additions & 0 deletions pkg/store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (m *memBuffer) BatchGet(ctx context.Context, keys [][]byte) (map[string][]b
return data, derr.ToTiDBErr(err)
}

func (m *memBuffer) GetSnapshot() kv.MemBufferSnapshot {
return &snapshot{m.MemBuffer.GetSnapshot()}
}

type tikvGetter struct {
tikv.Getter
}
Expand Down Expand Up @@ -195,6 +199,34 @@ func getTiDBKeyFlags(flag tikvstore.KeyFlags) kv.KeyFlags {
return v
}

type snapshot struct {
kv.MemBufferSnapshot
}

func (s *snapshot) Get(ctx context.Context, key []byte) ([]byte, error) {
data, err := s.MemBufferSnapshot.Get(ctx, key)
return data, derr.ToTiDBErr(err)
}

func (s *snapshot) Len() int {
return 0
}

func (s *snapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
ret := make(map[string][]byte, len(keys))
for _, key := range keys {
val, err := s.Get(ctx, key)
if kv.IsErrNotFound(err) {
continue
}
if err != nil {
return nil, err
}
ret[string(key)] = val
}
return ret, nil
}

func getTiKVFlagsOp(op kv.FlagsOp) tikvstore.FlagsOp {
switch op {
case kv.SetPresumeKeyNotExists:
Expand Down