Skip to content

Commit

Permalink
Create ecchronos-binary Module and ecctool (#868)
Browse files Browse the repository at this point in the history
* Create ecchronos-binary Module and ecctool

* Fix check_style problems
  • Loading branch information
VictorCavichioli authored Feb 7, 2025
1 parent a3f58f4 commit 738cf0d
Show file tree
Hide file tree
Showing 57 changed files with 2,807 additions and 194 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Version 1.0.0 (Not yet Released)

* Create ecchronos-binary Module with ecctool for Interacting with the ecChronos REST API - Issue #867
* Update TestContainers Template to Enable or Disable Jolokia in Cassandra - Issue #844
* Reconcile Jolokia Notification Listener Implementation with RepairTask - Issue #831
* Introduce REST Module for Scheduling and Managing Cassandra Repairs - Issue #771
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private RepairGroup.Builder createRepairGroupBuilder(final ReplicaRepairGroup re
.withTableRepairMetrics(getTableRepairMetrics())
.withRepairResourceFactory(getRepairLockType().getLockFactory())
.withRepairLockFactory(REPAIR_LOCK_FACTORY)
.withJobId(getId());
.withJobId(getJobId());
}

@Override
Expand All @@ -90,7 +90,7 @@ public Iterator<ScheduledTask> iterator()
public OnDemandRepairJobView getView()
{
return new OnDemandRepairJobView(
getId(),
getJobId(),
getOngoingJob().getHostId(),
getOngoingJob().getTableReference(),
getStatus(),
Expand Down Expand Up @@ -124,7 +124,7 @@ public void postExecute(final boolean successful, final ScheduledTask task)
@Override
public void finishJob()
{
UUID id = getId();
UUID id = getJobId();
getOnFinishedHook().accept(id);
if (myTasks.isEmpty())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public OnDemandRepairJob(final Configuration configuration, final DistributedJmx
final OngoingJob ongoingJob,
final Node currentNode)
{
super(configuration, ongoingJob.getJobId());
super(configuration, ongoingJob.getJobId(), ongoingJob.getHostId());

myOngoingJob = Preconditions.checkNotNull(ongoingJob,
"Ongoing job must be set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ public abstract class ScheduledRepairJob extends ScheduledJob

public ScheduledRepairJob(
final Configuration configuration,
final UUID nodeID,
final TableReference tableReference,
final DistributedJmxProxyFactory jmxProxyFactory,
final RepairConfiguration repairConfiguration,
final List<TableRepairPolicy> repairPolicies,
final TableRepairMetrics tableRepairMetrics,
final RepairLockType repairLockType)
{
super(configuration);
super(configuration, nodeID);
myTableReference = Preconditions.checkNotNull(tableReference, "Table reference must be set");
myJmxProxyFactory = Preconditions.checkNotNull(jmxProxyFactory, "JMX proxy factory must be set");
myRepairConfiguration = Preconditions.checkNotNull(repairConfiguration, "Repair configuration must be set");
Expand All @@ -58,15 +59,16 @@ public ScheduledRepairJob(

public ScheduledRepairJob(
final Configuration configuration,
final UUID id,
final UUID jobId,
final UUID nodeID,
final TableReference tableReference,
final DistributedJmxProxyFactory jmxProxyFactory,
final RepairConfiguration repairConfiguration,
final List<TableRepairPolicy> repairPolicies,
final TableRepairMetrics tableRepairMetrics,
final RepairLockType repairLockType)
{
super(configuration, id);
super(configuration, jobId, nodeID);
myTableReference = Preconditions.checkNotNull(tableReference, "Table reference must be set");
myJmxProxyFactory = Preconditions.checkNotNull(jmxProxyFactory, "JMX proxy factory must be set");
myRepairConfiguration = Preconditions.checkNotNull(repairConfiguration, "Repair configuration must be set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private ScheduledTask createScheduledTask(final ReplicaRepairGroup replicaRepair
.withRepairResourceFactory(getRepairLockType().getLockFactory())
.withRepairLockFactory(REPAIR_LOCK_FACTORY)
.withRepairHistory(myRepairHistory)
.withJobId(getId())
.withJobId(getJobId())
.withNode(currentNode)
.build(ScheduledJob.Priority.HIGHEST.getValue());
}
Expand All @@ -142,7 +142,7 @@ private ScheduledTask createScheduledTask(final ReplicaRepairGroup replicaRepair
public OnDemandRepairJobView getView()
{
return new OnDemandRepairJobView(
getId(),
getJobId(),
getOngoingJob().getHostId(),
getTableReference(),
getStatus(),
Expand Down Expand Up @@ -176,7 +176,7 @@ public void postExecute(final boolean successful, final ScheduledTask task)
@Override
public void finishJob()
{
UUID id = getId();
UUID id = getJobId();
getOnFinishedHook().accept(id);
if (myTasks.isEmpty())
{
Expand All @@ -197,13 +197,13 @@ public ScheduledJob.State getState()
{
if (hasFailed())
{
LOG.error("Repair job with id {} failed", getId());
LOG.error("Repair job with id {} failed", getJobId());
return ScheduledJob.State.FAILED;
}
if (getOngoingJob().hasTopologyChanged())
{
LOG.error("Repair job with id {} failed. Token ranges have changed since repair has was triggered",
getId());
getJobId());
setFailed(true);
return ScheduledJob.State.FAILED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Collection;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,7 +57,7 @@ public class IncrementalRepairJob extends ScheduledRepairJob
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
IncrementalRepairJob(final Builder builder)
{
super(builder.myConfiguration, builder.myTableReference, builder.myJmxProxyFactory,
super(builder.myConfiguration, builder.myNodeId, builder.myTableReference, builder.myJmxProxyFactory,
builder.myRepairConfiguration, builder.myRepairPolicies, builder.myTableRepairMetrics, builder.myRepairLockType);
myNode = Preconditions.checkNotNull(builder.myNode, "Node must be set");
myReplicationState = Preconditions.checkNotNull(builder.myReplicationState, "Replication state must be set");
Expand All @@ -79,7 +80,8 @@ private void setLastSuccessfulRun()
public ScheduledRepairJobView getView()
{
long now = System.currentTimeMillis();
return new ScheduledRepairJobView(getId(), getTableReference(), getRepairConfiguration(), getStatus(now),
return new ScheduledRepairJobView(getNodeId(), getJobId(), getTableReference(), getRepairConfiguration(),
getStatus(now),
getProgress(), getNextRunInMs(), getLastSuccessfulRun(), getRepairConfiguration().getRepairType());
}

Expand Down Expand Up @@ -135,7 +137,7 @@ public Iterator<ScheduledTask> iterator()
.withReplicaRepairGroup(replicaRepairGroup)
.withRepairLockFactory(REPAIR_LOCK_FACTORY)
.withRepairResourceFactory(getRepairLockType().getLockFactory())
.withRepairPolicies(getRepairPolicies()).withJobId(getId());
.withRepairPolicies(getRepairPolicies()).withJobId(getJobId());
List<ScheduledTask> taskList = new ArrayList<>();
taskList.add(builder.build(getRealPriority()));
return taskList.iterator();
Expand Down Expand Up @@ -203,6 +205,7 @@ public static class Builder
private TableReference myTableReference;
private DistributedJmxProxyFactory myJmxProxyFactory;
private Node myNode;
private UUID myNodeId;
private TableRepairMetrics myTableRepairMetrics = null;
private ReplicationState myReplicationState;
private RepairConfiguration myRepairConfiguration = RepairConfiguration.DEFAULT;
Expand Down Expand Up @@ -246,6 +249,7 @@ public Builder withConfiguration(final ScheduledJob.Configuration configuration)
public Builder withNode(final Node node)
{
myNode = node;
myNodeId = myNode.getHostId();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ public List<OnDemandRepairJobView> scheduleClusterWideJob(final TableReference t
{
UUID randomAvailableNodeId = selectRandomAvailableNode();
OnDemandRepairJobView jobView = scheduleJob(tableReference, true, repairType, randomAvailableNodeId);
return getAllClusterWideRepairJobs().stream()
.filter(j -> j.getId().equals(jobView.getId()))
List<OnDemandRepairJobView> jobViews = getAllClusterWideRepairJobs().stream()
.filter(j -> j.getNodeId().equals(jobView.getNodeId()))
.collect(Collectors.toList());
return jobViews;
}

private UUID selectRandomAvailableNode() throws EcChronosException
Expand Down Expand Up @@ -208,7 +209,7 @@ private OnDemandRepairJobView scheduleJob(final TableReference tableReference,
{
validateTableReference(tableReference);
OnDemandRepairJob job = getRepairJob(tableReference, isClusterWide, repairType, nodeId);
myScheduledJobs.put(job.getId(), job);
myScheduledJobs.put(job.getJobId(), job);
myScheduleManager.schedule(nodeId, job);
return job.getView();
}
Expand All @@ -233,9 +234,9 @@ private void scheduleOngoingJob(final OngoingJob ongoingJob)
synchronized (myLock)
{
OnDemandRepairJob job = getOngoingRepairJob(ongoingJob);
if (myScheduledJobs.putIfAbsent(job.getId(), job) == null)
if (myScheduledJobs.putIfAbsent(job.getJobId(), job) == null)
{
LOG.info("Scheduling ongoing job: {}", job.getId());
LOG.info("Scheduling ongoing job: {}", job.getJobId());
myScheduleManager.schedule(ongoingJob.getHostId(), job);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -177,6 +178,26 @@ public List<ScheduledRepairJobView> getCurrentRepairJobs()
}
}

@Override
public List<ScheduledRepairJobView> getCurrentRepairJobsByNode(final UUID nodeId)
{
synchronized (myLock)
{
Map<TableReference, Set<ScheduledRepairJob>> tableJobs = myScheduledJobs.get(nodeId);

if (tableJobs == null)
{
return Collections.emptyList();
}

return tableJobs.values().stream()
.flatMap(Set::stream)
.map(ScheduledRepairJob::getView)
.collect(Collectors.toList());
}
}


private void handleTableConfigurationChange(
final Node node,
final TableReference tableReference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public String getCurrentJobStatus()
ScheduledJob job = currentExecutingJob.get();
if (job != null)
{
String jobId = job.getId().toString();
String jobId = job.getJobId().toString();
return "Job ID: " + jobId + ", Status: Running";
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class TableRepairJob extends ScheduledRepairJob

TableRepairJob(final Builder builder)
{
super(builder.configuration, builder.tableReference.getId(), builder.tableReference, builder.jmxProxyFactory,
super(builder.configuration, builder.tableReference.getId(), builder.myNode.getHostId(), builder.tableReference,
builder.jmxProxyFactory,
builder.repairConfiguration, builder.repairPolicies,
builder.tableRepairMetrics, builder.repairLockType);
myNode = Preconditions.checkNotNull(builder.myNode,
Expand All @@ -85,7 +86,7 @@ public class TableRepairJob extends ScheduledRepairJob
public ScheduledRepairJobView getView()
{
long now = System.currentTimeMillis();
return new ScheduledRepairJobView(getId(), getTableReference(), getRepairConfiguration(),
return new ScheduledRepairJobView(getNodeId(), getJobId(), getTableReference(), getRepairConfiguration(),
myRepairState.getSnapshot(),
getStatus(now), getProgress(now), getNextRunInMs(), getRepairConfiguration().getRepairType());
}
Expand Down Expand Up @@ -167,7 +168,7 @@ public Iterator<ScheduledTask> iterator()
.withRepairHistory(myRepairHistory)
.withRepairResourceFactory(getRepairLockType().getLockFactory())
.withRepairLockFactory(REPAIR_LOCK_FACTORY)
.withJobId(getId())
.withJobId(getJobId())
.withNode(myNode);

taskList.add(builder.build(getRealPriority(replicaRepairGroup.getLastCompletedAt())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public ScheduledRepairJobView build()
{
this.repairConfiguration = generateRepairConfiguration(repairInterval);
}
return new ScheduledRepairJobView(id, tableReference(keyspace, table), repairConfiguration,
return new ScheduledRepairJobView(id, id, tableReference(keyspace, table), repairConfiguration,
generateRepairStateSnapshot(lastRepairedAt, vnodeRepairStates), status,progress, lastRepairedAt + repairInterval, repairType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ public void finalVerification()
public void testCurrentJobCorrectlyReturned()
{
IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob();
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference,
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getJobId(), myHostId, myTableReference,
OnDemandRepairJobView.Status.IN_QUEUE, 0, System.currentTimeMillis(), RepairType.INCREMENTAL);
assertThat(repairJob.getId()).isEqualTo(repairJob.getId());
assertThat(repairJob.getJobId()).isEqualTo(repairJob.getJobId());
assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1);
assertThat(repairJob.getTableReference()).isEqualTo(myTableReference);
assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference());
assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus());
assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId());
assertThat(repairJob.getView().getNodeId()).isEqualTo(expectedView.getNodeId());
assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType());
}

Expand All @@ -108,13 +108,13 @@ public void testCurrentFailedJobCorrectlyReturned()
IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob();
Iterator<ScheduledTask> it = repairJob.iterator();
repairJob.postExecute(false, it.next());
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference,
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getJobId(), myHostId, myTableReference,
OnDemandRepairJobView.Status.ERROR, 0, System.currentTimeMillis(), RepairType.INCREMENTAL);
assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1);
assertThat(repairJob.getTableReference()).isEqualTo(myTableReference);
assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference());
assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus());
assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId());
assertThat(repairJob.getView().getNodeId()).isEqualTo(expectedView.getNodeId());
assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType());
}

Expand All @@ -123,13 +123,13 @@ public void testFailedJobCorrectlyReturned()
{
IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob();
when(myOngoingJob.getStatus()).thenReturn(OngoingJob.Status.failed);
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference,
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getJobId(), myHostId, myTableReference,
OnDemandRepairJobView.Status.ERROR, 0, System.currentTimeMillis(), RepairType.INCREMENTAL);
assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1);
assertThat(repairJob.getTableReference()).isEqualTo(myTableReference);
assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference());
assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus());
assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId());
assertThat(repairJob.getView().getNodeId()).isEqualTo(expectedView.getNodeId());
assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType());
}

Expand All @@ -138,14 +138,14 @@ public void testFinishedJobCorrectlyReturned()
{
IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob();
when(myOngoingJob.getStatus()).thenReturn(OngoingJob.Status.finished);
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference,
OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getJobId(), myHostId, myTableReference,
OnDemandRepairJobView.Status.COMPLETED, 0, System.currentTimeMillis(), RepairType.INCREMENTAL);
assertThat(repairJob.getId()).isEqualTo(repairJob.getId());
assertThat(repairJob.getJobId()).isEqualTo(repairJob.getJobId());
assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1);
assertThat(repairJob.getTableReference()).isEqualTo(myTableReference);
assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference());
assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus());
assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId());
assertThat(repairJob.getView().getNodeId()).isEqualTo(expectedView.getNodeId());
assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ public class DummyJob extends ScheduledJob
{
volatile boolean hasRun = false;

public DummyJob(Priority priority)
public DummyJob(Priority priority, UUID nodeID)
{
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build(), nodeID);
}

public DummyJob(Priority priority, UUID jobId)
public DummyJob(Priority priority, UUID jobId, UUID nodeID)
{
super(new ConfigurationBuilder().withPriority(priority).build(), jobId);
super(new ConfigurationBuilder().withPriority(priority).build(), jobId, nodeID);
}

public boolean hasRun()
Expand Down
Loading

0 comments on commit 738cf0d

Please sign in to comment.