Skip to content

Commit

Permalink
Merge pull request #15834 from cdapio/sidhdirenge-taskworker-fix
Browse files Browse the repository at this point in the history
[CDAP-21118] Task workers & Preview runners should not communicate with Spanner Messaging Service directly
  • Loading branch information
sidhdirenge authored Jan 31, 2025
2 parents 8143416 + ae37ee1 commit 82c4490
Show file tree
Hide file tree
Showing 28 changed files with 332 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.cdap.cdap.app.DefaultAppConfigurer;
import io.cdap.cdap.app.DefaultApplicationContext;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.app.guice.TwillModule;
Expand Down Expand Up @@ -56,7 +55,7 @@
import io.cdap.cdap.internal.app.runtime.SimpleProgramOptions;
import io.cdap.cdap.internal.app.runtime.SystemArguments;
import io.cdap.cdap.logging.guice.LocalLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.operations.guice.OperationalStatsModule;
Expand All @@ -70,7 +69,6 @@
import io.cdap.cdap.security.guice.SecureStoreServerModule;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.Configs;
Expand Down Expand Up @@ -284,7 +282,7 @@ private static ProgramRunnerFactory createProgramRunnerFactory(CConfiguration cC
new DataSetsModules().getDistributedModules(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AuditModule(),
CoreSecurityRuntimeModule.getDistributedModule(cConf),
new AuthenticationContextModules().getNoOpModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.encryption.guice.DataStorageAeadEncryptionModule;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
Expand Down Expand Up @@ -150,6 +151,9 @@
import io.cdap.cdap.internal.tethering.TetheringClientHandler;
import io.cdap.cdap.internal.tethering.TetheringHandler;
import io.cdap.cdap.internal.tethering.TetheringServerHandler;
import io.cdap.cdap.messaging.server.FetchHandler;
import io.cdap.cdap.messaging.server.MetadataHandler;
import io.cdap.cdap.messaging.server.StoreHandler;
import io.cdap.cdap.metadata.LocalPreferencesFetcherInternal;
import io.cdap.cdap.metadata.PreferencesFetcher;
import io.cdap.cdap.pipeline.PipelineFactory;
Expand Down Expand Up @@ -532,6 +536,14 @@ protected void configure() {
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and task workers need to
// communicate with messaging service via AppFabric.
handlerBinder.addBinding().to(MetadataHandler.class);
handlerBinder.addBinding().to(StoreHandler.class);
handlerBinder.addBinding().to(FetchHandler.class);
}

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
handlerBinder.addBinding().to(GcpWorkloadIdentityHttpHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import io.cdap.cdap.logging.guice.TMSLogAppenderModule;
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.client.DefaultClientMessagingService;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.metadata.MetadataReaderWriterModules;
import io.cdap.cdap.metadata.PreferencesFetcher;
Expand Down Expand Up @@ -327,7 +327,7 @@ private Module getMessagingModules() {
return new MessagingServiceModule(cConf);
}

return new MessagingClientModule();
return new DefaultMessagingClientModule();
}

/**
Expand Down Expand Up @@ -389,7 +389,7 @@ public ProgramStatePublisher get() {
internalAuthenticator);

return new MessagingProgramStatePublisher(cConf,
new ClientMessagingService(cConf, remoteClientFactory));
new DefaultClientMessagingService(cConf, remoteClientFactory));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import io.cdap.cdap.app.store.preview.PreviewStore;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.data.runtime.DataSetsModules;
import io.cdap.cdap.data2.datafabric.dataset.RemoteDatasetFramework;
import io.cdap.cdap.data2.dataset2.DatasetDefinitionRegistryFactory;
Expand All @@ -37,6 +39,9 @@
import io.cdap.cdap.internal.app.preview.PreviewDataCleanupService;
import io.cdap.cdap.internal.app.preview.PreviewRunStopper;
import io.cdap.cdap.internal.app.store.preview.DefaultPreviewStore;
import io.cdap.cdap.messaging.server.FetchHandler;
import io.cdap.cdap.messaging.server.MetadataHandler;
import io.cdap.cdap.messaging.server.StoreHandler;
import io.cdap.http.HttpHandler;

/**
Expand All @@ -46,7 +51,10 @@ public class PreviewManagerModule extends PrivateModule {

private final boolean distributedRunner;

public PreviewManagerModule(boolean distributedRunner) {
private final CConfiguration cConf;

public PreviewManagerModule(CConfiguration cConf, boolean distributedRunner) {
this.cConf = cConf;
this.distributedRunner = distributedRunner;
}

Expand Down Expand Up @@ -76,6 +84,15 @@ protected void configure() {
handlerBinder.addBinding().to(PreviewHttpHandler.class);
handlerBinder.addBinding().to(PreviewErrorClassificationHttpHandler.class);
handlerBinder.addBinding().to(PreviewHttpHandlerInternal.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and preview runners need to
// communicate with messaging service via preview manager.
handlerBinder.addBinding().to(MetadataHandler.class);
handlerBinder.addBinding().to(StoreHandler.class);
handlerBinder.addBinding().to(FetchHandler.class);
}

CommonHandlers.add(handlerBinder);

bind(PreviewHttpServer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.twill.ExtendedTwillContext;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.messaging.guice.client.PreviewRunnerMessagingClientModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
Expand Down Expand Up @@ -237,7 +237,7 @@ protected void configure() {
}

modules.add(new PreviewRunnerManagerModule().getDistributedModules());
modules.add(new MessagingServiceModule(cConf));
modules.add(new PreviewRunnerMessagingClientModule(cConf));
modules.add(new SecureStoreClientModule());
// Needed for InMemoryProgramRunnerModule. We use local metadata reader/publisher to avoid conflicting with
// metadata stored in AppFabric.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.cdap.cdap.logging.guice.RemoteLogAppenderModule;
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.messaging.guice.client.TaskWorkerMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand Down Expand Up @@ -95,7 +95,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf) {
modules.add(new IOModule());
modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(coreSecurityModule);
modules.add(new MessagingServiceModule(cConf));
modules.add(new TaskWorkerMessagingClientModule(cConf));
modules.add(new SystemAppModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new AuditLogWriterModule(cConf).getDistributedModules());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import io.cdap.cdap.internal.metadata.MetadataConsumerSubscriberService;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metadata.MetadataService;
import io.cdap.cdap.metadata.MetadataServiceModule;
import io.cdap.cdap.metadata.MetadataSubscriberService;
Expand Down Expand Up @@ -111,7 +111,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, String
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new MetricsClientRuntimeModule().getDistributedModules(),
new DFSLocationModule(),
new NamespaceQueryAdminModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.cdap.cdap.logging.guice.DistributedLogFrameworkModule;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.logging.service.LogSaverStatusService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
Expand Down Expand Up @@ -118,7 +118,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new AuditModule(),
new AuthorizationEnforcementModule().getDistributedModules(),
new AuthenticationContextModules().getMasterModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new NoOpAuditLogModule(),
new AbstractModule() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.master.startup.ServiceResourceKeys;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.operations.OperationalStatsService;
Expand Down Expand Up @@ -538,7 +538,7 @@ protected void configure() {
new DataSetsModules().getDistributedModules(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AuditModule(),
new AuditLogWriterModule(cConf).getDistributedModules(),
CoreSecurityRuntimeModule.getDistributedModule(cConf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.cdap.cdap.data2.audit.AuditModule;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsProcessorStatusServiceModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
Expand Down Expand Up @@ -121,7 +121,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, S
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new KafkaLogAppenderModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.cdap.cdap.logging.guice.LogQueryRuntimeModule;
import io.cdap.cdap.logging.guice.LogReaderRuntimeModules;
import io.cdap.cdap.logging.service.LogQueryService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsHandlerModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
Expand Down Expand Up @@ -110,7 +110,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new DataFabricModules(txClientId).getDistributedModules(),
new DataSetsModules().getDistributedModules(),
// For the injection of DatasetDefinition of MetricsTable directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import io.cdap.cdap.data2.audit.AuditModule;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand Down Expand Up @@ -105,7 +105,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new DataFabricModules(txClientId).getDistributedModules(),
new DataSetsModules().getDistributedModules(),
new SystemDatasetRuntimeModule().getDistributedModules(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.schedule.Trigger;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.app.guice.TwillModule;
Expand Down Expand Up @@ -63,7 +62,7 @@
import io.cdap.cdap.internal.schedule.constraint.Constraint;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.data.MessageId;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand All @@ -74,7 +73,6 @@
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.cli.BasicParser;
Expand Down Expand Up @@ -361,7 +359,7 @@ private static Injector createInjector() throws Exception {
new AuthorizationModule(),
new AuthorizationEnforcementModule().getMasterModule(),
new SecureStoreServerModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AbstractModule() {
@Override
protected void configure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.cdap.cdap.security.guice.SecureStoreClientModule;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -124,9 +123,9 @@ protected void configure() {
));

if (cConf.getInt(Constants.Preview.CONTAINER_COUNT) > 0) {
modules.add(new PreviewManagerModule(true));
modules.add(new PreviewManagerModule(cConf, true));
} else {
modules.add(new PreviewManagerModule(false));
modules.add(new PreviewManagerModule(cConf, false));
modules.add(new PreviewRunnerManagerModule().getStandaloneModules());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.cdap.cdap.logging.gateway.handlers.ProgramRunRecordFetcher;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
import java.util.Arrays;
Expand All @@ -66,7 +66,7 @@ protected List<Module> getServiceModules(MasterEnvironment masterEnv,
RemoteAuthenticatorModules.getDefaultModule(
TetheringAgentService.REMOTE_TETHERING_AUTHENTICATOR,
Constants.Tethering.CLIENT_AUTHENTICATOR_NAME),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new NamespaceQueryAdminModule(),
getDataFabricModule(),
// Always use local table implementations, which use LevelDB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.messaging.DefaultMessageFetchRequest;
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.client.DefaultClientMessagingService;
import io.cdap.cdap.messaging.client.StoreRequestBuilder;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
Expand All @@ -50,7 +50,7 @@ public void testMessagingService() throws Exception {

// Use a separate TMS client to create topic, then publish and then poll some messages
TopicId topicId = NamespaceId.SYSTEM.topic("test");
MessagingService messagingService = new ClientMessagingService(remoteClientFactory, true);
MessagingService messagingService = new DefaultClientMessagingService(remoteClientFactory, true);
messagingService.createTopic(new DefaultTopicMetadata(topicId));

// Publish 10 messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand Down Expand Up @@ -259,7 +259,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, Master
modules.add(new IOModule());
modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(coreSecurityModule);
modules.add(new MessagingClientModule());
modules.add(new DefaultMessagingClientModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
//Need for guice binding, but No Audit Log action required.
modules.add(new NoOpAuditLogModule());
Expand Down
Loading

0 comments on commit 82c4490

Please sign in to comment.