Skip to content

Commit

Permalink
[BugFix] Fix LockManager release not notify all waiters that meet the…
Browse files Browse the repository at this point in the history
… conditions (backport #54922)  (#54950)

(cherry picked from commit 1bd5937)
  • Loading branch information
HangyuanLiu authored Jan 10, 2025
1 parent f6337a5 commit dfae8f9
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public boolean isOwner(long rid, Locker locker, LockType lockType) {
}
}

public boolean isOwnerInternal(long rid, Locker locker, LockType lockType, int lockTableIndex) {
private boolean isOwnerInternal(long rid, Locker locker, LockType lockType, int lockTableIndex) {
final Map<Long, Lock> lockTable = lockTables[lockTableIndex];
final Lock lock = lockTable.get(rid);
return lock != null && lock.isOwner(locker, lockType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public MultiUserLock(LockHolder lockHolder) {
@Override
public LockGrantType lock(Locker locker, LockType lockType) throws LockException {
LockHolder lockHolderRequest = new LockHolder(locker, lockType);
LockGrantType lockGrantType = tryLock(lockHolderRequest);
LockGrantType lockGrantType = tryLock(lockHolderRequest, waiterNum() == 0);
if (lockGrantType == LockGrantType.NEW) {
addOwner(lockHolderRequest);
} else if (lockGrantType == LockGrantType.WAIT) {
Expand All @@ -58,7 +58,17 @@ public LockGrantType lock(Locker locker, LockType lockType) throws LockException
return lockGrantType;
}

private LockGrantType tryLock(LockHolder lockHolderRequest) throws LockException {
/**
* @param noWaiters indicates whether there are other waiters. This will determine whether the lock
* can be directly acquired. If there are other waiters, the current locker cannot jump in
* line to acquire the lock first. A special scenario is to notify waiters in the
* existing wait list during release. At this time, the wait list needs to be ignored and
* as many waiters as possible need to be awakened.
* @return LockGrantType.NEW means that the lock ownership can be obtained.
* LockGrantType.EXISTING means that the current lock already exists and needs to be re-entered.
* LockGrantType.WAIT means that there is a lock conflict with the current owner and it is necessary to wait.
*/
private LockGrantType tryLock(LockHolder lockHolderRequest, boolean noWaiters) throws LockException {
if (ownerNum() == 0) {
return LockGrantType.NEW;
}
Expand Down Expand Up @@ -131,7 +141,7 @@ private LockGrantType tryLock(LockHolder lockHolderRequest) throws LockException
}
}

if (!hasConflicts && (hasSameLockerWithDifferentLockType || waiterNum() == 0)) {
if (!hasConflicts && (hasSameLockerWithDifferentLockType || noWaiters)) {
return LockGrantType.NEW;
} else {
return LockGrantType.WAIT;
Expand Down Expand Up @@ -205,7 +215,7 @@ public Set<Locker> release(Locker locker, LockType lockType) throws LockExceptio
}

while (lockWaiter != null) {
LockGrantType lockGrantType = tryLock(lockWaiter);
LockGrantType lockGrantType = tryLock(lockWaiter, true);

if (lockGrantType == LockGrantType.NEW
|| lockGrantType == LockGrantType.EXISTING) {
Expand All @@ -227,6 +237,7 @@ public Set<Locker> release(Locker locker, LockType lockType) throws LockExceptio

if (lockWaiterIterator != null && lockWaiterIterator.hasNext()) {
lockWaiter = lockWaiterIterator.next();
isFirstWaiter = false;
} else {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -34,20 +38,20 @@
* - Each TestDBResource starts a group of threads (16 in total) for concurrent testing,
* with the following responsibilities:
* <p>
* - 5 threads perform concurrent write operations. Each thread randomly selects a table and executes
* 1 million updateOneRandomTable operations on that table. Acquiring the intent write lock on the db and
* then the write lock on the table is required before updating the table.
* - 5 threads perform concurrent write operations. Each thread randomly selects a table and executes
* 1 million updateOneRandomTable operations on that table. Acquiring the intent write lock on the db and
* then the write lock on the table is required before updating the table.
* <p>
* - 5 threads perform concurrent write operations. Each thread executes 1 million updateAllTables
* operations on the db. Acquiring the write lock on the db is required before executing this action.
* - 5 threads perform concurrent write operations. Each thread executes 1 million updateAllTables
* operations on the db. Acquiring the write lock on the db is required before executing this action.
* <p>
* - 3 threads perform concurrent read operations. Each thread acquires a read lock on the db,
* then randomly selects a table and verifies whether the two counters of that table are equal.
* If not, Assert.assertEquals() fails.
* - 3 threads perform concurrent read operations. Each thread acquires a read lock on the db,
* then randomly selects a table and verifies whether the two counters of that table are equal.
* If not, Assert.assertEquals() fails.
* <p>
* - 3 threads perform concurrent read operations. Each thread acquires an intent read lock on the db and
* a read lock on the table. It then verifies whether the two counters of that table are equal.
* If not, Assert.assertEquals() fails.
* - 3 threads perform concurrent read operations. Each thread acquires an intent read lock on the db and
* a read lock on the table. It then verifies whether the two counters of that table are equal.
* If not, Assert.assertEquals() fails.
* <p>
* - Finally, verify that the sum of counter1 for all tables under each db is 2 million.
* If not, Assert.assertEquals() fails.
Expand All @@ -62,7 +66,6 @@ public class LockManagerAllLockModesRandomTest {
private static final long TABLE_ID_START = 10000;
private static final long DB_ID_START = 20000;


/**
* We will acquire intensive lock or non-intensive lock on DB resource.
* When acquiring intensive lock, we will also acquire specific non-intensive lock on the table to update it.
Expand Down Expand Up @@ -127,6 +130,7 @@ public void incrTwoCounters() {
/**
* We always increase the two counters together, so we will assert whether the two counters are equal to test
* the correctness of the lock manager.
*
* @return the two counters in a pair
*/
public Pair<Long, Long> getTwoCounters() {
Expand Down Expand Up @@ -197,7 +201,7 @@ public void testAllLockModesConcurrent() throws InterruptedException {
locker = new Locker();
locker.lock(db.getId(), LockType.INTENTION_SHARED);
locker.lock(db.getTableByIndex(tableIndex).getId(), LockType.READ);
Pair<Long, Long> result = db.getOneRandomTable().getTwoCounters();
Pair<Long, Long> result = db.getTableByIndex(tableIndex).getTwoCounters();
Assert.assertEquals(result.first, result.second);
} catch (LockException e) {
Assert.fail();
Expand Down Expand Up @@ -279,24 +283,41 @@ public void testAllLockModesConcurrent() throws InterruptedException {
} // end for single db
} // enf for all dbs

// Start all threads.
for (Thread t : allThreadList) {
t.start();
ExecutorService executor = Executors.newFixedThreadPool(100);

List<Future<?>> wf = new ArrayList<>();
for (Thread thread : writeThreadList) {
Future<?> f = executor.submit(thread);
wf.add(f);
}

List<Future<?>> rf = new ArrayList<>();
for (Thread thread : readThreadList) {
Future<?> f = executor.submit(thread);
rf.add(f);
}

// Wait for write threads end.
for (Thread t : writeThreadList) {
t.join();
for (Future<?> f : wf) {
try {
f.get();
} catch (ExecutionException e) {
Assert.fail(e.getMessage());
}
}
System.out.println("All write threads end.");

readerStop.set(true);
// Wait for read threads end.
for (Thread t : readThreadList) {
t.join();
for (Future<?> f : rf) {
try {
f.get();
} catch (ExecutionException e) {
Assert.fail(e.getMessage());
}
}
System.out.println("All read threads end.");

System.out.println("All read threads end.");

// Verify the correctness of the lock manager.
for (TestDBResource db : dbs) {
Expand All @@ -311,6 +332,5 @@ public void testAllLockModesConcurrent() throws InterruptedException {
Assert.assertEquals(30 * NUM_TEST_OPERATIONS, counter1Sum);
Assert.assertEquals(30 * NUM_TEST_OPERATIONS, counter2Sum);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.common.lock;

import com.starrocks.common.util.concurrent.lock.LockInfo;
import com.starrocks.common.util.concurrent.lock.LockManager;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.server.GlobalStateMgr;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.Future;

import static com.starrocks.common.lock.LockTestUtils.assertLockSuccess;
import static com.starrocks.common.lock.LockTestUtils.assertLockWait;

public class ReleaseTest {
@Before
public void setUp() {
GlobalStateMgr.getCurrentState().setLockManager(new LockManager());
}

@Test
public void testReleaseInvoke() throws Exception {
long rid = 1L;

TestLocker testLocker1 = new TestLocker();
assertLockSuccess(testLocker1.lock(rid, LockType.WRITE));

TestLocker testLocker2 = new TestLocker();
Future<LockResult> f2 = testLocker2.lock(rid, LockType.READ);
assertLockWait(f2);

TestLocker testLocker3 = new TestLocker();
Future<LockResult> f3 = testLocker3.lock(rid, LockType.READ);
assertLockWait(f3);

TestLocker testLocker4 = new TestLocker();
Future<LockResult> f4 = testLocker4.lock(rid, LockType.READ);
assertLockWait(f4);

assertLockSuccess(testLocker1.release(rid, LockType.WRITE));
LockTestUtils.assertLockSuccess(f2);
LockTestUtils.assertLockSuccess(f3);
LockTestUtils.assertLockSuccess(f4);

LockManager lockManager = GlobalStateMgr.getCurrentState().getLockManager();
LockInfo lockInfo = lockManager.dumpLockManager().get(0);
Assert.assertEquals(1, lockInfo.getRid().longValue());
Assert.assertEquals(3, lockInfo.getOwners().size());
Assert.assertEquals(0, lockInfo.getWaiters().size());
}
}

0 comments on commit dfae8f9

Please sign in to comment.