diff --git a/src/de/caluga/morphium/messaging/Messaging.java b/src/de/caluga/morphium/messaging/Messaging.java index e84d7520a..c9072c745 100644 --- a/src/de/caluga/morphium/messaging/Messaging.java +++ b/src/de/caluga/morphium/messaging/Messaging.java @@ -238,15 +238,33 @@ public void run() { try { if (evt == null || evt.getOperationType() == null) return running; if (evt.getOperationType().equals("insert")) { +// if (log.isDebugEnabled()) +// log.debug(getSenderId() + ": incoming message: " + evt.getFullDocument().get("_id") + " inAnswerTo: " + evt.getFullDocument().get("in_answer_to")); //insert => new Message - // log.debug("New message incoming"); Msg obj = morphium.getMapper().deserialize(Msg.class, evt.getFullDocument()); + if (obj.getInAnswerTo() != null && waitingForMessages.containsKey(obj.getInAnswerTo())) { + if (log.isDebugEnabled()) + log.debug("processing answer " + obj.getMsgId() + " in answer to " + obj.getInAnswerTo()); + List lst = new ArrayList<>(); + lst.add(obj); + try { + processMessages(lst); + } catch (Exception e) { + log.error("Error during message processing ", e); + } + return running; + } + if (listenerByName.get(obj.getName()) == null && listeners.size() == 0) { + //ignoring incoming message, we do not have listener for + return running; + } if (obj.getSender().equals(id) || (obj.getProcessedBy() != null && obj.getProcessedBy().contains(id)) || (obj.getRecipient() != null && !obj.getRecipient().equals(id))) { //ignoring my own messages return running; } if (pauseMessages.containsKey(obj.getName())) { - log.debug("Not processing message - processing paused for " + obj.getName()); + if (log.isDebugEnabled()) + log.debug("Not processing message - processing paused for " + obj.getName()); return running; } //do not process messages, that are exclusive, but already processed or not for me / all @@ -282,8 +300,30 @@ public void run() { // if (((Map)data.get("o")).get("$set")!=null){ // //there is a set-update // } + if (evt.getFullDocument() != null && evt.getFullDocument().get("_id") != null) { Msg obj = morphium.findById(Msg.class, new MorphiumId(evt.getFullDocument().get("_id").toString()), getCollectionName()); + if (obj == null) { + return running; //was deleted? + } + if (obj.getInAnswerTo() != null && waitingForMessages.containsKey(obj.getInAnswerTo())) { + if (obj.isExclusive()) { + lockAndProcess(obj); + } else { + List lst = new ArrayList<>(); + lst.add(obj); + + try { + processMessages(lst); + } catch (Exception e) { + log.error("Error during message processing ", e); + } + } + } + if (listenerByName.get(obj.getName()) == null && listeners.size() == 0) { + if (obj.getInAnswerTo() == null || !waitingForMessages.containsKey(obj.getInAnswerTo())) + return running; + } if (obj != null && obj.isExclusive() && obj.getLockedBy() == null && !pauseMessages.containsKey(obj.getName()) && (obj.getRecipient() == null || obj.getRecipient().equals(id))) { log.debug("Update of msg - trying to lock"); // locking @@ -389,6 +429,13 @@ private MorphiumIterator findMessages(String name, boolean multiple) { or1.f(Msg.Fields.name).nin(pausedMessagesKeys); or2.f(Msg.Fields.name).nin(pausedMessagesKeys); } + if (listeners.isEmpty() && !listenerByName.isEmpty()) { + or1.f(Msg.Fields.name).in(listenerByName.keySet()); + or2.f(Msg.Fields.name).in(listenerByName.keySet()); + } else if (listenerByName.isEmpty() && listeners.isEmpty()) { + return q.q().f(Msg.Fields.msgId).eq("123445").asIterable(); + } + } ArrayList processingIds = new ArrayList<>(processing); if (!processing.isEmpty()) { @@ -458,7 +505,8 @@ private void lockAndProcess(Msg obj) { if (obj != null && obj.getLockedBy() != null && obj.getLockedBy().equals(id)) { List lst = new ArrayList<>(); lst.add(obj); - log.debug("locked messages: " + lst.size()); + if (log.isDebugEnabled()) + log.debug("locked messages: " + lst.size()); try { processMessages(lst); } catch (Exception e) { @@ -479,7 +527,8 @@ private void processMessages(Iterable messages) { //noinspection SuspiciousMethodCalls if (msg.getInAnswerTo() != null && waitingForMessages.get(msg.getInAnswerTo()) != null) { - log.debug("Got a message, we are waiting for..."); + if (log.isDebugEnabled()) + log.debug(getSenderId() + ": Got a message, we are waiting for..."); //this message we were waiting for waitingForAnswers.put((MorphiumId) msg.getInAnswerTo(), msg); processing.remove(m.getMsgId()); @@ -488,7 +537,7 @@ private void processMessages(Iterable messages) { } if (listeners.isEmpty() && listenerByName.isEmpty()) { updateProcessedByAndReleaseLock(msg); - log.debug("no listener registered... not processing message"); + log.error(getSenderId() + ": should not be here. not processing message, as no listeners are defined " + msg.getMsgId()); return; } // Query q = morphium.createQueryFor(m.getClass()).f("_id").eq(m.getMsgId()); @@ -522,7 +571,8 @@ private void processMessages(Iterable messages) { if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) { //Delete outdated msg! - log.debug("Found outdated message - deleting it!"); + if (log.isDebugEnabled()) + log.debug(getSenderId() + ": Found outdated message - deleting it!"); morphium.delete(msg, getCollectionName()); processing.remove(m.getMsgId()); return; @@ -535,7 +585,8 @@ private void processMessages(Iterable messages) { lst.addAll(listenerByName.get(msg.getName())); } if (lst.isEmpty()) { - log.debug("Message did not have a listener registered"); + if (log.isDebugEnabled()) + log.debug(getSenderId() + ": Message did not have a listener registered"); wasProcessed = true; } for (MessageListener l : lst) { @@ -547,6 +598,11 @@ private void processMessages(Iterable messages) { } if (answer != null) { msg.sendAnswer(Messaging.this, answer); + if (log.isDebugEnabled()) + log.debug("sent answer to " + answer.getInAnswerTo() + " recipient: " + answer.getRecipient() + " id: " + answer.getMsgId()); + if (answer.getRecipient() == null) { + log.error("Recipeient of answer is null?!?!"); + } } } catch (MessageRejectedException mre) { log.warn("Message was rejected by listener", mre); @@ -572,9 +628,11 @@ private void processMessages(Iterable messages) { } } } - if (!wasProcessed && !lst.isEmpty()) { + if (!wasProcessed && !wasRejected) { // msg.addAdditional("Processing of message failed by "+getSenderId()+": "+t.getMessage()); log.error("message was not processed"); + } else if (wasRejected) { + log.debug("Message rejected"); } // if (msg.getType().equals(MsgType.SINGLE)) { @@ -582,11 +640,11 @@ private void processMessages(Iterable messages) { // morphium.delete(msg, getCollectionName()); // } //updating it to be processed by others... - if ((msg.getLockedBy() != null && msg.getLockedBy().equals("ALL")) || (msg.getRecipient()!=null && msg.getRecipient().equals(id) && msg.getInAnswerTo() != null)) { + if ((msg.getLockedBy() != null && msg.getLockedBy().equals("ALL")) || (msg.getRecipient() != null && msg.getRecipient().equals(id) && msg.getInAnswerTo() != null)) { updateProcessedByAndReleaseLock(msg); } else { //Exclusive message - morphium.delete(msg,getCollectionName()); + morphium.delete(msg, getCollectionName()); // msg.addProcessedId(id); // msg.setLockedBy(null); // msg.setLocked(0); @@ -725,11 +783,13 @@ public void terminate() { running = false; if (decouplePool != null) { int sz = decouplePool.shutdownNow().size(); - log.debug("Shutting down with " + sz + " runnables still scheduled"); + if (log.isDebugEnabled()) + log.debug("Shutting down with " + sz + " runnables still scheduled"); } if (threadPool != null) { int sz = threadPool.shutdownNow().size(); - log.debug("Shutting down with " + sz + " runnables still pending in pool"); + if (log.isDebugEnabled()) + log.debug("Shutting down with " + sz + " runnables still pending in pool"); } if (changeStreamMonitor != null) changeStreamMonitor.stop(); sendMessageToSelf(new Msg("info", "going down", "now")); @@ -836,7 +896,7 @@ private void sendMessageToSelf(Msg m, boolean async) { m.setSender("self"); m.setRecipient(id); m.setSenderHost(hostname); - morphium.storeNoCache(m,getCollectionName()); + morphium.storeNoCache(m, getCollectionName()); } public boolean isAutoAnswer() { @@ -880,6 +940,9 @@ public T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs) } Thread.yield(); } + if (log.isDebugEnabled()) { + log.debug("got message after: " + (System.currentTimeMillis() - start) + "ms"); + } waitingForMessages.remove(theMessage.getMsgId()); return (T) waitingForAnswers.remove(theMessage.getMsgId()); } diff --git a/src/de/caluga/morphium/messaging/Msg.java b/src/de/caluga/morphium/messaging/Msg.java index 2178ee348..a6ec8f010 100644 --- a/src/de/caluga/morphium/messaging/Msg.java +++ b/src/de/caluga/morphium/messaging/Msg.java @@ -362,9 +362,11 @@ public Msg createAnswerMsg() { public void sendAnswer(Messaging messaging, Msg m) { m.setInAnswerTo(this.msgId); - m.addRecipient(this.getSender()); + //m.addRecipient(this.getSender()); + m.setRecipient(this.getSender()); m.setDeleteAt(new Date(System.currentTimeMillis() + m.getTtl())); - messaging.queueMessage(m); + m.setMsgId(new MorphiumId()); + messaging.storeMessage(m); } diff --git a/test/de/caluga/test/mongo/suite/MessagingTest.java b/test/de/caluga/test/mongo/suite/MessagingTest.java index ef22e2b56..01d5e6d4c 100644 --- a/test/de/caluga/test/mongo/suite/MessagingTest.java +++ b/test/de/caluga/test/mongo/suite/MessagingTest.java @@ -637,6 +637,90 @@ public void answeringTest() throws Exception { } + + @Test + public void answerExclusiveMessagesTest() throws Exception { + Messaging m1 = new Messaging(morphium, 10, false, true, 10); + m1.setSenderId("m1"); + Messaging m2 = new Messaging(morphium, 10, false, true, 10); + m2.setSenderId("m2"); + Messaging m3 = new Messaging(morphium, 10, false, true, 10); + m3.setSenderId("m3"); + m1.start(); + m2.start(); + m3.start(); + + m3.addListenerForMessageNamed("test", (msg, m) -> { + log.info("INcoming message"); + return m.createAnswerMsg(); + }); + + Msg m = new Msg("test", "important", "value"); + m.setExclusive(true); + Msg answer = m1.sendAndAwaitFirstAnswer(m, 6000); + Thread.sleep(500); + assert (answer != null); + assert (answer.getProcessedBy().size() == 1); + assert (answer.getProcessedBy().contains("m3")); + } + + + @Test + public void ignoringMessagesTest() throws Exception { + Messaging m1 = new Messaging(morphium, 10, false, true, 10); + m1.setSenderId("m1"); + Messaging m2 = new Messaging(morphium, 10, false, true, 10); + m2.setSenderId("m2"); + m1.start(); + m2.start(); + + Msg m = new Msg("test", "ignore me please", "value"); + m1.storeMessage(m); + Thread.sleep(1000); + m = morphium.reread(m); + assert (m.getProcessedBy().size() == 1) : "wrong number of proccessed by entries: " + m.getProcessedBy().size(); + } + + @Test + public void severalMessagingsTest() throws Exception { + Messaging m1 = new Messaging(morphium, 10, false, true, 10); + m1.setSenderId("m1"); + Messaging m2 = new Messaging(morphium, 10, false, true, 10); + m2.setSenderId("m2"); + Messaging m3 = new Messaging(morphium, 10, false, true, 10); + m3.setSenderId("m3"); + m1.start(); + m2.start(); + m3.start(); + + m3.addListenerForMessageNamed("test", (msg, m) -> { + //log.info("Got message: "+m.getName()); + log.info("Sending answer for " + m.getMsgId()); + return new Msg("test", "answer", "value", 600000); + }); + + procCounter.set(0); + for (int i = 0; i < 180; i++) { + new Thread() { + public void run() { + Msg m = new Msg("test", "nothing", "value"); + m.setTtl(60000000); + Msg a = m1.sendAndAwaitFirstAnswer(m, 6000); + assert (a != null); + procCounter.incrementAndGet(); + } + }.start(); + + } + while (procCounter.get() < 150) { + Thread.yield(); + } + + } + + + + @Test public void answers3NodesTest() throws Exception { Messaging m1 = new Messaging(morphium, 10, false, true, 10);