Skip to content

Commit

Permalink
IGNITE-24334 Fixed flaky ClientSessionOutboundQueueLimitTest
Browse files Browse the repository at this point in the history
  • Loading branch information
petrov-mg committed Jan 29, 2025
1 parent 348e844 commit d1e8240
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ private void unblock(Object waiter) {
}

/** {@inheritDoc} */
@Async.Schedule
@Override public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
if (!registerWaiter(lsnr))
notifyListener(lsnr);
Expand Down Expand Up @@ -466,7 +465,6 @@ private <T> void applyChainComposeCallback(
*
* @param lsnr Listener.
*/
@Async.Execute
private void notifyListener(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
assert lsnr != null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
Expand All @@ -35,14 +37,34 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;

/** */
@RunWith(Parameterized.class)
public class ClientSessionOutboundQueueLimitTest extends GridCommonAbstractTest {
/** */
public static final int MSG_QUEUE_LIMIT = 100;

/** Whether a client node is an initiator of the test operations. */
@Parameterized.Parameter()
public int idx;

/** */
@Parameterized.Parameters(name = "idx={0}")
public static Iterable<Object> data() {
return IntStream.range(0, 1000).boxed().collect(Collectors.toList());
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

stopAllGrids();
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
Expand All @@ -68,7 +90,8 @@ public void testClientSessionOutboundQueueLimit() throws Exception {
try (
IgniteClient cli = Ignition.startClient(new ClientConfiguration()
.setAddresses("127.0.0.1:10800")
.setReconnectThrottlingRetries(1)
.setTimeout(5000)
.setRetryLimit(1)
.setEventListeners(new ConnectionEventListener() {
@Override public void onConnectionClosed(ConnectionClosedEvent event) {
isCliDisconnected.set(true);
Expand All @@ -88,8 +111,11 @@ public void testClientSessionOutboundQueueLimit() throws Exception {
Collection<IgniteClientFuture<byte[]>> futs = new ArrayList<>();

try {
while (!isCliDisconnected.get())
while (!isCliDisconnected.get()) {
futs.add(cache.getAsync(0));

U.sleep(10);
}
}
finally {
skipClientWrite(grid(0), false);
Expand All @@ -102,10 +128,6 @@ public void testClientSessionOutboundQueueLimit() throws Exception {
fut.get();
}
catch (Exception e) {
assertTrue(
e.getMessage().contains("Channel is closed") || e.getMessage().contains("Reconnect is not allowed")
);

failedReqsCntr.incrementAndGet();
}
});
Expand Down

0 comments on commit d1e8240

Please sign in to comment.