Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick][CDAP-21118] Task workers should communicate with Spanner Messaging Service only via App fabric #15843

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.cdap.cdap.app.DefaultAppConfigurer;
import io.cdap.cdap.app.DefaultApplicationContext;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.app.guice.TwillModule;
Expand Down Expand Up @@ -56,7 +55,7 @@
import io.cdap.cdap.internal.app.runtime.SimpleProgramOptions;
import io.cdap.cdap.internal.app.runtime.SystemArguments;
import io.cdap.cdap.logging.guice.LocalLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.operations.guice.OperationalStatsModule;
Expand All @@ -70,7 +69,6 @@
import io.cdap.cdap.security.guice.SecureStoreServerModule;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.Configs;
Expand Down Expand Up @@ -284,7 +282,7 @@ private static ProgramRunnerFactory createProgramRunnerFactory(CConfiguration cC
new DataSetsModules().getDistributedModules(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AuditModule(),
CoreSecurityRuntimeModule.getDistributedModule(cConf),
new AuthenticationContextModules().getNoOpModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
Expand Down Expand Up @@ -149,6 +150,9 @@
import io.cdap.cdap.internal.tethering.TetheringClientHandler;
import io.cdap.cdap.internal.tethering.TetheringHandler;
import io.cdap.cdap.internal.tethering.TetheringServerHandler;
import io.cdap.cdap.messaging.server.FetchHandler;
import io.cdap.cdap.messaging.server.MetadataHandler;
import io.cdap.cdap.messaging.server.StoreHandler;
import io.cdap.cdap.metadata.LocalPreferencesFetcherInternal;
import io.cdap.cdap.metadata.PreferencesFetcher;
import io.cdap.cdap.pipeline.PipelineFactory;
Expand Down Expand Up @@ -532,6 +536,14 @@ protected void configure() {
handlerBinder.addBinding().to(ProgramLifecycleHttpHandlerInternal.class);
handlerBinder.addBinding().to(WorkflowHttpHandler.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and task workers need to
// communicate with messaging service via AppFabric.
handlerBinder.addBinding().to(MetadataHandler.class);
handlerBinder.addBinding().to(StoreHandler.class);
handlerBinder.addBinding().to(FetchHandler.class);
}

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
handlerBinder.addBinding().to(GcpWorkloadIdentityHttpHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import io.cdap.cdap.logging.guice.TMSLogAppenderModule;
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.client.DefaultClientMessagingService;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.metadata.MetadataReaderWriterModules;
import io.cdap.cdap.metadata.PreferencesFetcher;
Expand Down Expand Up @@ -327,7 +327,7 @@ private Module getMessagingModules() {
return new MessagingServiceModule(cConf);
}

return new MessagingClientModule();
return new DefaultMessagingClientModule();
}

/**
Expand Down Expand Up @@ -389,7 +389,7 @@ public ProgramStatePublisher get() {
internalAuthenticator);

return new MessagingProgramStatePublisher(cConf,
new ClientMessagingService(cConf, remoteClientFactory));
new DefaultClientMessagingService(cConf, remoteClientFactory));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import io.cdap.cdap.app.store.preview.PreviewStore;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.MessagingSystem;
import io.cdap.cdap.data.runtime.DataSetsModules;
import io.cdap.cdap.data2.datafabric.dataset.RemoteDatasetFramework;
import io.cdap.cdap.data2.dataset2.DatasetDefinitionRegistryFactory;
Expand All @@ -37,6 +39,9 @@
import io.cdap.cdap.internal.app.preview.PreviewDataCleanupService;
import io.cdap.cdap.internal.app.preview.PreviewRunStopper;
import io.cdap.cdap.internal.app.store.preview.DefaultPreviewStore;
import io.cdap.cdap.messaging.server.FetchHandler;
import io.cdap.cdap.messaging.server.MetadataHandler;
import io.cdap.cdap.messaging.server.StoreHandler;
import io.cdap.http.HttpHandler;

/**
Expand All @@ -46,7 +51,10 @@ public class PreviewManagerModule extends PrivateModule {

private final boolean distributedRunner;

public PreviewManagerModule(boolean distributedRunner) {
private final CConfiguration cConf;

public PreviewManagerModule(CConfiguration cConf, boolean distributedRunner) {
this.cConf = cConf;
this.distributedRunner = distributedRunner;
}

Expand Down Expand Up @@ -76,6 +84,15 @@ protected void configure() {
handlerBinder.addBinding().to(PreviewHttpHandler.class);
handlerBinder.addBinding().to(PreviewErrorClassificationHttpHandler.class);
handlerBinder.addBinding().to(PreviewHttpHandlerInternal.class);

if (!cConf.getBoolean(MessagingSystem.MESSAGING_SERVICE_ENABLED)) {
// Add these handlers only if messaging service endpoint doesn't exist and preview runners need to
// communicate with messaging service via preview manager.
handlerBinder.addBinding().to(MetadataHandler.class);
handlerBinder.addBinding().to(StoreHandler.class);
handlerBinder.addBinding().to(FetchHandler.class);
}

CommonHandlers.add(handlerBinder);

bind(PreviewHttpServer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.twill.ExtendedTwillContext;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.messaging.guice.client.PreviewRunnerMessagingClientModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
Expand Down Expand Up @@ -237,7 +237,7 @@ protected void configure() {
}

modules.add(new PreviewRunnerManagerModule().getDistributedModules());
modules.add(new MessagingServiceModule(cConf));
modules.add(new PreviewRunnerMessagingClientModule(cConf));
modules.add(new SecureStoreClientModule());
// Needed for InMemoryProgramRunnerModule. We use local metadata reader/publisher to avoid conflicting with
// metadata stored in AppFabric.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.cdap.cdap.logging.guice.RemoteLogAppenderModule;
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.messaging.guice.MessagingServiceModule;
import io.cdap.cdap.messaging.guice.client.TaskWorkerMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand Down Expand Up @@ -95,7 +95,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf) {
modules.add(new IOModule());
modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(coreSecurityModule);
modules.add(new MessagingServiceModule(cConf));
modules.add(new TaskWorkerMessagingClientModule(cConf));
modules.add(new SystemAppModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
modules.add(new AuditLogWriterModule(cConf).getDistributedModules());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import io.cdap.cdap.internal.metadata.MetadataConsumerSubscriberService;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metadata.MetadataService;
import io.cdap.cdap.metadata.MetadataServiceModule;
import io.cdap.cdap.metadata.MetadataSubscriberService;
Expand Down Expand Up @@ -111,7 +111,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, String
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new MetricsClientRuntimeModule().getDistributedModules(),
new DFSLocationModule(),
new NamespaceQueryAdminModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.cdap.cdap.logging.guice.DistributedLogFrameworkModule;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.logging.service.LogSaverStatusService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
Expand Down Expand Up @@ -118,7 +118,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new AuditModule(),
new AuthorizationEnforcementModule().getDistributedModules(),
new AuthenticationContextModules().getMasterModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new NoOpAuditLogModule(),
new AbstractModule() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.master.startup.ServiceResourceKeys;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.operations.OperationalStatsService;
Expand Down Expand Up @@ -538,7 +538,7 @@ protected void configure() {
new DataSetsModules().getDistributedModules(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AuditModule(),
new AuditLogWriterModule(cConf).getDistributedModules(),
CoreSecurityRuntimeModule.getDistributedModule(cConf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.cdap.cdap.data2.audit.AuditModule;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsProcessorStatusServiceModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
Expand Down Expand Up @@ -121,7 +121,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf, S
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new MetricsClientRuntimeModule().getDistributedModules(),
new MetricsStoreModule(),
new KafkaLogAppenderModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.cdap.cdap.logging.guice.LogQueryRuntimeModule;
import io.cdap.cdap.logging.guice.LogReaderRuntimeModules;
import io.cdap.cdap.logging.service.LogQueryService;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsHandlerModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
Expand Down Expand Up @@ -110,7 +110,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new DataFabricModules(txClientId).getDistributedModules(),
new DataSetsModules().getDistributedModules(),
// For the injection of DatasetDefinition of MetricsTable directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import io.cdap.cdap.data2.audit.AuditModule;
import io.cdap.cdap.logging.appender.LogAppenderInitializer;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand Down Expand Up @@ -105,7 +105,7 @@ static Injector createGuiceInjector(CConfiguration cConf, Configuration hConf,
new ZkClientModule(),
new ZkDiscoveryModule(),
new KafkaClientModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new DataFabricModules(txClientId).getDistributedModules(),
new DataSetsModules().getDistributedModules(),
new SystemDatasetRuntimeModule().getDistributedModules(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.schedule.Trigger;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule;
import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule.ServiceType;
import io.cdap.cdap.app.guice.AuthorizationModule;
import io.cdap.cdap.app.guice.ProgramRunnerRuntimeModule;
import io.cdap.cdap.app.guice.TwillModule;
Expand Down Expand Up @@ -63,7 +62,7 @@
import io.cdap.cdap.internal.schedule.constraint.Constraint;
import io.cdap.cdap.logging.guice.KafkaLogAppenderModule;
import io.cdap.cdap.messaging.data.MessageId;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand All @@ -74,7 +73,6 @@
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.cli.BasicParser;
Expand Down Expand Up @@ -361,7 +359,7 @@ private static Injector createInjector() throws Exception {
new AuthorizationModule(),
new AuthorizationEnforcementModule().getMasterModule(),
new SecureStoreServerModule(),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new AbstractModule() {
@Override
protected void configure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.cdap.cdap.security.guice.SecureStoreClientModule;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -124,9 +123,9 @@ protected void configure() {
));

if (cConf.getInt(Constants.Preview.CONTAINER_COUNT) > 0) {
modules.add(new PreviewManagerModule(true));
modules.add(new PreviewManagerModule(cConf, true));
} else {
modules.add(new PreviewManagerModule(false));
modules.add(new PreviewManagerModule(cConf, false));
modules.add(new PreviewRunnerManagerModule().getStandaloneModules());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.cdap.cdap.logging.gateway.handlers.ProgramRunRecordFetcher;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
import java.util.Arrays;
Expand All @@ -66,7 +66,7 @@ protected List<Module> getServiceModules(MasterEnvironment masterEnv,
RemoteAuthenticatorModules.getDefaultModule(
TetheringAgentService.REMOTE_TETHERING_AUTHENTICATOR,
Constants.Tethering.CLIENT_AUTHENTICATOR_NAME),
new MessagingClientModule(),
new DefaultMessagingClientModule(),
new NamespaceQueryAdminModule(),
getDataFabricModule(),
// Always use local table implementations, which use LevelDB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.messaging.DefaultMessageFetchRequest;
import io.cdap.cdap.messaging.DefaultTopicMetadata;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.client.ClientMessagingService;
import io.cdap.cdap.messaging.client.DefaultClientMessagingService;
import io.cdap.cdap.messaging.client.StoreRequestBuilder;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.spi.RawMessage;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
Expand All @@ -50,7 +50,7 @@ public void testMessagingService() throws Exception {

// Use a separate TMS client to create topic, then publish and then poll some messages
TopicId topicId = NamespaceId.SYSTEM.topic("test");
MessagingService messagingService = new ClientMessagingService(remoteClientFactory, true);
MessagingService messagingService = new DefaultClientMessagingService(remoteClientFactory, true);
messagingService.createTopic(new DefaultTopicMetadata(topicId));

// Publish 10 messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import io.cdap.cdap.master.environment.MasterEnvironments;
import io.cdap.cdap.master.spi.environment.MasterEnvironment;
import io.cdap.cdap.master.spi.environment.MasterEnvironmentContext;
import io.cdap.cdap.messaging.guice.MessagingClientModule;
import io.cdap.cdap.messaging.guice.client.DefaultMessagingClientModule;
import io.cdap.cdap.metrics.guice.MetricsClientRuntimeModule;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.security.auth.context.AuthenticationContextModules;
Expand Down Expand Up @@ -259,7 +259,7 @@ static Injector createInjector(CConfiguration cConf, Configuration hConf, Master
modules.add(new IOModule());
modules.add(new AuthenticationContextModules().getMasterWorkerModule());
modules.add(coreSecurityModule);
modules.add(new MessagingClientModule());
modules.add(new DefaultMessagingClientModule());
modules.add(new MetricsClientRuntimeModule().getDistributedModules());
//Need for guice binding, but No Audit Log action required.
modules.add(new NoOpAuditLogModule());
Expand Down
Loading
Loading