Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: Azure/azure-event-hubs-java
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: dev
Choose a base ref
...
head repository: CaperAi/azure-event-hubs-java
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: dev
Choose a head ref

There isn’t anything to compare.

Azure:dev and CaperAi:dev are entirely different commit histories.

Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -317,10 +318,16 @@ public CompletableFuture<List<BaseLease>> getAllLeases() {
(bp.getLeaseState() == LeaseState.LEASED)));
});
future = CompletableFuture.completedFuture(infos);
} catch (URISyntaxException | StorageException e) {
TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
} catch (Exception e) {
Throwable effective = e;
if (e instanceof NoSuchElementException) {
// If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException.
// Strip the misleading NoSuchElementException to provide a meaningful error for the user.
effective = e.getCause();
}
TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), effective);
future = new CompletableFuture<List<BaseLease>>();
future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
}

return future;
Original file line number Diff line number Diff line change
@@ -290,31 +290,42 @@ private Void scan(boolean isFirst) {
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();

(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) ->
{
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
if ((e != null) && !(e instanceof ClosingException)) {
TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
}

onPartitionCheckCompleteTestHook();

// Schedule the next scan unless we are shutting down.
if (!this.getIsClosingOrClosed()) {
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() :
this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
if (isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());
try {
(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) ->
{
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
if ((e != null) && !(e instanceof ClosingException)) {
TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
}

onPartitionCheckCompleteTestHook();

// Schedule the next scan unless we are shutting down.
if (!this.getIsClosingOrClosed()) {
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() :
this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
if (isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());
} catch (Exception e) {
TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e);
if (!this.getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds));
}
}

return null;
}