Skip to content

Commit

Permalink
Merge pull request #30573 from ngmr/capture-SpillDispatcher-stop-reason
Browse files Browse the repository at this point in the history
Capture reason for SpillDispatcher stop
  • Loading branch information
joe-chacko authored Jan 20, 2025
2 parents aaee37d + 460eafd commit ccaa1a6
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.ibm.ws.sib.msgstore;

/*******************************************************************************
* Copyright (c) 2012, 2022 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -11,7 +11,8 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/

import java.io.IOException;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -319,4 +320,9 @@ public void setRedeliveryCountColumn(boolean hasRedeliveryCountColumn)
_hasRedeliveryCountColumn = hasRedeliveryCountColumn;
}

public final void stop(int mode) {
stop(mode, null);
}

public abstract void stop(int mode, Throwable reason);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.ibm.ws.sib.msgstore;

/*******************************************************************************
* Copyright (c) 2012, 2014 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -11,7 +9,10 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/
package com.ibm.ws.sib.msgstore;


import com.ibm.ws.sib.utils.RuntimeInfo;

Expand Down Expand Up @@ -391,7 +392,7 @@ public static enum MaximumAllowedDeliveryDelayAction {
/* Persistence Layer Constants */
/*************************************************************************/
public static final String START_MODE = "STARTMODE"; // F008622--start
public static final String DEAFULT_START_MODE = "NORMAL"; //F008622-end
public static final String DEFAULT_START_MODE = "NORMAL"; //F008622-end
public static final String DEFAULT_DATABASE_NAME = "SIBDB";
public static final String DEFAULT_STOGROUP_NAME = "SIBSG";
public static final String DEFAULT_BUFPOOL_NAME = "BP1";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2012, 2022 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -9,8 +9,8 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/

* ==============================================================================
*/
package com.ibm.ws.sib.msgstore.impl;

import static com.ibm.websphere.ras.TraceComponent.isAnyTracingEnabled;
Expand Down Expand Up @@ -215,7 +215,7 @@ public MessageStoreImpl(@Reference ItemInterface itemInterface) {
// only to be used by existing unit tests, not within an OSGi environment
// N.B. this is invoked reflectively — run messaging unit tests to validate any changes
public static MessageStoreImpl createForTesting() {
return new MessageStoreImpl(null);
return new MessageStoreImpl(null);
}

public final AbstractItem _findById(long itemID) throws SevereMessageStoreException
Expand Down Expand Up @@ -1654,7 +1654,7 @@ public void start(int arg0) throws Exception
setStartupException(e);

// close everything we have opened
stop(0); // 247659
stop(0, e); // 247659
//Report that we have failed to start cleanly.
reportLocalError();
throw new Exception(e);
Expand All @@ -1670,7 +1670,7 @@ public void start(int arg0) throws Exception
* @see com.ibm.ws.sib.admin.JsEngineComponent#stop(int)
*/
@Override
public final void stop(int arg0)
public final void stop(int arg0, Throwable reason)
{
if (isAnyTracingEnabled() && tc.isEntryEnabled())
SibTr.entry(this, tc, "stop", Integer.valueOf(arg0));
Expand Down Expand Up @@ -1698,7 +1698,7 @@ public final void stop(int arg0)
_membershipMap.clear();

if (null != _persistentMessageStore)
_persistentMessageStore.stop(arg0);
_persistentMessageStore.stop(arg0, reason);

// Null out any references to our cache/om cache
_rootMembership = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ibm.ws.sib.msgstore.persistence;
/*******************************************************************************
* Copyright (c) 2012 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -10,7 +9,9 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/
package com.ibm.ws.sib.msgstore.persistence;

import java.util.Collection;

Expand All @@ -35,8 +36,9 @@ public interface Dispatcher
*
* @param mode specifies the type of stop operation which is to
* be performed.
* @param reason Reason for the stop being requested.
*/
public void stop(int mode);
public void stop(int mode, Throwable reason);

// Defect 338397
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ibm.ws.sib.msgstore.persistence;
/*******************************************************************************
* Copyright (c) 2012 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -10,7 +9,9 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/
package com.ibm.ws.sib.msgstore.persistence;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -114,7 +115,7 @@ public interface PersistentMessageStore
*
* @param mode specifies the type of stop operation which is to be performed.
*/
public void stop(int mode);
public void stop(int mode, Throwable reason);

/**
* Creates a UniqueKeyGenerator object with a persistent range counter.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ibm.ws.sib.msgstore.persistence.dispatcher;
/*******************************************************************************
* Copyright (c) 2012 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -10,7 +9,9 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/
package com.ibm.ws.sib.msgstore.persistence.dispatcher;

import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.sib.msgstore.MessageStoreConstants;
Expand Down Expand Up @@ -114,4 +115,9 @@ protected static long obtainLongConfigParameter(MessageStoreImpl msi, String par

return value;
}

@Deprecated
public final void stop(int mode) {
stop(mode, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.ibm.ws.sib.msgstore.persistence.dispatcher;

/*******************************************************************************
* Copyright (c) 2024 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2024, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -11,36 +9,35 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/
package com.ibm.ws.sib.msgstore.persistence.dispatcher;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

import com.ibm.ws.sib.msgstore.persistence.dispatcher.StateUtils.StateUpdater;

final class DispatcherState {
static final StateUpdater<DispatcherState> updaterForStart = new StateUpdater<DispatcherState>() {
public DispatcherState update(DispatcherState currentState) {
return currentState.stopRequested(false).running(true);
}
};
static final StateUpdater<DispatcherState> updaterForStopRequested = new StateUpdater<DispatcherState>() {
public DispatcherState update(DispatcherState currentState) {
if (false == currentState.isRunning) return currentState;
return currentState.stopRequested(true);
}
};
static final StateUpdater<DispatcherState> updaterForStopped = new StateUpdater<DispatcherState>() {
public DispatcherState update(DispatcherState currentState) {
return currentState.running(false);
}
};
static final StateUpdater<DispatcherState> updaterForErrorOccurred = new StateUpdater<DispatcherState>() {
public DispatcherState update(DispatcherState currentState) {
return currentState.addThreadWriteError();
}
};
static final StateUpdater<DispatcherState> updaterForErrorCleared = new StateUpdater<DispatcherState>() {
public DispatcherState update(DispatcherState currentState) {
return currentState.clearThreadWriteError();
static final StateUpdater<DispatcherState> updaterForStart = state -> state.startRequested().running(true);
static final StateUpdater<DispatcherState> updaterForStopped = state -> state.running(false);
static final StateUpdater<DispatcherState> updaterForErrorOccurred = DispatcherState::addThreadWriteError;
static final StateUpdater<DispatcherState> updaterForErrorCleared = DispatcherState::clearThreadWriteError;

public static final class StopRequesterInfo extends Throwable {
private static final long serialVersionUID = 1L;
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yy.MM.dd_HH.mm.ss.SSS_Z");

public StopRequesterInfo(Throwable requester) {
super("Stop requested at " + DATE_FORMATTER.format(OffsetDateTime.now()), requester);
}
};
}

static StateUpdater<DispatcherState> getUpdaterForStopRequested(final Throwable requester) {
return state -> state.isRunning ? state.stopRequested(new StopRequesterInfo(requester)) : state;
}

// Flag set to indicate whether dispatcher is running.
final boolean isRunning;
Expand All @@ -49,42 +46,54 @@ public DispatcherState update(DispatcherState currentState) {
final boolean isStopRequested;
// Count of the number of worker threads experiencing write errors.
final int threadWriteErrors;
final Throwable requester;

DispatcherState() {
this(false, false, 0);
this(false, false, 0, null);
}

private DispatcherState(boolean running, boolean stopRequested, int threadWriteErrors) {
private DispatcherState(boolean running, boolean stopRequested, int threadWriteErrors, Throwable requester) {
this.isRunning = running;
this.isStopRequested = stopRequested;
this.threadWriteErrors = threadWriteErrors;
this.requester = requester;
}

private DispatcherState running(final boolean running) {
return (running == isRunning) ? this : new DispatcherState(running, this.isStopRequested, this.threadWriteErrors);
return (running == isRunning) ? this : new DispatcherState(running, isStopRequested, threadWriteErrors, requester);
}

DispatcherState startRequested() {
return isStopRequested ? new DispatcherState(isRunning, false, threadWriteErrors, null) : this;
}

private DispatcherState stopRequested(final boolean stopRequested) {
return (stopRequested == isStopRequested) ? this : new DispatcherState(this.isRunning, stopRequested, this.threadWriteErrors);
private DispatcherState stopRequested(Throwable requester) {
return isStopRequested ? this : new DispatcherState(isRunning, true, threadWriteErrors, requester);
}

private DispatcherState addThreadWriteError() {
return new DispatcherState(isRunning, isStopRequested, (threadWriteErrors + 1));
return new DispatcherState(isRunning, isStopRequested, (threadWriteErrors + 1), requester);
}

private DispatcherState clearThreadWriteError() {
return (0 >= threadWriteErrors) ? this : new DispatcherState(isRunning, isStopRequested, (threadWriteErrors - 1));
return (0 >= threadWriteErrors) ? this : new DispatcherState(isRunning, isStopRequested, (threadWriteErrors - 1), requester);
}

boolean isHealthy() {
return isRunning && !isStopRequested && (0 == threadWriteErrors);
}

String desc() {
String s = "";
if (isStopRequested) s+= " (STOP REQUESTED)";
if (!isRunning) s+= " (STOPPED)";
if (0 < threadWriteErrors) s+= " (ERROR)";
return s;
StringBuilder sb = new StringBuilder();
if (isStopRequested) sb.append(" (STOP REQUESTED)");
if (!isRunning) sb.append(" (STOPPED)");
if (0 < threadWriteErrors) sb.append(" (ERROR (").append(threadWriteErrors).append("))");
if (null != requester) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
requester.printStackTrace(pw.printf("%n"));
sb.append(sw);
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.ibm.ws.sib.msgstore.persistence.dispatcher;

/*******************************************************************************
* Copyright (c) 2012, 2024 IBM Corporation and others.
/* ==============================================================================
* Copyright (c) 2012, 2025 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
Expand All @@ -11,7 +9,10 @@
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
* ==============================================================================
*/
package com.ibm.ws.sib.msgstore.persistence.dispatcher;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -36,10 +37,10 @@
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.sib.utils.ras.SibTr;

import static com.ibm.ws.sib.msgstore.persistence.dispatcher.DispatcherState.getUpdaterForStopRequested;
import static com.ibm.ws.sib.msgstore.persistence.dispatcher.DispatcherState.updaterForErrorCleared;
import static com.ibm.ws.sib.msgstore.persistence.dispatcher.DispatcherState.updaterForErrorOccurred;
import static com.ibm.ws.sib.msgstore.persistence.dispatcher.DispatcherState.updaterForStart;
import static com.ibm.ws.sib.msgstore.persistence.dispatcher.DispatcherState.updaterForStopRequested;
import static com.ibm.ws.sib.msgstore.persistence.dispatcher.DispatcherState.updaterForStopped;
import static com.ibm.ws.sib.msgstore.persistence.dispatcher.StateUtils.updateState;

Expand Down Expand Up @@ -329,7 +330,7 @@ public void start()
if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) SibTr.exit(this, tc, "start");
}

public void stop(int mode)
public void stop(int mode, Throwable reason)
{
if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) SibTr.entry(this, tc, "stop", Integer.valueOf(mode));

Expand All @@ -338,7 +339,7 @@ public void stop(int mode)
// First change the state of the dispatcher
synchronized(this)
{
performingTheStop = updateState(stateRef, updaterForStopRequested);
performingTheStop = updateState(stateRef, getUpdaterForStopRequested(reason));
} // end synchronized


Expand Down
Loading

0 comments on commit ccaa1a6

Please sign in to comment.