diff --git a/src/main/java/io/pivotal/spring/xd/jdbcgpfdist/gpfdist/GPFDistMessageHandler.java b/src/main/java/io/pivotal/spring/xd/jdbcgpfdist/gpfdist/GPFDistMessageHandler.java index 779dccc..710ea8d 100644 --- a/src/main/java/io/pivotal/spring/xd/jdbcgpfdist/gpfdist/GPFDistMessageHandler.java +++ b/src/main/java/io/pivotal/spring/xd/jdbcgpfdist/gpfdist/GPFDistMessageHandler.java @@ -15,13 +15,9 @@ */ package io.pivotal.spring.xd.jdbcgpfdist.gpfdist; -/** - * Created by cq on 27/3/16. - */ import com.codahale.metrics.Meter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Processor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; import org.springframework.scheduling.TaskScheduler; @@ -58,7 +54,7 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler { private GreenplumLoad greenplumLoad; - private Processor processor; + private RingBufferProcessor processor; private GPFDistServer gpfdistServer; @@ -67,7 +63,9 @@ public class GPFDistMessageHandler extends AbstractGPFDistMessageHandler { private final TaskFuture taskFuture = new TaskFuture(); private int rateInterval = 0; - private Meter meter = null; + + private Meter meter = null; + private int meterCount = 0; public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchTimeout, int batchCount, @@ -85,22 +83,23 @@ public GPFDistMessageHandler(int port, int flushCount, int flushTime, int batchT @Override protected void doWrite(Message message) throws Exception { Object payload = message.getPayload(); - if (payload instanceof String) { - String data = (String)payload; - log.info("data:" + data); + String data = (String) payload; if (delimiter != null) { - processor.onNext(Buffer.wrap(data+delimiter)); - } else { + processor.onNext(Buffer.wrap(data + delimiter)); + } + else { processor.onNext(Buffer.wrap(data)); } if (meter != null) { if ((meterCount++ % rateInterval) == 0) { meter.mark(rateInterval); - log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = " + meter.getMeanRate()); + log.info("METER: 1 minute rate = " + meter.getOneMinuteRate() + " mean rate = " + + meter.getMeanRate()); } } - } else { + } + else { throw new MessageHandlingException(message, "message not a String"); } } @@ -110,7 +109,6 @@ protected void onInit() throws Exception { super.onInit(); Environment.initializeIfEmpty().assignErrorJournal(); processor = RingBufferProcessor.create(false); - log.info("onInit called!!"); } @Override @@ -120,7 +118,8 @@ protected void doStart() { gpfdistServer = new GPFDistServer(processor, port, flushCount, flushTime, batchTimeout, batchCount); gpfdistServer.start(); log.info("gpfdist protocol listener running on port=" + gpfdistServer.getLocalPort()); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException("Error starting protocol listener", e); } @@ -130,47 +129,94 @@ protected void doStart() { final RuntimeContext context = new RuntimeContext(); context.addLocation(NetworkUtils.getGPFDistUri(gpfdistServer.getLocalPort())); - sqlTaskScheduler.schedule((new FutureTask(() -> { - boolean taskValue = true; - try { - while(!taskFuture.interrupted) { - try { - greenplumLoad.load(context); - } catch (Exception e) { - log.error("Error in load", e); + sqlTaskScheduler.schedule((new FutureTask(new Runnable() { + + @Override + public void run() { + boolean taskValue = true; + try { + while (!taskFuture.interrupted) { + try { + greenplumLoad.load(context); + } + catch (Exception e) { + log.error("Error in load", e); + } + Thread.sleep(batchPeriod * 1000); } - Thread.sleep(batchPeriod*1000); } - } catch (Exception e) { - taskValue = false; + catch (Exception e) { + taskValue = false; + } + taskFuture.set(taskValue); } - taskFuture.set(taskValue); }, null)), new Date()); - } else { + } + else { log.info("Skipping gpload tasks because greenplumLoad is not set"); } } @Override protected void doStop() { + boolean drained = false; if (greenplumLoad != null) { + + // xd waits 30s to shutdown module, so lets wait 25 to drain + long waitDrain = System.currentTimeMillis() + 25000l; + + log.info("Trying to wait buffer to get drained"); + while (System.currentTimeMillis() < waitDrain) { + long capacity = processor.getCapacity(); + long availableCapacity = processor.getAvailableCapacity(); + log.info("Buffer capacity " + capacity); + log.info("Buffer available capacity " + availableCapacity); + if (capacity != availableCapacity) { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + } + } + else { + log.info("Marking stream drained"); + drained = true; + break; + } + } + + // try to wait current load operation to finish. taskFuture.interruptTask(); try { long now = System.currentTimeMillis(); // wait a bit more than batch period + log.info("Cancelling loading task"); Boolean value = taskFuture.get(batchTimeout + batchPeriod + 2, TimeUnit.SECONDS); log.info("Stopping, got future value " + value + " from task which took " + (System.currentTimeMillis() - now) + "ms"); - } catch (Exception e) { + } + catch (Exception e) { log.warn("Got error from task wait value which may indicate trouble", e); } } try { - processor.onComplete(); + if (drained) { + log.info("Sending onComplete to processor"); + processor.onComplete(); + } + else { + // if it looks like we didn't drain, + // force shutdown as onComplete will + // block otherwise. + log.info("Forcing processor shutdown"); + processor.forceShutdown(); + } + log.info("Shutting down protocol listener"); gpfdistServer.stop(); - } catch (Exception e) { + } + catch (Exception e) { log.warn("Error shutting down protocol listener", e); } }