Skip to content

Commit

Permalink
Fix schema generation with quads
Browse files Browse the repository at this point in the history
  • Loading branch information
kerim1 committed Aug 10, 2022
1 parent 74c6271 commit 4f0abdd
Show file tree
Hide file tree
Showing 22 changed files with 505 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import nl.knaw.huygens.timbuctoo.server.tasks.DbLogCreatorTask;
import nl.knaw.huygens.timbuctoo.server.tasks.MoveDefaultGraphsTask;
import nl.knaw.huygens.timbuctoo.server.tasks.MoveEdgesTask;
import nl.knaw.huygens.timbuctoo.server.tasks.RebuildSchemaTask;
import nl.knaw.huygens.timbuctoo.server.tasks.ReimportDatasetsTask;
import nl.knaw.huygens.timbuctoo.server.tasks.UserCreationTask;
import nl.knaw.huygens.timbuctoo.solr.Webhooks;
Expand Down Expand Up @@ -425,6 +426,7 @@ public void run(TimbuctooConfiguration configuration, Environment environment) t
);
environment.admin().addTask(new ReimportDatasetsTask(dataSetRepository));
environment.admin().addTask(new CompressFilesTask(dataSetRepository));
environment.admin().addTask(new RebuildSchemaTask(dataSetRepository));

// register health checks
// Dropwizard Health checks are used to check whether requests should be routed to this instance
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nl.knaw.huygens.timbuctoo.server.tasks;

import io.dropwizard.servlets.tasks.Task;
import nl.knaw.huygens.timbuctoo.v5.dataset.DataSetRepository;
import nl.knaw.huygens.timbuctoo.v5.dataset.dto.DataSet;
import nl.knaw.huygens.timbuctoo.v5.datastores.implementations.bdb.BdbQuadStore;
import nl.knaw.huygens.timbuctoo.v5.datastores.implementations.bdb.BdbSchemaStore;

import java.io.PrintWriter;
import java.util.List;
import java.util.Map;

public class RebuildSchemaTask extends Task {
private final DataSetRepository dataSetRepository;

public RebuildSchemaTask(DataSetRepository dataSetRepository) {
super("rebuildSchema");
this.dataSetRepository = dataSetRepository;
}

@Override
public void execute(Map<String, List<String>> parameters, PrintWriter output) throws Exception {
for (DataSet dataSet : dataSetRepository.getDataSets()) {
rebuildSchemaFor(dataSet, output);
}
}

private void rebuildSchemaFor(DataSet dataSet, PrintWriter output) {
output.println("Rebuilding schema for dataset: " + dataSet.getMetadata().getCombinedId());
output.flush();

BdbSchemaStore schemaStore = (BdbSchemaStore) dataSet.getSchemaStore();
schemaStore.rebuildSchema((BdbQuadStore) dataSet.getQuadStore());

output.println("Finished rebuilding schema of dataset: " + dataSet.getMetadata().getCombinedId());
output.flush();
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
package nl.knaw.huygens.timbuctoo.util;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Streams {
public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> seen.add(keyExtractor.apply(t));
}

public static <R> Stream<Set<R>> combine(Stream<R> stream, BiPredicate<R, R> shouldCombinePred) {
return combine(stream, shouldCombinePred, HashSet::new);
}

public static <R, C extends Collection<R>> Stream<C> combine(
Stream<R> stream, BiPredicate<R, R> shouldCombinePred, Supplier<C> init) {
final Spliterator<R> sp = stream.spliterator();
final Spliterator<Set<R>> combineSp = new AbstractSpliterator<>(sp.estimateSize(), sp.characteristics()) {
final Spliterator<C> combineSp = new AbstractSpliterator<>(sp.estimateSize(), sp.characteristics()) {
private R prev = null;
private R cur = null;

@Override
public boolean tryAdvance(Consumer<? super Set<R>> action) {
public boolean tryAdvance(Consumer<? super C> action) {
if (prev == null && !sp.tryAdvance(el -> prev = el)) {
return false;
}

Set<R> combined = new HashSet<>();
C combined = init.get();
combined.add(prev);

boolean canAdvance;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package nl.knaw.huygens.timbuctoo.v5.dataset;

import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.CursorQuad;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.Direction;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.QuadGraphs;

import java.util.stream.Stream;

public interface ChangeFetcher {

Stream<CursorQuad> getPredicates(String subject, boolean getRetracted, boolean getUnchanged, boolean getAsserted);
Stream<QuadGraphs> getPredicates(String subject, boolean getRetracted, boolean getUnchanged, boolean getAsserted);

Stream<CursorQuad> getPredicates(String subject, String predicate, Direction direction, boolean getRetracted,
Stream<QuadGraphs> getPredicates(String subject, String predicate, Direction direction, boolean getRetracted,
boolean getUnchanged, boolean getAsserted);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.QuadStore;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.CursorQuad;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.Direction;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.QuadGraphs;
import nl.knaw.huygens.timbuctoo.v5.util.Graph;
import nl.knaw.huygens.timbuctoo.v5.util.RdfConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,6 +80,42 @@ public Stream<CursorQuad> getQuadsInGraph(String subject, String predicate,
.filter(quad -> quad.inGraph(graph));
}

@Override
public Stream<CursorUri> getSubjectsInCollection(String collectionUri, String cursor) {
return getSubjectsInCollectionInGraph(collectionUri, cursor, Optional.empty());
}

@Override
public Stream<CursorUri> getSubjectsInCollectionInGraph(String collectionUri, String cursor, Optional<Graph> graph) {
DatabaseGetter.Iterate direction = cursor.isEmpty() || cursor.startsWith("A\n") ? FORWARDS : BACKWARDS;

final DatabaseGetter.PrimedBuilder<String, String> getter;
if (cursor.equals("LAST")) {
getter = bdbWrapper.databaseGetter()
.key((formatKey(collectionUri, RdfConstants.RDF_TYPE, Direction.IN)))
.skipToEnd();
} else {
getter = bdbWrapper.databaseGetter()
.key((formatKey(collectionUri, RdfConstants.RDF_TYPE, Direction.IN)))
.dontSkip();
}

Stream<CursorUri> result = getter
.direction(direction)
.getKeysAndValues(bdbWrapper.keyValueConverter(this::formatResult))
.filter(quad -> quad.inGraph(graph))
.map(quad -> CursorUri.create(quad.getObject()))
.distinct();

if (cursor.isEmpty()) {
return result;
}

return result
.dropWhile(cursorUri -> !cursorUri.getUri().equals(cursor.substring(2)))
.skip(1); //we start after the cursor
}

@Override
public Stream<CursorQuad> getAllQuads() {
return getAllQuadsInGraph(Optional.empty());
Expand Down Expand Up @@ -128,22 +166,36 @@ public boolean deleteQuad(String subject, String predicate, Direction direction,
return bdbWrapper.delete(formatKey(subject, predicate, direction), value);
}

public String formatKey(String subject, String predicate, Direction direction) {
public static String formatKey(String subject, String predicate, Direction direction) {
return subject + "\n" + predicate + "\n" + (direction == null ? "" : direction.name());
}

public String formatValue(String object, String dataType, String language, String graph) {
public static String formatValue(String object, String dataType, String language, String graph) {
return (dataType == null ? "" : dataType) + "\n" + (language == null ? "" : language) + "\n" +
(graph == null ? "" : graph) + "\n" + object;
}

public int compare(CursorQuad leftQ, CursorQuad rightQ) {
final String leftStr = formatKey(leftQ.getSubject(), leftQ.getPredicate(), leftQ.getDirection()) + "\n" +
formatValue(leftQ.getObject(), leftQ.getValuetype().orElse(null),
leftQ.getLanguage().orElse(null), leftQ.getGraph().orElse(null));
final String rightStr = formatKey(rightQ.getSubject(), rightQ.getPredicate(), rightQ.getDirection()) + "\n" +
formatValue(rightQ.getObject(), rightQ.getValuetype().orElse(null),
rightQ.getLanguage().orElse(null), rightQ.getGraph().orElse(null));
public static String format(CursorQuad quad) {
return formatKey(quad.getSubject(), quad.getPredicate(), quad.getDirection()) + "\n" +
formatValue(quad.getObject(), quad.getValuetype().orElse(null),
quad.getLanguage().orElse(null), quad.getGraph().orElse(null));
}

public static String format(QuadGraphs quad) {
return formatKey(quad.getSubject(), quad.getPredicate(), quad.getDirection()) + "\n" +
formatValue(quad.getObject(), quad.getValuetype().orElse(null),
quad.getLanguage().orElse(null), null);
}

public static int compare(CursorQuad leftQ, CursorQuad rightQ) {
final String leftStr = format(leftQ);
final String rightStr = format(rightQ);
return leftStr.compareTo(rightStr);
}

public static int compare(QuadGraphs leftQ, QuadGraphs rightQ) {
final String leftStr = format(leftQ);
final String rightStr = format(rightQ);
return leftStr.compareTo(rightStr);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import nl.knaw.huygens.timbuctoo.v5.berkeleydb.exceptions.DatabaseWriteException;
import nl.knaw.huygens.timbuctoo.v5.dataset.ChangeFetcher;
import nl.knaw.huygens.timbuctoo.v5.dataset.ImportStatus;
import nl.knaw.huygens.timbuctoo.v5.dataset.exceptions.DataStoreCreationException;
import nl.knaw.huygens.timbuctoo.v5.dataset.exceptions.RdfProcessingFailedException;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.ChangeType;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.CursorQuad;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.Direction;
import nl.knaw.huygens.timbuctoo.v5.datastores.quadstore.dto.QuadGraphs;
import nl.knaw.huygens.timbuctoo.v5.datastores.rmldatasource.RmlDataSourceStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,9 +77,9 @@ public void onChangedSubject(String subject, ChangeFetcher changeFetcher) throws
}
);
if (!wasCollection[0]) {
try (Stream<CursorQuad> quads =
try (Stream<QuadGraphs> quads =
changeFetcher.getPredicates(subject, TIM_HAS_ROW, Direction.IN, true, true, true)) {
Optional<CursorQuad> isRawRow = quads.findFirst();
Optional<QuadGraphs> isRawRow = quads.findFirst();
if (isRawRow.isPresent()) {
final String collectionUri = isRawRow.get().getObject();
Map<String, Property> predicatesToStore = collectionProperties.computeIfAbsent(
Expand Down
Loading

0 comments on commit 4f0abdd

Please sign in to comment.