Skip to content

Commit

Permalink
[CDAP-21096] Split ProgramLifecycleService for Appfabric service and …
Browse files Browse the repository at this point in the history
…processor

Replace inheritance with composition
  • Loading branch information
vsethi09 committed Jan 30, 2025
1 parent b85f031 commit ee0968a
Show file tree
Hide file tree
Showing 24 changed files with 1,445 additions and 1,207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.cdap.cdap.gateway.handlers.ProfileHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramLifecycleHttpHandler;
import io.cdap.cdap.gateway.handlers.ProgramLifecycleHttpHandlerInternal;
import io.cdap.cdap.gateway.handlers.ProgramRuntimeHttpHandler;
import io.cdap.cdap.gateway.handlers.ProvisionerHttpHandler;
import io.cdap.cdap.gateway.handlers.SourceControlManagementHttpHandler;
import io.cdap.cdap.gateway.handlers.TransactionHttpHandler;
Expand Down Expand Up @@ -114,11 +115,11 @@
import io.cdap.cdap.internal.app.runtime.schedule.store.TriggerMisfireLogger;
import io.cdap.cdap.internal.app.runtime.workflow.BasicWorkflowStateWriter;
import io.cdap.cdap.internal.app.runtime.workflow.WorkflowStateWriter;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.LocalRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.ProgramLifecycleService;
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.FlowControlService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
Expand Down Expand Up @@ -249,17 +250,11 @@ protected void configure() {
Names.named("appfabric.handler.hooks"));
handlerHookNamesBinder.addBinding().toInstance(Constants.Service.APP_FABRIC_HTTP);

// TODO (CDAP-21112): Remove the additional handler binding for in-memory and use the binding from
// AppFabricServiceModule, after fixing in-memory cache issue in ProgramRuntimeService and
// RunRecordMonitorService.
// For ProgramLifecycleHttpHandlerTest the ProgramRuntimeHttpHandler needs to be present
// in the appfabric service.
Multibinder<HttpHandler> handlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(Constants.AppFabric.SERVER_HANDLERS_BINDING));
handlerBinder.addBinding().to(BootstrapHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandler.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);
handlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);

// TODO: Uncomment after CDAP-7688 is resolved
// servicesNamesBinder.addBinding().toInstance(Constants.Service.MESSAGING_SERVICE);
Expand Down Expand Up @@ -529,6 +524,11 @@ protected void configure() {
handlerBinder.addBinding().to(CredentialProviderHttpHandler.class);
handlerBinder.addBinding().to(CredentialProviderHttpHandlerInternal.class);
handlerBinder.addBinding().to(OperationHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandler.class);
handlerBinder.addBinding().to(AppLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandler.class);
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
Expand All @@ -545,14 +545,8 @@ protected void configure() {
Multibinder<HttpHandler> processorHandlerBinder = Multibinder.newSetBinder(
binder(), HttpHandler.class, Names.named(AppFabric.PROCESSOR_HANDLERS_BINDING));
CommonHandlers.add(processorHandlerBinder);
// TODO (CDAP-21112): Move HTTP handler from Appfabric processor to server after fixing
// ProgramRuntimeService and RunRecordMonitorService.
processorHandlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);
processorHandlerBinder.addBinding().to(BootstrapHttpHandler.class);
processorHandlerBinder.addBinding().to(AppLifecycleHttpHandler.class);
processorHandlerBinder.addBinding().to(AppLifecycleHttpHandlerInternal.class);
processorHandlerBinder.addBinding().to(ProgramLifecycleHttpHandler.class);
processorHandlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
processorHandlerBinder.addBinding().to(WorkflowHttpHandler.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.io.Closeables;
Expand All @@ -26,12 +27,14 @@
import com.google.inject.Inject;
import io.cdap.cdap.app.deploy.ProgramRunDispatcherContext;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.twill.TwillAppNames;
import io.cdap.cdap.internal.app.deploy.ProgramRunDispatcherFactory;
import io.cdap.cdap.internal.app.runtime.AbstractListener;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.runtime.service.SimpleRuntimeInfo;
import io.cdap.cdap.proto.InMemoryProgramLiveInfo;
import io.cdap.cdap.proto.NotRunningProgramLiveInfo;
Expand All @@ -46,6 +49,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -59,6 +64,7 @@
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,27 +127,38 @@ void setRemoteTwillRunnerService(
@Override
public final RuntimeInfo run(ProgramDescriptor programDescriptor, ProgramOptions options,
RunId runId) {
ProgramRunDispatcherContext dispatcherContext = new ProgramRunDispatcherContext(
programDescriptor, options, runId,
isDistributed());
ProgramId programId = programDescriptor.getProgramId();
ProgramRunId programRunId = programId.run(runId);
DelayedProgramController controller = new DelayedProgramController(programRunId);
RuntimeInfo runtimeInfo = createRuntimeInfo(controller, programId,
dispatcherContext::executeCleanupTasks);
updateRuntimeInfo(runtimeInfo);
executor.execute(() -> {
try {
controller.setProgramController(
programRunDispatcherFactory.getProgramRunDispatcher(programId.getType())
.dispatchProgram(dispatcherContext));
} catch (Exception e) {
controller.failed(e);
programStateWriter.error(programRunId, e);
LOG.error("Exception while trying to run program run {}", programRunId, e);
Lock lock = runtimeInfosLock.writeLock();
lock.lock();
try {
RuntimeInfo runtimeInfo = lookup(programRunId.getParent(), runId);
if (runtimeInfo != null) {
return runtimeInfo;
}
});
return runtimeInfo;

ProgramRunDispatcherContext dispatcherContext = new ProgramRunDispatcherContext(
programDescriptor, options, runId,
isDistributed());
DelayedProgramController controller = new DelayedProgramController(programRunId);
runtimeInfo = createRuntimeInfo(controller, programId,
dispatcherContext::executeCleanupTasks);
updateRuntimeInfo(runtimeInfo);
executor.execute(() -> {
try {
controller.setProgramController(
programRunDispatcherFactory.getProgramRunDispatcher(programId.getType())
.dispatchProgram(dispatcherContext));
} catch (Exception e) {
controller.failed(e);
programStateWriter.error(programRunId, e);
LOG.error("Exception while trying to run program run {}", programRunId, e);
}
});
return runtimeInfo;
} finally {
lock.unlock();
}
}

@Override
Expand Down Expand Up @@ -239,6 +256,28 @@ public List<RuntimeInfo> listAll(ProgramType... types) {
return runningPrograms;
}

/**
 * Resets the log levels of a running program. Only {@link ProgramType#SERVICE} and
 * {@link ProgramType#WORKER} programs are accepted; any other type is rejected before the
 * request is delegated to {@code resetLogLevels}.
 *
 * @param programId the program whose log levels should be reset
 * @param loggerNames names of the loggers to reset
 * @param runId the run to target, or {@code null} when no specific run is given
 * @throws BadRequestException if the program type is neither SERVICE nor WORKER
 */
@Override
public void resetProgramLogLevels(ProgramId programId, Set<String> loggerNames,
    @Nullable String runId) throws Exception {
  ProgramType type = programId.getType();
  boolean supported = type == ProgramType.SERVICE || type == ProgramType.WORKER;
  if (!supported) {
    String message = String.format(
        "Resetting log levels for program type %s is not supported", type.getPrettyName());
    throw new BadRequestException(message);
  }
  resetLogLevels(programId, loggerNames, runId);
}

/**
 * Updates the log levels of a running program. Only {@link ProgramType#SERVICE} and
 * {@link ProgramType#WORKER} programs are accepted; any other type is rejected before the
 * request is delegated to {@code updateLogLevels}.
 *
 * @param programId the program whose log levels should be updated
 * @param logLevels mapping from logger name to the level to apply
 * @param runId the run to target, or {@code null} when no specific run is given
 * @throws BadRequestException if the program type is neither SERVICE nor WORKER
 */
@Override
public void updateProgramLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels,
    @Nullable String runId) throws Exception {
  ProgramType type = programId.getType();
  boolean supported = type == ProgramType.SERVICE || type == ProgramType.WORKER;
  if (!supported) {
    String message = String.format(
        "Updating log levels for program type %s is not supported", type.getPrettyName());
    throw new BadRequestException(message);
  }
  updateLogLevels(programId, logLevels, runId);
}

@Override
protected void startUp() throws Exception {
// Limits to at max poolSize number of concurrent program launch.
Expand Down Expand Up @@ -436,4 +475,79 @@ private void cleanupRuntimeInfo(@Nullable RuntimeInfo info) {
Closeables.closeQuietly((Closeable) info);
}
}

/**
 * Returns the controller of the given runtime as a {@link LogLevelUpdater}.
 *
 * @param runtimeInfo the runtime whose controller is inspected
 * @return the controller, cast to {@link LogLevelUpdater}
 * @throws BadRequestException if the controller does not implement {@link LogLevelUpdater}
 */
private LogLevelUpdater getLogLevelUpdater(RuntimeInfo runtimeInfo) throws Exception {
  ProgramController controller = runtimeInfo.getController();
  if (controller instanceof LogLevelUpdater) {
    return (LogLevelUpdater) controller;
  }
  throw new BadRequestException(
      "Update log levels at runtime is only supported in distributed mode");
}

/**
 * Applies the given log levels to the first runtime found for the program (optionally narrowed
 * to a single run). Does nothing when no matching runtime is found.
 *
 * @param programId the program to update
 * @param logLevels mapping from logger name to the level to apply
 * @param runId the run to target, or {@code null} to consider all runs of the program
 */
private void updateLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels,
    @Nullable String runId) throws Exception {
  Map<RunId, ProgramRuntimeService.RuntimeInfo> candidates = findRuntimeInfo(programId, runId);
  ProgramRuntimeService.RuntimeInfo target =
      candidates.values().stream().findFirst().orElse(null);
  if (target == null) {
    // No matching runtime is tracked here: nothing to update.
    return;
  }
  getLogLevelUpdater(target).updateLogLevels(logLevels, null);
}

/**
 * Resets the named loggers on the first runtime found for the program (optionally narrowed to a
 * single run). Does nothing when no matching runtime is found.
 *
 * @param programId the program to reset
 * @param loggerNames names of the loggers to reset
 * @param runId the run to target, or {@code null} to consider all runs of the program
 */
private void resetLogLevels(ProgramId programId, Set<String> loggerNames, @Nullable String runId)
    throws Exception {
  Map<RunId, ProgramRuntimeService.RuntimeInfo> candidates = findRuntimeInfo(programId, runId);
  ProgramRuntimeService.RuntimeInfo target =
      candidates.values().stream().findFirst().orElse(null);
  if (target == null) {
    // No matching runtime is tracked here: nothing to reset.
    return;
  }
  getLogLevelUpdater(target).resetLogLevels(loggerNames, null);
}

/**
 * Sends an instance-count change command to the controller of the program's runtime and blocks
 * until the command future completes. Does nothing when no runtime is found for the program.
 *
 * @param programId the program whose instance count is changed
 * @param instances the new number of instances
 * @param oldInstances the previous number of instances
 */
@Override
public void setInstances(ProgramId programId, int instances, int oldInstances)
    throws ExecutionException, InterruptedException, BadRequestException {
  ProgramRuntimeService.RuntimeInfo info = findRuntimeInfo(programId);
  if (info == null) {
    return;
  }
  ProgramController controller = info.getController();
  ImmutableMap<String, String> payload = ImmutableMap.of(
      "runnable", programId.getProgram(),
      "newInstances", String.valueOf(instances),
      "oldInstances", String.valueOf(oldInstances));
  // get() waits for the command to finish and surfaces any failure to the caller.
  controller.command(ProgramOptionConstants.INSTANCES, payload).get();
}

/**
 * Collects the runtimes of a program, keyed by run id.
 *
 * @param programId the program to look up
 * @param runId when non-null, restricts the result to this run; when {@code null}, a copy of
 *     everything returned by {@code list(programId)} is returned
 * @return a map with at most one entry when {@code runId} is given (empty if that run is not
 *     found), otherwise a copy of the listed runtimes for the program
 * @throws BadRequestException if {@code runId} cannot be parsed as a run id
 */
private Map<RunId, ProgramRuntimeService.RuntimeInfo> findRuntimeInfo(
    ProgramId programId, @Nullable String runId) throws BadRequestException {
  if (runId == null) {
    return new HashMap<>(list(programId));
  }

  RunId parsedRunId;
  try {
    parsedRunId = RunIds.fromString(runId);
  } catch (IllegalArgumentException e) {
    throw new BadRequestException("Error parsing run-id.", e);
  }

  ProgramRuntimeService.RuntimeInfo info = lookup(programId, parsedRunId);
  if (info == null) {
    return Collections.emptyMap();
  }
  return Collections.singletonMap(parsedRunId, info);
}

/**
 * Returns the first runtime found for the program across all of its runs.
 *
 * @param programId the program to look up
 * @return the first runtime found, or {@code null} if there is none
 */
@Nullable
protected ProgramRuntimeService.RuntimeInfo findRuntimeInfo(ProgramId programId)
    throws BadRequestException {
  Map<RunId, ProgramRuntimeService.RuntimeInfo> allRuns = findRuntimeInfo(programId, null);
  return allRuns.values().stream().findFirst().orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import com.google.common.util.concurrent.Service;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.proto.ProgramLiveInfo;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.apache.twill.api.logging.LogEntry;

/**
* Service for interacting with the runtime system.
Expand All @@ -47,7 +53,9 @@ interface RuntimeInfo {
}

/**
* Starts the given program and returns a {@link RuntimeInfo} about the running program.
* Runs the given program and returns a {@link RuntimeInfo} about the running program. The program
* is run if it is not already running, otherwise the {@link RuntimeInfo} of the already running
* program is returned.
*
* @param programDescriptor describing the program to run
* @param options {@link ProgramOptions} that are needed by the program.
Expand Down Expand Up @@ -97,4 +105,60 @@ interface RuntimeInfo {
* @param types Types of program to check.
* @return List of info about running programs.
*/
List<RuntimeInfo> listAll(ProgramType... types);

/**
 * Resets log levels for the given program. The only supported program types for this action are
 * {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
 *
 * @param programId the {@link ProgramId} of the program for which log levels are to be
 *     reset.
 * @param loggerNames the {@link String} set of the logger names to be reset, empty means
 *     reset for all loggers.
 * @param runId the run id of the program, or {@code null} if no specific run is targeted.
 * @throws InterruptedException if there is an error while asynchronously resetting log
 *     levels.
 * @throws ExecutionException if there is an error while asynchronously resetting log levels.
 * @throws BadRequestException if the program type is not supported.
 * @throws UnauthorizedException if the user does not have privileges to reset log levels for
 *     the specified program. To reset log levels for a program, a user needs {@link
 *     StandardPermission#UPDATE} on the program.
 */
void resetProgramLogLevels(ProgramId programId, Set<String> loggerNames, @Nullable String runId) throws Exception;

/**
 * Updates log levels for the given program. The only supported program types for this action
 * are {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
 *
 * @param programId the {@link ProgramId} of the program for which log levels are to be
 *     updated.
 * @param logLevels the {@link Map} of logger name to the log level to be applied.
 * @param runId the run id of the program, or {@code null} if no specific run is targeted.
 * @throws InterruptedException if there is an error while asynchronously updating log
 *     levels.
 * @throws ExecutionException if there is an error while asynchronously updating log levels.
 * @throws BadRequestException if the log level is not valid or the program type is not
 *     supported.
 * @throws UnauthorizedException if the user does not have privileges to update log levels for
 *     the specified program. To update log levels for a program, a user needs {@link
 *     StandardPermission#UPDATE} on the program.
 */
void updateProgramLogLevels(ProgramId programId, Map<String, LogEntry.Level> logLevels, @Nullable String runId)
throws Exception;

/**
 * Sets the number of instances for the given program. The only supported program types for this
 * action are {@link ProgramType#SERVICE} and {@link ProgramType#WORKER}.
 *
 * @param programId the {@link ProgramId} of the program for which instances are to be
 *     updated.
 * @param instances the new number of instances.
 * @param oldInstances the previous number of instances.
 *
 * @throws InterruptedException if there is an error while asynchronously updating instances
 * @throws ExecutionException if there is an error while asynchronously updating instances
 * @throws BadRequestException if the number of instances specified is less than 0
 * @throws UnauthorizedException if the user does not have privileges to set instances for the
 *     specified program. To set instances for a program, a user needs {@link
 *     StandardPermission#UPDATE} on the program.
 */
void setInstances(ProgramId programId, int instances, int oldInstances) throws Exception;
}
11 changes: 11 additions & 0 deletions cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cdap.cdap.app.program.Program;
import io.cdap.cdap.app.program.ProgramDescriptor;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ConflictException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.ProgramNotFoundException;
Expand Down Expand Up @@ -464,6 +465,16 @@ void updateApplicationSourceControlMeta(Map<ApplicationId, SourceControlMeta> up
@Nullable
ApplicationMeta getLatest(ApplicationReference appRef);

/**
 * Gets the {@link ApplicationId} with the latest version for the given
 * {@link ApplicationReference}.
 *
 * @param appRef the {@link ApplicationReference} to resolve
 * @return the {@link ApplicationId} for the latest version
 *
 * @throws ApplicationNotFoundException if the app was not found for the given application
 *     reference.
 */
ApplicationId getLatestApp(ApplicationReference appRef) throws ApplicationNotFoundException;

/**
* Scans for the latest applications across all namespaces.
*
Expand Down
Loading

0 comments on commit ee0968a

Please sign in to comment.