Skip to content

Commit

Permalink
Add lock
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Feb 21, 2024
1 parent 5179991 commit 696b7a9
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.LockUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.event.MetricsEventBus;
Expand Down Expand Up @@ -56,6 +57,8 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
Expand Down Expand Up @@ -122,6 +125,8 @@ public abstract class AbstractDirectory<T> implements Directory<T> {

private volatile ScheduledFuture<?> connectivityCheckFuture;

private final ReentrantLock invokerRefreshLock = new ReentrantLock();

/**
* The max count of invokers for each reconnect task select to try to reconnect.
*/
Expand Down Expand Up @@ -322,23 +327,30 @@ public void checkConnectivity() {
// 1. pick invokers from invokersToReconnect
// limit max reconnectTaskTryCount, prevent this task hang up all the connectivityExecutor
// for long time
if (invokersToReconnect.size() < reconnectTaskTryCount) {
invokersToTry.addAll(invokersToReconnect);
} else {
for (int i = 0; i < reconnectTaskTryCount; i++) {
Invoker<T> tInvoker = invokersToReconnect.get(
ThreadLocalRandom.current().nextInt(invokersToReconnect.size()));
if (!invokersToTry.contains(tInvoker)) {
// ignore if is selected, invokersToTry's size is always smaller than
// reconnectTaskTryCount + 1
invokersToTry.add(tInvoker);
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
if (invokersToReconnect.size() < reconnectTaskTryCount) {
invokersToTry.addAll(invokersToReconnect);
} else {
for (int i = 0; i < reconnectTaskTryCount; i++) {
Invoker<T> tInvoker = invokersToReconnect.get(
ThreadLocalRandom.current().nextInt(invokersToReconnect.size()));
if (!invokersToTry.contains(tInvoker)) {
// ignore if is selected, invokersToTry's size is always smaller than
// reconnectTaskTryCount + 1
invokersToTry.add(tInvoker);
}
}
}
}
});

// 2. try to check the invoker's status
for (Invoker<T> invoker : invokersToTry) {
if (invokers.contains(invoker)) {
AtomicBoolean invokerExist = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
invokerExist.set(invokers.contains(invoker));
});
// Should not lock here, `invoker.isAvailable` may need some time to check
if (invokerExist.get()) {
if (invoker.isAvailable()) {
needDeleteList.add(invoker);
}
Expand All @@ -348,25 +360,26 @@ public void checkConnectivity() {
}

// 3. recover valid invoker
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
addValidInvoker(tInvoker);
logger.info(
"Recover service address: " + tInvoker.getUrl() + " from invalid list.");
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
for (Invoker<T> tInvoker : needDeleteList) {
if (invokers.contains(tInvoker)) {
addValidInvoker(tInvoker);
logger.info("Recover service address: " + tInvoker.getUrl()
+ " from invalid list.");
}
invokersToReconnect.remove(tInvoker);
}
invokersToReconnect.remove(tInvoker);
}

// 4. refresh valid invokers list
refreshInvoker();
});
} finally {
checkConnectivityPermit.release();
}

// 5. submit new task if it has more to recover
if (!invokersToReconnect.isEmpty()) {
checkConnectivity();
}
// 4. submit new task if it has more to recover
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
if (!invokersToReconnect.isEmpty()) {
checkConnectivity();
}
});
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(
applicationModel, getSummary(), getDirectoryMeta()));
},
Expand Down Expand Up @@ -417,25 +430,29 @@ private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invo

@Override
public void addDisabledInvoker(Invoker<T> invoker) {
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
logger.info("Disable service address: " + invoker.getUrl() + ".");
}
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
logger.info("Disable service address: " + invoker.getUrl() + ".");
}
});
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
logger.info("Recover service address: " + invoker.getUrl() + " from disabled list.");
} catch (Throwable ignore) {
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
logger.info("Recover service address: " + invoker.getUrl() + " from disabled list.");
} catch (Throwable ignore) {

}
}
}
});
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}
Expand Down Expand Up @@ -494,39 +511,43 @@ public Set<Invoker<T>> getDisabledInvokers() {
}

protected void setInvokers(BitList<Invoker<T>> invokers) {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
});

MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
}

protected void destroyInvokers() {
// set empty instead of clearing to support concurrent access.
this.invokers = BitList.emptyList();
this.validInvokers = BitList.emptyList();
this.invokersInitialized = false;
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
this.invokers = BitList.emptyList();
this.validInvokers = BitList.emptyList();
this.invokersInitialized = false;
});
}

private boolean addValidInvoker(Invoker<T> invoker) {
boolean result;
synchronized (this.validInvokers) {
result = this.validInvokers.add(invoker);
}
AtomicBoolean result = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
result.set(this.validInvokers.add(invoker));
});
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
return result;
return result.get();
}

private boolean removeValidInvoker(Invoker<T> invoker) {
boolean result;
synchronized (this.validInvokers) {
result = this.validInvokers.remove(invoker);
}
AtomicBoolean result = new AtomicBoolean(false);
LockUtils.safeLock(invokerRefreshLock, 60_000, () -> {
result.set(this.validInvokers.remove(invoker));
});
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta()));
return result;
return result.get();
}

protected abstract List<Invoker<T>> doList(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.utils;

import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;

public class LockUtils {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(LockUtils.class);

public static void safeLock(Lock lock, int timeout, Runnable runnable) {
try {
if (!lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
logger.error(
LoggerCodeConstants.INTERNAL_ERROR,
"",
"",
"Try to lock failed, timeout: " + timeout,
new TimeoutException());
}
runnable.run();
} catch (InterruptedException e) {
logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
Thread.currentThread().interrupt();
} finally {
try {
lock.unlock();
} catch (Exception e) {
// ignore
}
}
}
}

0 comments on commit 696b7a9

Please sign in to comment.