Skip to content

Commit

Permalink
Merge pull request #15 from sboesebeck/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
sboesebeck committed Jan 12, 2016
2 parents b3d3a4b + 0c5543a commit 26b23da
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 104 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ target/
*.iml
dist/
*.orig
MorphiumDoku.html
129 changes: 75 additions & 54 deletions MorphiumDoku.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions create_html_doc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

multimarkdown -t html MorphiumDoku.md -o MorphiumDoku.html
8 changes: 6 additions & 2 deletions src/de/caluga/morphium/Morphium.java
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public void run() {

try {
if (morphiumDriver.isCapped(config.getDatabase(), coll)) return;
if (!morphiumDriver.exists(config.getDatabase(), coll)) {
if (config.isAutoIndexAndCappedCreationOnWrite() && !morphiumDriver.exists(config.getDatabase(), coll)) {
if (logger.isDebugEnabled())
logger.debug("Collection does not exist - ensuring indices / capped status");
Map<String, Object> cmd = new HashMap<>();
Expand Down Expand Up @@ -2090,12 +2090,16 @@ public void close() {
if (cacheHousekeeper.isAlive()) {
cacheHousekeeper.interrupt();
}
rsMonitor.terminate();

config.getAsyncWriter().close();
config.getBufferedWriter().close();
config.getWriter().close();
try {
getDriver().close();
} catch (MorphiumDriverException e) {
e.printStackTrace();
}
// config.getDb().getMongo().close();
config = null;
// MorphiumSingleton.reset();
}
Expand Down
10 changes: 10 additions & 0 deletions src/de/caluga/morphium/MorphiumConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class MorphiumConfig {
private int maxAutoReconnectTime = 0;
private int blockingThreadsMultiplier = 5;

private boolean autoIndexAndCappedCreationOnWrite = true;

@Transient
private Class<? extends Query> queryClass;
@Transient
Expand Down Expand Up @@ -205,6 +207,14 @@ public MorphiumConfig(String db, int maxConnections, int globalCacheValidTime, i

}

public boolean isAutoIndexAndCappedCreationOnWrite() {
return autoIndexAndCappedCreationOnWrite;
}

public void setAutoIndexAndCappedCreationOnWrite(boolean autoIndexAndCappedCreationOnWrite) {
this.autoIndexAndCappedCreationOnWrite = autoIndexAndCappedCreationOnWrite;
}

public boolean isCheckForNew() {
return checkForNew;
}
Expand Down
20 changes: 1 addition & 19 deletions src/de/caluga/morphium/query/QueryFactoryImpl.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package de.caluga.morphium.query;

import de.caluga.morphium.Morphium;
import de.caluga.morphium.ShutdownListener;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* User: Stephan Bösebeck
Expand All @@ -32,21 +29,6 @@ public QueryFactoryImpl(Class<? extends Query> qi) {

@Override
public ThreadPoolExecutor getExecutor(Morphium m) {
if (executor == null) {
executor = new ThreadPoolExecutor(m.getConfig().getMaxConnections() / 2, (int) (m.getConfig().getMaxConnections() * m.getConfig().getBlockingThreadsMultiplier() * 0.9),
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
m.addShutdownListener(new ShutdownListener() {
@Override
public void onShutdown(Morphium m) {
try {
executor.shutdownNow();
} catch (Exception e) {
//swallow
}
}
});
}
return executor;
}

Expand All @@ -66,7 +48,7 @@ public <T> Query<T> createQuery(Morphium m, Class<? extends T> type) {
Query<T> q = queryImpl.newInstance();
q.setMorphium(m);
q.setType(type);
q.setExecutor(getExecutor(m));
q.setExecutor(m.getAsyncOperationsThreadPool());
return q;
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
Expand Down
31 changes: 18 additions & 13 deletions src/de/caluga/morphium/writer/BufferedMorphiumWriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ public BufferedMorphiumWriterImpl() {

}

public void close() {
running = false;
try {
long start = System.currentTimeMillis();
while (housekeeping.isAlive()) {
if (System.currentTimeMillis() - start > 1000) {
housekeeping.stop();
break;
}
Thread.sleep(50);
}
} catch (Exception e) {
//swallow on shutdown
}
}

public boolean isOrderedExecution() {
return orderedExecution;
}
Expand Down Expand Up @@ -570,24 +586,13 @@ private void runIt(Class<?> clz) {
m.addShutdownListener(new ShutdownListener() {
@Override
public void onShutdown(Morphium m) {
running = false;
try {
long start = System.currentTimeMillis();
while (housekeeping.isAlive()) {
if (System.currentTimeMillis() - start > 1000) {
housekeeping.stop();
break;
}
Thread.sleep(50);
}
} catch (Exception e) {
//swallow on shutdown
}
close();
}
});

}


@Override
public <T> void remove(final List<T> lst, AsyncOperationCallback<T> c) {
if (c == null) {
Expand Down
5 changes: 5 additions & 0 deletions src/de/caluga/morphium/writer/MorphiumWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,10 @@ public interface MorphiumWriter {

<T> void inc(Query<T> query, Map<String, Number> fieldsToInc, boolean insertIfNotExist, boolean multiple, AsyncOperationCallback<T> callback);

/**
* information about closing of morphium and all connections
*/
void close();


}
51 changes: 35 additions & 16 deletions src/de/caluga/morphium/writer/MorphiumWriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public void setMorphium(Morphium m) {
}
}

public void close() {
executor.shutdownNow();
}

/**
* @param obj - object to store
*/
Expand Down Expand Up @@ -126,7 +130,7 @@ public void run() {
if (coll == null) {
coll = morphium.getMapper().getCollectionName(type);
}
if (!morphium.getDriver().exists(getDbName(), coll)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
if (logger.isDebugEnabled())
logger.debug("Collection " + coll + " does not exist - ensuring indices");
createCappedColl(o.getClass());
Expand Down Expand Up @@ -346,8 +350,9 @@ public void run() {
// BulkWriteOperation bulkWriteOperation = collection.initializeUnorderedBulkOperation();
// BulkRequestContext bulk = morphium.getDriver().createBulkContext(morphium, morphium.getConfig().getDatabase(), collectionName, false, wc);
HashMap<Object, Boolean> isNew = new HashMap<>();
if (!morphium.getDriver().exists(morphium.getConfig().getDatabase(), collectionName)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(morphium.getConfig().getDatabase(), collectionName)) {
logger.warn("collection does not exist while storing list - taking first element of list to ensure indices");
createCappedColl(lst.get(0).getClass());
morphium.ensureIndicesFor((Class<T>) lst.get(0).getClass(), collectionName, callback);
}
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -480,7 +485,7 @@ public void run() {
WriteConcern wc = morphium.getWriteConcernForClass(c);
String coll = morphium.getMapper().getCollectionName(c);

if (!morphium.getDriver().exists(morphium.getConfig().getDatabase(), coll)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(morphium.getConfig().getDatabase(), coll)) {
createCappedColl(c);
morphium.ensureIndicesFor(c, coll, callback);
}
Expand Down Expand Up @@ -546,12 +551,15 @@ public void run() {

}


private void createCappedColl(Class c) {
createCappedColl(c, null);
}

private void createCappedColl(Class c, String n) {
if (logger.isDebugEnabled())
logger.debug("Collection does not exist - ensuring indices / capped status");
Map<String, Object> cmd = new LinkedHashMap<>();
cmd.put("create", morphium.getMapper().getCollectionName(c));
cmd.put("create", n != null ? n : morphium.getMapper().getCollectionName(c));
Capped capped = morphium.getARHelper().getAnnotationFromHierarchy(c, Capped.class);
if (capped != null) {
cmd.put("capped", true);
Expand Down Expand Up @@ -588,7 +596,7 @@ public void run() {
WriteConcern wc = morphium.getWriteConcernForClass(c);
String coll = morphium.getMapper().getCollectionName(c);
try {
if (!morphium.getDriver().exists(morphium.getConfig().getDatabase(), coll)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(morphium.getConfig().getDatabase(), coll)) {
if (logger.isDebugEnabled())
logger.debug("Collection does not exist - ensuring indices / capped status");
Map<String, Object> cmd = new LinkedHashMap<>();
Expand Down Expand Up @@ -713,7 +721,8 @@ public void run() {
long start = System.currentTimeMillis();

try {
if (upsert && !morphium.getDriver().exists(getDbName(), collection)) {
if (upsert && morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), collection)) {
createCappedColl(cls, collection);
morphium.ensureIndicesFor(cls, collection, callback);
}

Expand Down Expand Up @@ -846,7 +855,8 @@ public void run() {
collectionName = collection;
}

if (!morphium.getDriver().exists(getDbName(), collectionName)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), collectionName)) {
createCappedColl(ent.getClass(), collectionName);
morphium.ensureIndicesFor((Class<T>) ent.getClass(), collectionName, callback);
}
morphium.getDriver().update(getDbName(), collectionName, find, update, false, false, wc);
Expand Down Expand Up @@ -1037,7 +1047,8 @@ public void run() {

long start = System.currentTimeMillis();
try {
if (!morphium.getDriver().exists(getDbName(), coll)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor(cls, coll, callback);
}
morphium.getDriver().update(getDbName(), coll, query, update, false, false, wc);
Expand Down Expand Up @@ -1109,7 +1120,8 @@ public void run() {

long start = System.currentTimeMillis();
try {
if (upsert && !morphium.getDriver().exists(getDbName(), coll)) {
if (upsert && morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor((Class<T>) cls, coll, callback);
}
WriteConcern wc = morphium.getWriteConcernForClass(cls);
Expand Down Expand Up @@ -1155,8 +1167,10 @@ public void run() {

long start = System.currentTimeMillis();
try {
if (upsert && !morphium.getDriver().exists(getDbName(), coll)) {
if (upsert && morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor(cls, coll, callback);

}
WriteConcern wc = morphium.getWriteConcernForClass(cls);
morphium.getDriver().update(getDbName(), coll, qobj, update, multiple, upsert, wc);
Expand Down Expand Up @@ -1248,7 +1262,8 @@ public void run() {
WriteConcern wc = morphium.getWriteConcernForClass(cls);
long start = System.currentTimeMillis();
try {
if (upsert && !morphium.getDriver().exists(getDbName(), coll)) {
if (upsert && morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor((Class<T>) cls, coll, callback);
}
morphium.getDriver().update(getDbName(), coll, qobj, update, multiple, upsert, wc);
Expand Down Expand Up @@ -1361,7 +1376,8 @@ public void run() {
long start = System.currentTimeMillis();

try {
if (!morphium.getDriver().exists(getDbName(), coll)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor(cls, coll, callback);
}
morphium.getDriver().update(getDbName(), coll, query, update, false, false, wc);
Expand Down Expand Up @@ -1440,7 +1456,8 @@ public void run() {
long start = System.currentTimeMillis();

try {
if (!morphium.getDriver().exists(getDbName(), coll)) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor(cls, coll, callback);
}
morphium.getDriver().update(getDbName(), coll, query, update, false, false, wc);
Expand Down Expand Up @@ -1623,7 +1640,8 @@ private void pushIt(boolean push, boolean upsert, boolean multiple, Class<?> cls
WriteConcern wc = morphium.getWriteConcernForClass(cls);
long start = System.currentTimeMillis();
try {
if (!morphium.getDriver().exists(getDbName(), coll) && upsert) {
if (morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll) && upsert) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor(cls, coll);
}
morphium.getDriver().update(getDbName(), coll, qobj, update, multiple, upsert, wc);
Expand Down Expand Up @@ -1715,7 +1733,8 @@ public void run() {

WriteConcern wc = morphium.getWriteConcernForClass(cls);
try {
if (upsert && !morphium.getDriver().exists(getDbName(), coll)) {
if (upsert && morphium.getConfig().isAutoIndexAndCappedCreationOnWrite() && !morphium.getDriver().exists(getDbName(), coll)) {
createCappedColl(cls, coll);
morphium.ensureIndicesFor((Class<T>) cls, coll, callback);
}
morphium.getDriver().update(getDbName(), coll, qobj, update, multiple, upsert, wc);
Expand Down
27 changes: 27 additions & 0 deletions test/de/caluga/test/mongo/suite/ExitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.caluga.test.mongo.suite;/**
* Created by stephan on 12.01.16.
*/

import de.caluga.morphium.Morphium;
import de.caluga.morphium.MorphiumConfig;
import de.caluga.test.mongo.suite.data.CachedObject;

/**
* TODO: Add Documentation here
**/
public class ExitTest {

public static void main(String args[]) throws Exception {
// Morphium m=new Morphium("localhost:27017","morphium-test");

MorphiumConfig cfg = new MorphiumConfig();
cfg.setDatabase("morphium-test");
cfg.setHostSeed("localhost:27017,localhost:27018,localhost:27019");
Morphium m = new Morphium(cfg);
System.out.println("Connection opened...");

m.createQueryFor(CachedObject.class).countAll();
m.close();
System.out.println("All closed");
}
}

0 comments on commit 26b23da

Please sign in to comment.