diff --git a/src/org/zoodb/internal/query/ParameterDeclaration.java b/src/org/zoodb/internal/query/ParameterDeclaration.java index 97e9c28c..5e05ab7e 100644 --- a/src/org/zoodb/internal/query/ParameterDeclaration.java +++ b/src/org/zoodb/internal/query/ParameterDeclaration.java @@ -28,10 +28,6 @@ */ public final class ParameterDeclaration { - public interface Consumer { - void setValue(ParameterDeclaration param, Object value); - } - public enum DECLARATION { /** implicit with : */ IMPLICIT, @@ -89,8 +85,8 @@ public Object getValue(Object[] params) { return params[pos]; } - public void setValue(Object[] params, Object object) { - params[pos] = object; + public int getPosition() { + return pos; } public static void adjustValues(List decls, Object[] params) { @@ -102,14 +98,6 @@ public static void adjustValues(List decls, Object[] param if (type != null) { TypeConverterTools.checkAssignability(p1, type); } - params[i] = p1; - //TODO?? - //TODO?? - //TODO?? - //TODO?? -// if (p1 instanceof ZooPC) { -// oid = ((ZooPC)p1).jdoZooGetOid(); -// } } else { params[i] = QueryTerm.NULL; } diff --git a/src/org/zoodb/jdo/impl/QueryImpl.java b/src/org/zoodb/jdo/impl/QueryImpl.java index c6de5d6e..a05706fd 100644 --- a/src/org/zoodb/jdo/impl/QueryImpl.java +++ b/src/org/zoodb/jdo/impl/QueryImpl.java @@ -86,7 +86,7 @@ private enum EXECUTION_TYPE { private final transient PersistenceManagerImpl pm; private transient Extent ext; private boolean isUnmodifiable = false; - private Class candCls = ZooPC.class; //TODO good default? + private Class candCls = ZooPC.class; private transient ZooClassDef candClsDef = null; private String filter = ""; @@ -105,7 +105,7 @@ private enum EXECUTION_TYPE { private final ArrayList variables = new ArrayList<>(); private QueryTree queryTree; - private QueryExecutor queryExecutor; + private final ThreadLocal queryExecutor = new ThreadLocal<>(); //This is used in schema auto-create mode when the persistent class has no schema defined private boolean isDummyQuery = false; @@ -303,68 +303,87 @@ public void compile() { } private void compileQuery() { - //compile only if it was not already compiled, unless the filter changed... - if (queryTree != null) { - return; - } - - //TODO do we really need this? - String fStr = filter; - if (rangeStr != null) { - fStr = (fStr == null) ? "" : fStr; - fStr += " range " + rangeStr; - } - if (orderingStr != null) { - fStr = (fStr == null) ? "" : fStr; - fStr += " order by " + orderingStr; - } - - if (fStr == null || fStr.trim().length() == 0 || isDummyQuery) { - return; - } - - if (DBStatistics.isEnabled()) { - pm.getSession().statsInc(DBStatistics.STATS.QU_COMPILED); - } - - executionType = determineExecutionType(); - - QueryParserAPI qp; - //We do this on the query before assigning values to parameter. - //Would it make sense to assign the values first and then properly parse the query??? - //Probably not: - //- every parameter change would require rebuilding the tree - //- we would require an additional parser to assign the parameters - //QueryParser qp = new QueryParser(filter, candClsDef, parameters, ordering); - //QueryParserV2 qp = new QueryParserV2(filter, candClsDef, parameters, ordering); - //QueryParserV3 qp = - // new QueryParserV3(fStr, candClsDef, parameters, variables, ordering, rangeMin, rangeMax); - if (executionType == EXECUTION_TYPE.V4 || executionType == EXECUTION_TYPE.FORCED_V4 ) { - qp = new QueryParserV4(fStr, candClsDef, parameters, variables, - ordering, rangeMin, rangeMax, pm.getSession()); - } else { - qp = new QueryParserV3(fStr, candClsDef, parameters, ordering, rangeMin, rangeMax); - } - queryTree = qp.parseQuery(); - rangeMin = qp.getRangeMin(); - rangeMax = qp.getRangeMax(); + // Protect access to queryTree instance/reference + // TODO We should improve thread safety here: + // 1) What do we want? Do we need query re-compilation at all? + // - Re-setting essential parameters (filter/ordering/range/declared params/projection) + // is nice but not really useful, and allowing recompilation makes + // concurrency hard. + // Ideal: most parameters should be immutable once the query is compiled. + // - What is the point of setUnmodifiable(), spec + // 2) Create config object with ThreadLocal + // Take care that it is properly initialised. + // Create a ThreadLocal config instance, reference as primary instance. + // Initialize other instances with this reference instance. + // - BLOCK ALL OTHER CHANGES once query is compiled. + // - Parse RANGE (ORDERING?) separately from main query -> avoid reset() + synchronized (this) { + //compile only if it was not already compiled, unless the filter changed... + if (queryTree != null) { + return; + } + + //TODO do we really need this? + String fStr = filter; + if (rangeStr != null) { + fStr = (fStr == null) ? "" : fStr; + fStr += " range " + rangeStr; + } + if (orderingStr != null) { + fStr = (fStr == null) ? "" : fStr; + fStr += " order by " + orderingStr; + } + + if (fStr == null || fStr.trim().length() == 0 || isDummyQuery) { + return; + } + + if (DBStatistics.isEnabled()) { + pm.getSession().statsInc(DBStatistics.STATS.QU_COMPILED); + } + + executionType = determineExecutionType(); + + QueryParserAPI qp; + //We do this on the query before assigning values to parameter. + //Would it make sense to assign the values first and then properly parse the query??? + //Probably not: + //- every parameter change would require rebuilding the tree + //- we would require an additional parser to assign the parameters + //QueryParser qp = new QueryParser(filter, candClsDef, parameters, ordering); + //QueryParserV2 qp = new QueryParserV2(filter, candClsDef, parameters, ordering); + //QueryParserV3 qp = + // new QueryParserV3(fStr, candClsDef, parameters, variables, ordering, rangeMin, rangeMax); + if (executionType == EXECUTION_TYPE.V4 || executionType == EXECUTION_TYPE.FORCED_V4 ) { + qp = new QueryParserV4(fStr, candClsDef, parameters, variables, + ordering, rangeMin, rangeMax, pm.getSession()); + } else { + qp = new QueryParserV3(fStr, candClsDef, parameters, ordering, rangeMin, rangeMax); + } + queryTree = qp.parseQuery(); + rangeMin = qp.getRangeMin(); + rangeMax = qp.getRangeMax(); + } } private void resetQuery() { - //See Test_122: We need to clear this for setFilter() calls - for (int i = 0; i < parameters.size(); i++) { - ParameterDeclaration p = parameters.get(i); - if (p.getDeclaration() != DECLARATION.API) { - parameters.remove(i); - i--; - } - } - queryTree = null; - queryExecutor = null; - ordering.clear(); - if (executionType == EXECUTION_TYPE.V3 || executionType == EXECUTION_TYPE.V4) { - executionType = EXECUTION_TYPE.UNDEFINED; - } + // Protect access to queryTree instance/reference + synchronized (this) { + //See Test_122: We need to clear this for setFilter() calls + for (int i = 0; i < parameters.size(); i++) { + ParameterDeclaration p = parameters.get(i); + if (p.getDeclaration() != DECLARATION.API) { + parameters.remove(i); + i--; + } + } + queryTree = null; + queryExecutor.remove(); + ordering.clear(); + if (executionType == EXECUTION_TYPE.V3 || executionType == EXECUTION_TYPE.V4) { + executionType = EXECUTION_TYPE.UNDEFINED; + } + } } @Override @@ -522,8 +541,7 @@ public Object execute() { pm.getSession().statsInc(STATS.QU_EXECUTED_TOTAL); pm.getSession().statsInc(STATS.QU_EXECUTED_WITHOUT_INDEX); } - createExecutor(); - return queryExecutor.runWithExtent(new ExtentAdaptor(extent), + return getOrCreateExecutor().runWithExtent(new ExtentAdaptor(extent), rangeMin, rangeMax, resultSettings, resultClass); } finally { pm.getSession().unlock(); @@ -535,18 +553,22 @@ public Object execute() { return runQuery(NO_PARAMS); } - - private void createExecutor() { - if (queryExecutor == null) { - queryExecutor = new QueryExecutor(pm.getSession(), filter, candCls, candClsDef, - unique, subClasses, isDummyQuery, ordering, queryTree, parameters, variables); + private QueryExecutor getOrCreateExecutor() { + if (queryExecutor.get() == null) { + // Protect access to queryTree instance/reference + synchronized (this) { + queryExecutor.set(new QueryExecutor(pm.getSession(), filter, candCls, candClsDef, + unique, subClasses, isDummyQuery, ordering, queryTree, parameters, + variables)); + } } + return queryExecutor.get(); } private Object runQuery(Object[] params) { - createExecutor(); + QueryExecutor executor = getOrCreateExecutor(); ParameterDeclaration.adjustValues(parameters, params); - return queryExecutor.runQuery(ext, rangeMin, rangeMax, resultSettings, resultClass, + return executor.runQuery(ext, rangeMin, rangeMax, resultSettings, resultClass, ignoreCache, params); } @@ -593,7 +615,7 @@ public Object executeWithMap(Map parameters) { compileQuery(); Object[] params = new Object[parameters.size()]; for (ParameterDeclaration p: this.parameters) { - p.setValue(params, parameters.get(p.getName())); + params[p.getPosition()] = parameters.get(p.getName()); } checkParamCount(parameters.size()); return runQuery(params); diff --git a/tst/org/zoodb/test/jdo/Test_025_SingleSessionConcurrency.java b/tst/org/zoodb/test/jdo/Test_025_SingleSessionConcurrency.java index 02e3de9a..c529208b 100644 --- a/tst/org/zoodb/test/jdo/Test_025_SingleSessionConcurrency.java +++ b/tst/org/zoodb/test/jdo/Test_025_SingleSessionConcurrency.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.jdo.Constants; import javax.jdo.Extent; @@ -53,7 +54,7 @@ public class Test_025_SingleSessionConcurrency { private final int COMMIT_INTERVAL = 250; private final int T = 8; - private final static ArrayList errors = new ArrayList<>(); + private final static ConcurrentLinkedQueue errors = new ConcurrentLinkedQueue<>(); @BeforeClass public static void beforeClass() { @@ -80,31 +81,23 @@ public void tearDown() { try { TestTools.removeDb(); } catch (IllegalStateException e) { - errors.add(e); - } - if (!errors.isEmpty()) { - RuntimeException e = new RuntimeException("errors: " + errors.size(), errors.get(0)); - for (Throwable t: errors) { - e.addSuppressed(t); - } - errors.clear(); - throw e; + errors.add(e); } + checkErrors(); } private void checkErrors() { - if (!errors.isEmpty()) { - RuntimeException e = new RuntimeException("errors: " + errors.size(), errors.get(0)); - for (Throwable t: errors) { - e.addSuppressed(t); - } - errors.clear(); + Throwable t; + if (!errors.isEmpty()) { + RuntimeException e = new RuntimeException("errors: " + errors.size(), errors.peek()); + while ((t = errors.poll()) != null) { + e.addSuppressed(t); + } throw e; } } private abstract static class Worker extends Thread { - final PersistenceManager pm; final int N; final int COMMIT_INTERVAL; @@ -133,36 +126,72 @@ public void run() { } - private static class Reader extends Worker { - - private Reader(int id, int n, PersistenceManager pm) { - super(id, n, -1, pm); - } - - @SuppressWarnings("unchecked") - @Override - public void runWorker() { - Extent ext = pm.getExtent(TestSuper.class); - for (TestSuper t: ext) { - assertTrue(t.getData()[0] >= 0 && t.getData()[0] < N); - TestSuper t2 = (TestSuper) pm.getObjectById( JDOHelper.getObjectId(t) ); - assertEquals(t.getId(), t2.getId()); - if (t.getId() == ID && t.getTime() < N/2) { - n++; - } - } - Collection col = - (Collection) pm.newQuery( - TestSuper.class, "_id == " + ID + " && _time >= " + (N/2)).execute(); - for (TestSuper t: col) { - assertEquals(t.getId(), ID); - assertTrue(t.getData()[0] >= 0 && t.getData()[0] < N); - TestSuper t2 = (TestSuper) pm.getObjectById( JDOHelper.getObjectId(t) ); - assertEquals(t.getId(), t2.getId()); - n++; - } - } - } + private static class Reader extends Worker { + + private Reader(int id, int n, PersistenceManager pm) { + super(id, n, -1, pm); + } + + @SuppressWarnings("unchecked") + @Override + public void runWorker() { + Extent ext = pm.getExtent(TestSuper.class); + for (TestSuper t: ext) { + assertTrue(t.getData()[0] >= 0 && t.getData()[0] < N); + TestSuper t2 = (TestSuper) pm.getObjectById( JDOHelper.getObjectId(t) ); + assertEquals(t.getId(), t2.getId()); + if (t.getId() == ID && t.getTime() < N/2) { + n++; + } + } + Collection col = + (Collection) pm.newQuery( + TestSuper.class, "_id == " + ID + " && _time >= " + (N/2)).execute(); + for (TestSuper t: col) { + assertEquals(t.getId(), ID); + assertTrue(t.getData()[0] >= 0 && t.getData()[0] < N); + TestSuper t2 = (TestSuper) pm.getObjectById( JDOHelper.getObjectId(t) ); + assertEquals(t.getId(), t2.getId()); + n++; + } + } + } + + + private static class QueryWorker extends Worker { + private final Query query; + + private QueryWorker(int id, int n, PersistenceManager pm, Query query) { + super(id, n, -1, pm); + this.query = query; + } + + @SuppressWarnings("unchecked") + @Override + public void runWorker() { + //query.setRange(0, N * 1231); + Collection col = (Collection) query.execute(ID, 0, N/2); + checkQuery(col); + + //query.setRange(0, N/4); + col = (Collection) query.execute(ID, N/2, N*3/4);//, N); + checkQuery(col); + + //query.setRange(N/4, N/2); + col = (Collection) query.execute(ID, N*3/4, N); + checkQuery(col); + } + + private void checkQuery(Collection col) { + for (TestSuper t: col) { + assertEquals(t.getId(), ID); + assertTrue(t.getData()[0] >= 0 && t.getData()[0] < N); + TestSuper t2 = (TestSuper) pm.getObjectById( JDOHelper.getObjectId(t) ); + assertEquals(t.getId(), t2.getId()); + n++; + } + } + } private static class Writer extends Worker { @@ -229,45 +258,86 @@ public void runWorker() { } } - /** - * Test concurrent read. + /** + * Test concurrent read. * @throws InterruptedException when interrupted. - */ - @Test - public void testParallelRead() throws InterruptedException { - PersistenceManager pm = TestTools.openPM(); - pm.setMultithreaded(true); - pm.currentTransaction().begin(); - - //write - Writer w = new Writer(0, N, COMMIT_INTERVAL, pm); - w.start(); - w.join(); - - pm.currentTransaction().commit(); - pm.currentTransaction().begin(); - - //read - ArrayList readers = new ArrayList<>(); - for (int i = 0; i < T; i++) { - readers.add(new Reader(0, N, pm)); - } - - for (Reader reader: readers) { - reader.start(); - } - - for (Reader reader: readers) { - reader.join(); - } - checkErrors(); - for (Reader reader: readers) { - assertEquals("id=" + reader.ID, N, reader.n); - } - - pm.currentTransaction().commit(); - TestTools.closePM(); - } + */ + @Test + public void testParallelRead() throws InterruptedException { + PersistenceManager pm = TestTools.openPM(); + pm.setMultithreaded(true); + pm.currentTransaction().begin(); + + //write + Writer w = new Writer(0, N, COMMIT_INTERVAL, pm); + w.start(); + w.join(); + + pm.currentTransaction().commit(); + pm.currentTransaction().begin(); + + //read + ArrayList readers = new ArrayList<>(); + for (int i = 0; i < T; i++) { + readers.add(new Reader(0, N, pm)); + } + + for (Reader reader: readers) { + reader.start(); + } + + for (Reader reader: readers) { + reader.join(); + } + checkErrors(); + for (Reader reader: readers) { + assertEquals("id=" + reader.ID, N, reader.n); + } + + pm.currentTransaction().commit(); + TestTools.closePM(); + } + + /** + * Test concurrent read. + * @throws InterruptedException when interrupted. + */ + @Test + public void testParallelQuery() throws InterruptedException { + PersistenceManager pm = TestTools.openPM(); + pm.setMultithreaded(true); + pm.currentTransaction().begin(); + + //write + Writer w = new Writer(0, N, COMMIT_INTERVAL, pm); + w.start(); + w.join(); + + pm.currentTransaction().commit(); + pm.currentTransaction().begin(); + Query query = pm.newQuery(TestSuper.class, "_id == :id && _time >= :min && _time < :max"); + + //read + ArrayList readers = new ArrayList<>(); + for (int i = 0; i < T; i++) { + readers.add(new QueryWorker(0, N, pm, query)); + } + + for (QueryWorker reader: readers) { + reader.start(); + } + + for (QueryWorker reader: readers) { + reader.join(); + } + checkErrors(); + for (QueryWorker reader: readers) { + assertEquals("id=" + reader.ID, N, reader.n); + } + + pm.currentTransaction().commit(); + TestTools.closePM(); + } /** * Test concurrent write.