Skip to content

Commit

Permalink
IGNITE-24175 Show internal jobs in system view (#11798)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Jan 10, 2025
1 parent f4ff01c commit 5d1742c
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
/** Collision SPI is not available: {@link GridCollisionManager#enabled()} {@code == false}. */
private final boolean jobAlwaysActivate;

/** */
private volatile ConcurrentMap<IgniteUuid, GridJobWorker> syncRunningJobs;

/** */
private volatile ConcurrentMap<IgniteUuid, GridJobWorker> activeJobs;

Expand Down Expand Up @@ -338,6 +341,8 @@ public GridJobProcessor(GridKernalContext ctx) {

metricsUpdateFreq = ctx.config().getMetricsUpdateFrequency();

syncRunningJobs = new ConcurrentHashMap<>();

activeJobs = initJobsMap(jobAlwaysActivate);

passiveJobs = jobAlwaysActivate ? null : new JobsMap(1024, 0.75f, 256);
Expand Down Expand Up @@ -371,11 +376,11 @@ public GridJobProcessor(GridKernalContext ctx) {
ctx.systemView().registerInnerCollectionView(JOBS_VIEW, JOBS_VIEW_DESC,
new ComputeJobViewWalker(),
passiveJobs == null ?
Arrays.asList(activeJobs, cancelledJobs) :
Arrays.asList(activeJobs, passiveJobs, cancelledJobs),
Arrays.asList(activeJobs, syncRunningJobs, cancelledJobs) :
Arrays.asList(activeJobs, syncRunningJobs, passiveJobs, cancelledJobs),
ConcurrentMap::entrySet,
(map, e) -> {
ComputeJobState state = map == activeJobs ? ComputeJobState.ACTIVE :
ComputeJobState state = (map == activeJobs || map == syncRunningJobs) ? ComputeJobState.ACTIVE :
(map == passiveJobs ? ComputeJobState.PASSIVE : ComputeJobState.CANCELED);

return new ComputeJobView(e.getKey(), e.getValue(), state);
Expand Down Expand Up @@ -428,6 +433,8 @@ public GridJobProcessor(GridKernalContext ctx) {
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
// Clear collections.
syncRunningJobs = new ConcurrentHashMap<>();

activeJobs = initJobsMap(jobAlwaysActivate);

activeJobsMetric.reset();
Expand Down Expand Up @@ -798,10 +805,16 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu
cancelPassiveJob(job);
}
}

for (GridJobWorker job : activeJobs.values()) {
if (idsMatch.test(job))
cancelActiveJob(job, sys);
}

for (GridJobWorker job : syncRunningJobs.values()) {
if (idsMatch.test(job))
cancelJob(job, sys);
}
}
else {
if (!jobAlwaysActivate) {
Expand All @@ -813,8 +826,16 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu

GridJobWorker activeJob = activeJobs.get(jobId);

if (activeJob != null && idsMatch.test(activeJob))
if (activeJob != null && idsMatch.test(activeJob)) {
cancelActiveJob(activeJob, sys);

return;
}

activeJob = syncRunningJobs.get(jobId);

if (activeJob != null && idsMatch.test(activeJob))
cancelJob(activeJob, sys);
}
}
finally {
Expand Down Expand Up @@ -1364,7 +1385,7 @@ public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteReque
// This is an internal job and can be executed inside busy lock
// since job is expected to be short.
// This is essential for proper stop without races.
job.run();
runSync(job);

// No execution outside lock.
job = null;
Expand Down Expand Up @@ -1441,6 +1462,23 @@ else if (jobAlwaysActivate) {
job.run();
}

/**
* Adds job to {@link #syncRunningJobs} while run to provide info in system view.
* @param job Job to add in system view and run synchronously.
*/
private void runSync(GridJobWorker job) {
IgniteUuid jobId = job.getJobId();

syncRunningJobs.put(jobId, job);

try {
job.run();
}
finally {
syncRunningJobs.remove(jobId);
}
}

/**
* Callback from job worker to set current task session for execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -35,6 +34,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.apache.ignite.internal.processors.metric.impl.PeriodicHistogramMetricImpl;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
import org.apache.ignite.internal.processors.service.DummyService;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridTestClockTimer;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.typedef.F;
Expand All @@ -103,6 +105,7 @@
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.spi.systemview.view.BaselineNodeAttributeView;
import org.apache.ignite.spi.systemview.view.BaselineNodeView;
Expand All @@ -114,6 +117,7 @@
import org.apache.ignite.spi.systemview.view.ClientConnectionAttributeView;
import org.apache.ignite.spi.systemview.view.ClientConnectionView;
import org.apache.ignite.spi.systemview.view.ClusterNodeView;
import org.apache.ignite.spi.systemview.view.ComputeJobView;
import org.apache.ignite.spi.systemview.view.ComputeTaskView;
import org.apache.ignite.spi.systemview.view.ConfigurationView;
import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
Expand Down Expand Up @@ -180,6 +184,7 @@
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.SETS_VIEW;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.STAMPED_VIEW;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.VOLATILE_DATA_REGION_NAME;
import static org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl.DISTRIBUTED_METASTORE_VIEW;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLI_CONN_ATTR_VIEW;
Expand Down Expand Up @@ -209,6 +214,12 @@ public class SystemViewSelfTest extends GridCommonAbstractTest {
/** */
public static final String TEST_TRANSFORMER = "TestTransformer";

/** */
private static CountDownLatch jobStartedLatch;

/** */
private static CountDownLatch releaseJobLatch;

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
Expand Down Expand Up @@ -598,63 +609,80 @@ public void testComputeAffinityCall() throws Exception {
/** */
@Test
public void testComputeTask() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(2);
doTestComputeTask(false);
}

try (IgniteEx g1 = startGrid(0)) {
SystemView<ComputeTaskView> tasks = g1.context().systemView().view(TASKS_VIEW);
/** */
@Test
public void testInternalComputeTask() throws Exception {
doTestComputeTask(true);
}

/** */
private void doTestComputeTask(boolean internal) throws Exception {
int gridCnt = 3;

IgniteEx g1 = startGrids(gridCnt);

try {
IgniteCache<Integer, Integer> cache = g1.createCache("test-cache");

cache.put(1, 1);

g1.compute().executeAsync(new ComputeTask<Object, Object>() {
@Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
@Nullable Object arg) throws IgniteException {
return Collections.singletonMap(new ComputeJob() {
@Override public void cancel() {
// No-op.
}
for (int i = 0; i < gridCnt; i++) {
IgniteEx grid = grid(i);

@Override public Object execute() throws IgniteException {
return 1;
}
}, subgrid.get(0));
}
SystemView<ComputeTaskView> tasks = grid.context().systemView().view(TASKS_VIEW);

@Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
try {
barrier.await();
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
jobStartedLatch = new CountDownLatch(3);
releaseJobLatch = new CountDownLatch(1);

return null;
}
IgniteInternalFuture<Object> fut
= runAsync(() -> grid.compute().execute(internal ? new InternalTask() : new UserTask(), 1));

assertTrue(jobStartedLatch.await(30_000, TimeUnit.MILLISECONDS));

try {
assertEquals(1, tasks.size());

ComputeTaskView t = tasks.iterator().next();

@Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
return 1;
assertEquals("Expecting to see " + (internal ? "internal" : "user") + " task", internal, t.internal());
assertNull(t.affinityCacheName());
assertEquals(-1, t.affinityPartitionId());
assertTrue(t.taskClassName().startsWith(getClass().getName()));
assertTrue(t.taskName().startsWith(getClass().getName()));
assertEquals(grid.localNode().id(), t.taskNodeId());
assertEquals("0", t.userVersion());

checkJobs(gridCnt, internal, t.sessionId());
}
finally {
releaseJobLatch.countDown();
}
}, 1);

barrier.await();
fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
}
}
finally {
stopAllGrids();
}
}

assertEquals(1, tasks.size());
/** */
private void checkJobs(int gridCnt, boolean internal, IgniteUuid sesId) {
for (int i = 0; i < gridCnt; i++) {
SystemView<ComputeJobView> jobs = grid(i).context().systemView().view(JOBS_VIEW);

ComputeTaskView t = tasks.iterator().next();
assertTrue("Expecting to see " + (internal ? "internal" : "user") + " job", jobs.size() > 0);

assertFalse(t.internal());
assertNull(t.affinityCacheName());
assertEquals(-1, t.affinityPartitionId());
assertTrue(t.taskClassName().startsWith(getClass().getName()));
assertTrue(t.taskName().startsWith(getClass().getName()));
assertEquals(g1.localNode().id(), t.taskNodeId());
assertEquals("0", t.userVersion());
ComputeJobView job = jobs.iterator().next();

barrier.await();
assertEquals(sesId, job.sessionId());
assertEquals("Expecting to see " + (internal ? "internal" : "user") + " job", internal, job.isInternal());
}

releaseJobLatch.countDown();
}

/** */
Expand Down Expand Up @@ -2592,4 +2620,53 @@ public TestRunnable(CountDownLatch latch, int idx) {
return getClass().getSimpleName() + idx;
}
}

/** */
private static class UserTask implements ComputeTask<Object, Object> {
/** {@inheritDoc} */
@Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
List<ClusterNode> subgrid,
@Nullable Object arg
) throws IgniteException {
return subgrid.stream().collect(Collectors.toMap(k -> new UserJob(), Function.identity()));
}

/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
return ComputeJobResultPolicy.WAIT;
}

/** {@inheritDoc} */
@Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
return 1;
}

/** */
private static class UserJob implements ComputeJob {
/** {@inheritDoc} */
@Override public void cancel() {
// No-op.
}

/** {@inheritDoc} */
@Override public Object execute() throws IgniteException {
jobStartedLatch.countDown();

try {
assertTrue(releaseJobLatch.await(30_000, TimeUnit.MILLISECONDS));
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}

return 1;
}
}
}

/** */
@GridInternal
public static class InternalTask extends UserTask {
// No-op.
}
}
Loading

0 comments on commit 5d1742c

Please sign in to comment.