diff --git a/extensions/spring-xd-extension-gpfdist/src/main/java/org/springframework/xd/greenplum/gpfdist/GPFDistMessageHandler.java b/extensions/spring-xd-extension-gpfdist/src/main/java/org/springframework/xd/greenplum/gpfdist/GPFDistMessageHandler.java index b55610b81..f80f3787a 100644 --- a/extensions/spring-xd-extension-gpfdist/src/main/java/org/springframework/xd/greenplum/gpfdist/GPFDistMessageHandler.java +++ b/extensions/spring-xd-extension-gpfdist/src/main/java/org/springframework/xd/greenplum/gpfdist/GPFDistMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.xd.greenplum.gpfdist; import java.util.Date; @@ -21,7 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Processor; + import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; import org.springframework.scheduling.TaskScheduler; @@ -31,12 +32,12 @@ import org.springframework.xd.greenplum.support.NetworkUtils; import org.springframework.xd.greenplum.support.RuntimeContext; +import com.codahale.metrics.Meter; + import reactor.Environment; import reactor.core.processor.RingBufferProcessor; import reactor.io.buffer.Buffer; -import com.codahale.metrics.Meter; - public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler { private final Log log = LogFactory.getLog(GPFDistMessageHandler.class); @@ -57,7 +58,7 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler { private GreenplumLoad greenplumLoad; - private Processor processor; + private RingBufferProcessor processor; private GPFDistServer gpfdistServer; @@ -66,7 +67,9 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler { private final TaskFuture taskFuture = new TaskFuture(); private int rateInterval = 0; - private Meter meter = null; + + private Meter meter = null; + private int meterCount = 0; public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchTimeout, int batchCount, @@ -85,19 +88,22 @@ public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchT protected void doWrite(Message message) throws Exception { Object payload = message.getPayload(); if (payload instanceof String) { - String data = (String)payload; + String data = (String) payload; if (delimiter != null) { - processor.onNext(Buffer.wrap(data+delimiter)); - } else { + processor.onNext(Buffer.wrap(data + delimiter)); + } + else { processor.onNext(Buffer.wrap(data)); } if (meter != null) { if ((meterCount++ % rateInterval) == 0) { meter.mark(rateInterval); - log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = " + meter.getMeanRate()); + log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = " + + meter.getMeanRate()); } } - } else { + } + else { throw new MessageHandlingException(message, "message not a String"); } } @@ -116,7 +122,8 @@ protected void doStart() { gpfdistServer = new GPFDistServer(processor, port, flushCount, flushTime, batchTimeout, batchCount); gpfdistServer.start(); log.info("gpfdist protocol listener running on port=" + gpfdistServer.getLocalPort()); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException("Error starting protocol listener", e); } @@ -127,49 +134,93 @@ protected void doStart() { context.addLocation(NetworkUtils.getGPFDistUri(gpfdistServer.getLocalPort())); sqlTaskScheduler.schedule((new FutureTask(new Runnable() { + @Override public void run() { boolean taskValue = true; try { - while(!taskFuture.interrupted) { + while (!taskFuture.interrupted) { try { greenplumLoad.load(context); - } catch (Exception e) { + } + catch (Exception e) { log.error("Error in load", e); } - Thread.sleep(batchPeriod*1000); + Thread.sleep(batchPeriod * 1000); } - } catch (Exception e) { + } + catch (Exception e) { taskValue = false; } taskFuture.set(taskValue); } }, null)), new Date()); - } else { + } + else { log.info("Skipping gpload tasks because greenplumLoad is not set"); } } @Override protected void doStop() { + boolean drained = false; if (greenplumLoad != null) { + + // xd waits 30s to shutdown module, so lets wait 25 to drain + long waitDrain = System.currentTimeMillis() + 25000l; + + log.info("Trying to wait buffer to get drained"); + while (System.currentTimeMillis() < waitDrain) { + long capacity = processor.getCapacity(); + long availableCapacity = processor.getAvailableCapacity(); + log.info("Buffer capacity " + capacity); + log.info("Buffer available capacity " + availableCapacity); + if (capacity != availableCapacity) { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + } + } + else { + log.info("Marking stream drained"); + drained = true; + break; + } + } + + // try to wait current load operation to finish. taskFuture.interruptTask(); try { long now = System.currentTimeMillis(); // wait a bit more than batch period + log.info("Cancelling loading task"); Boolean value = taskFuture.get(batchTimeout + batchPeriod + 2, TimeUnit.SECONDS); log.info("Stopping, got future value " + value + " from task which took " + (System.currentTimeMillis() - now) + "ms"); - } catch (Exception e) { + } + catch (Exception e) { log.warn("Got error from task wait value which may indicate trouble", e); } } try { - processor.onComplete(); + if (drained) { + log.info("Sending onComplete to processor"); + processor.onComplete(); + } + else { + // if it looks like we didn't drain, + // force shutdown as onComplete will + // block otherwise. + log.info("Forcing processor shutdown"); + processor.forceShutdown(); + } + log.info("Shutting down protocol listener"); gpfdistServer.stop(); - } catch (Exception e) { + } + catch (Exception e) { log.warn("Error shutting down protocol listener", e); } }