From 6840512e50dd0fadb89b7ea28122f3ca1887a165 Mon Sep 17 00:00:00 2001 From: Andrey Dyachkov Date: Mon, 23 Dec 2019 16:01:13 +0100 Subject: [PATCH 1/3] instance termination plugin support --- .../zalando/nakadi/config/PluginsConfig.java | 15 +++++++++++++ .../plugin/DefaultTerminationService.java | 12 ++++++++++ .../DefaultTerminationServiceFactory.java | 10 +++++++++ .../subscription/StreamingContext.java | 22 +++++++++++++++++++ .../subscription/state/CleanupState.java | 2 +- .../subscription/state/StreamingState.java | 2 +- src/main/resources/application.yml | 2 ++ 7 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java create mode 100644 src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java diff --git a/src/main/java/org/zalando/nakadi/config/PluginsConfig.java b/src/main/java/org/zalando/nakadi/config/PluginsConfig.java index d36bce176b..d53a1aedf1 100644 --- a/src/main/java/org/zalando/nakadi/config/PluginsConfig.java +++ b/src/main/java/org/zalando/nakadi/config/PluginsConfig.java @@ -54,4 +54,19 @@ public AuthorizationService authorizationService(@Value("${nakadi.plugins.authz. throw new BeanCreationException("Can't create AuthorizationService " + factoryName, e); } } + + @Bean + public TerminationService authorizationService(@Value("${nakadi.plugins.termination.factory}") final String factoryName, + final SystemProperties systemProperties, + final DefaultResourceLoader loader) { + try { + LOGGER.info("Initialize per-resource termination service factory: " + factoryName); + final Class factoryClass = + (Class) loader.getClassLoader().loadClass(factoryName); + final TerminationServiceFactory factory = factoryClass.newInstance(); + return factory.init(systemProperties); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new BeanCreationException("Can't create TerminationService " + factoryName, e); + } + } } diff --git a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java new file mode 100644 index 0000000000..b4916b0d2e --- /dev/null +++ b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java @@ -0,0 +1,12 @@ +package org.zalando.nakadi.plugin; + +public class DefaultTerminationService implements TerminationService { + + public void register(final String listenerName, final TerminationListener terminationRunnable) { + // skip implementation for the local setup + } + + public void deregister(final String listenerName) { + // skip implementation for the local setup + } +} diff --git a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java new file mode 100644 index 0000000000..5218e02328 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationServiceFactory.java @@ -0,0 +1,10 @@ +package org.zalando.nakadi.plugin; + +import org.zalando.nakadi.plugin.api.SystemProperties; + +public class DefaultTerminationServiceFactory { + + public DefaultTerminationService init(final SystemProperties systemProperties) { + return new DefaultTerminationService(); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 57369b7444..93d197fea3 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -11,6 +11,7 @@ import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; +import org.zalando.nakadi.plugin.api.exceptions.PluginException; import org.zalando.nakadi.service.AuthorizationValidator; import org.zalando.nakadi.service.BlacklistService; import org.zalando.nakadi.service.CursorConverter; @@ -68,6 +69,7 @@ public class StreamingContext implements SubscriptionStreamer { private final NakadiKpiPublisher kpiPublisher; private final Span currentSpan; private final String kpiDataStreamedEventType; + private final TerminationService terminationService; private final long kpiCollectionFrequencyMs; @@ -105,6 +107,7 @@ private StreamingContext(final Builder builder) { this.kpiCollectionFrequencyMs = builder.kpiCollectionFrequencyMs; this.streamMemoryLimitBytes = builder.streamMemoryLimitBytes; this.currentSpan = builder.currentSpan; + this.terminationService = builder.terminationService; } public Span getCurrentSpan() { @@ -163,10 +166,17 @@ public long getKpiCollectionFrequencyMs() { return kpiCollectionFrequencyMs; } + public TerminationService getTerminationService() { + return terminationService; + } + @Override public void stream() throws InterruptedException { try (Closeable ignore = ShutdownHooks.addHook(this::onNodeShutdown)) { // bugfix ARUHA-485 + terminationService.register(getSessionId(), this::onInstanceTermination); streamInternal(new StartingState()); + } catch (final PluginException pe) { + log.error("Failed to register instance termination callback for subscription {}", getSubscription(), pe); } catch (final IOException ex) { log.error( "Failed to delete shutdown hook for subscription {}. This method should not throw any exception", @@ -175,6 +185,11 @@ public void stream() throws InterruptedException { } } + void onInstanceTermination() { + log.info("Instance is about to be terminated. Trying to terminate subscription gracefully"); + switchState(new CleanupState(null)); + } + void onNodeShutdown() { log.info("Shutdown hook called. Trying to terminate subscription gracefully"); switchState(new CleanupState(null)); @@ -372,6 +387,7 @@ public static final class Builder { private long kpiCollectionFrequencyMs; private long streamMemoryLimitBytes; private Span currentSpan; + private TerminationService terminationService; public Builder setCurrentSpan(final Span span) { this.currentSpan = span; @@ -493,6 +509,12 @@ public Builder setKpiCollectionFrequencyMs(final long kpiCollectionFrequencyMs) return this; } + public Builder setTerminationService(final TerminationService terminationService) { + this.terminationService = terminationService; + return this; + } + + public StreamingContext build() { return new StreamingContext(this); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java index dab1a4ab14..8cd3beb559 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java @@ -29,8 +29,8 @@ public void onEnter() { } } finally { try { + getContext().getTerminationService().deregister(getSessionId()); getContext().unregisterSession(); - } finally { switchState(StreamingContext.DEAD_STATE); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index cb8d749170..c63701ad79 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -656,7 +656,7 @@ private void addToStreaming(final Partition partition, LoggerFactory.getLogger(LogPathBuilder.build( getContext().getSubscription().getId(), getSessionId(), String.valueOf(partition.getKey()))), System.currentTimeMillis(), this.getContext().getParameters().batchTimespan - ); + ); offsets.put(partition.getKey(), pd); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0fb0cafab2..936284b998 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -98,6 +98,8 @@ nakadi: factory: org.zalando.nakadi.plugin.auth.DefaultApplicationServiceFactory authz: factory: org.zalando.nakadi.plugin.auth.DefaultAuthorizationServiceFactory + termination: + factory: org.zalando.nakadi.plugin.DefaultTerminationServiceFactory event.max.bytes: 999000 timeline.wait.timeoutMs: 40000 subscription: From b476f7581ebadecbf63f0cd40fb0b19fed8ebbe3 Mon Sep 17 00:00:00 2001 From: Andrey Dyachkov Date: Mon, 23 Dec 2019 16:17:53 +0100 Subject: [PATCH 2/3] change health status if instance is about to be terminated --- .../nakadi/controller/HealthCheckController.java | 12 ++++++++++++ .../nakadi/plugin/DefaultTerminationService.java | 7 +++++++ 2 files changed, 19 insertions(+) diff --git a/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java b/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java index d181a904c9..181c699346 100644 --- a/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java +++ b/src/main/java/org/zalando/nakadi/controller/HealthCheckController.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.controller; +import org.eclipse.jetty.http.HttpStatus; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -12,8 +14,18 @@ @RequestMapping(value = "/health", produces = TEXT_PLAIN_VALUE) public class HealthCheckController { + private final TerminationService terminationService; + + @Autowired + public HealthCheckController(final TerminationService terminationService) { + this.terminationService = terminationService; + } + @RequestMapping(method = GET) public ResponseEntity healthCheck() { + if (terminationService.isTerminating()) { + return ResponseEntity.status(HttpStatus.IM_A_TEAPOT_418).build(); + } return ok().body("OK"); } } diff --git a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java index b4916b0d2e..e5d43aad11 100644 --- a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java +++ b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.plugin; +import org.zalando.nakadi.plugin.api.exceptions.PluginException; + public class DefaultTerminationService implements TerminationService { public void register(final String listenerName, final TerminationListener terminationRunnable) { @@ -9,4 +11,9 @@ public void register(final String listenerName, final TerminationListener termin public void deregister(final String listenerName) { // skip implementation for the local setup } + + boolean isTerminating() throws PluginException { + return false; + } + } From f79692aa2ddd31553876b78b454d64de1f8db385 Mon Sep 17 00:00:00 2001 From: Andrey Dyachkov Date: Mon, 23 Dec 2019 16:23:59 +0100 Subject: [PATCH 3/3] updated according plugin api --- .../java/org/zalando/nakadi/config/PluginsConfig.java | 2 +- .../zalando/nakadi/plugin/DefaultTerminationService.java | 6 +----- .../nakadi/service/subscription/StreamingContext.java | 8 +------- .../nakadi/service/subscription/state/CleanupState.java | 1 - 4 files changed, 3 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/config/PluginsConfig.java b/src/main/java/org/zalando/nakadi/config/PluginsConfig.java index d53a1aedf1..6ea5c7f020 100644 --- a/src/main/java/org/zalando/nakadi/config/PluginsConfig.java +++ b/src/main/java/org/zalando/nakadi/config/PluginsConfig.java @@ -56,7 +56,7 @@ public AuthorizationService authorizationService(@Value("${nakadi.plugins.authz. } @Bean - public TerminationService authorizationService(@Value("${nakadi.plugins.termination.factory}") final String factoryName, + public TerminationService terminationService(@Value("${nakadi.plugins.termination.factory}") final String factoryName, final SystemProperties systemProperties, final DefaultResourceLoader loader) { try { diff --git a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java index e5d43aad11..fe77984458 100644 --- a/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java +++ b/src/main/java/org/zalando/nakadi/plugin/DefaultTerminationService.java @@ -4,11 +4,7 @@ public class DefaultTerminationService implements TerminationService { - public void register(final String listenerName, final TerminationListener terminationRunnable) { - // skip implementation for the local setup - } - - public void deregister(final String listenerName) { + public void register(final TerminationListener terminationRunnable) { // skip implementation for the local setup } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 93d197fea3..1e3b1bea3f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -166,14 +166,10 @@ public long getKpiCollectionFrequencyMs() { return kpiCollectionFrequencyMs; } - public TerminationService getTerminationService() { - return terminationService; - } - @Override public void stream() throws InterruptedException { try (Closeable ignore = ShutdownHooks.addHook(this::onNodeShutdown)) { // bugfix ARUHA-485 - terminationService.register(getSessionId(), this::onInstanceTermination); + terminationService.register(this::onInstanceTermination); streamInternal(new StartingState()); } catch (final PluginException pe) { log.error("Failed to register instance termination callback for subscription {}", getSubscription(), pe); @@ -514,12 +510,10 @@ public Builder setTerminationService(final TerminationService terminationService return this; } - public StreamingContext build() { return new StreamingContext(this); } - } } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java index 8cd3beb559..bd43cf9c28 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java @@ -29,7 +29,6 @@ public void onEnter() { } } finally { try { - getContext().getTerminationService().deregister(getSessionId()); getContext().unregisterSession(); } finally { switchState(StreamingContext.DEAD_STATE);