Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix updates for objects with housenumbers #773

Merged
merged 4 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/de/komoot/photon/Importer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface Importer {
*
* @param doc
*/
public void add(PhotonDoc doc);
public void add(PhotonDoc doc, int object_id);

/**
* import is finished
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/de/komoot/photon/JsonDumper.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public JsonDumper(String filename, String[] languages, String[] extraTags) throw
}

@Override
public void add(PhotonDoc doc) {
public void add(PhotonDoc doc, int object_id) {
try {
writer.println("{\"index\": {}}");
writer.println(Utils.convert(doc, languages, extraTags).string());
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/de/komoot/photon/PhotonDoc.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ public PhotonDoc postcode(String postcode) {
return this;
}

public String getUid() {
if (houseNumber == null)
public String getUid(int object_id) {
if (object_id <= 0)
return String.valueOf(placeId);

return String.valueOf(placeId) + "." + houseNumber;
return String.format("%d.%d", placeId, object_id);
}

public void copyName(Map<String, String> target, String target_field, String name_field) {
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/de/komoot/photon/Updater.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package de.komoot.photon;

/**
* @author felix
* Interface for classes accepting database updates.
*/
public interface Updater {
public void create(PhotonDoc doc);
void create(PhotonDoc doc, int object_id);

public void delete(Long id);
void delete(long doc_id, int object_id);

public void finish();
boolean exists(long doc_id, int object_id);

void finish();
}
11 changes: 5 additions & 6 deletions src/main/java/de/komoot/photon/elasticsearch/Importer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import java.io.IOException;

/**
* elasticsearch importer
*
* @author felix
* Elasticsearch importer
*/
@Slf4j
public class Importer implements de.komoot.photon.Importer {
Expand All @@ -31,12 +29,13 @@ public Importer(Client esClient, String[] languages, String[] extraTags) {
}

@Override
public void add(PhotonDoc doc) {
public void add(PhotonDoc doc, int object_id) {
String uid = doc.getUid(object_id);
try {
this.bulkRequest.add(this.esClient.prepareIndex(PhotonIndex.NAME, PhotonIndex.TYPE).
setSource(Utils.convert(doc, languages, extraTags)).setId(doc.getUid()));
setSource(Utils.convert(doc, languages, extraTags)).setId(uid));
} catch (IOException e) {
log.error("could not bulk add document " + doc.getUid(), e);
log.error("could not bulk add document " + uid, e);
return;
}
this.documentCount += 1;
Expand Down
22 changes: 17 additions & 5 deletions src/main/java/de/komoot/photon/elasticsearch/Updater.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,28 @@ public void finish() {
}

@Override
public void create(PhotonDoc doc) {
public void create(PhotonDoc doc, int object_id) {
String uid = doc.getUid(object_id);
try {
bulkRequest.add(esClient.prepareIndex(PhotonIndex.NAME, PhotonIndex.TYPE).setSource(Utils.convert(doc, languages, extraTags)).setId(String.valueOf(doc.getPlaceId())));
bulkRequest.add(esClient.prepareIndex(PhotonIndex.NAME, PhotonIndex.TYPE).setSource(Utils.convert(doc, languages, extraTags)).setId(uid));
} catch (IOException e) {
log.error(String.format("creation of new doc [%s] failed", doc), e);
log.error(String.format("creation of new doc [%s] failed", uid), e);
}
}

public void delete(Long id) {
this.bulkRequest.add(this.esClient.prepareDelete(PhotonIndex.NAME, PhotonIndex.TYPE, String.valueOf(id)));
/**
 * Adds a delete request for one index document to the current bulk request.
 *
 * @param doc_id    Place id of the object the document belongs to.
 * @param object_id Number of the sub-document of that place; combined with
 *                  {@code doc_id} by {@code makeUid()} to form the document id.
 */
public void delete(long doc_id, int object_id) {
    final String uid = makeUid(doc_id, object_id);
    bulkRequest.add(esClient.prepareDelete(PhotonIndex.NAME, PhotonIndex.TYPE, uid));
}

/**
 * Issues a get request against the index and reports the response's
 * {@code isExists()} flag.
 *
 * @param doc_id    Place id of the object the document belongs to.
 * @param object_id Number of the sub-document of that place; combined with
 *                  {@code doc_id} by {@code makeUid()} to form the document id.
 * @return {@code true} when the get response says the document exists.
 */
public boolean exists(long doc_id, int object_id) {
    final String uid = makeUid(doc_id, object_id);
    return esClient.prepareGet(PhotonIndex.NAME, PhotonIndex.TYPE, uid)
            .execute()
            .actionGet()
            .isExists();
}

/**
 * Builds the document id used for index lookups and deletions.
 *
 * @param doc_id    Place id of the object.
 * @param object_id Number of the sub-document; zero or a negative value
 *                  selects the bare place id.
 * @return {@code "<doc_id>"} when {@code object_id <= 0}, otherwise
 *         {@code "<doc_id>.<object_id>"}.
 */
private String makeUid(Long doc_id, int object_id) {
    // NOTE(review): presumably has to stay in sync with PhotonDoc.getUid(int),
    // which creates the ids on insert - confirm before changing the format.
    // NOTE(review): %d is rendered with the default format locale; confirm
    // that ids are never produced under a locale with non-ASCII digits.
    return (object_id > 0)
            ? String.format("%d.%d", doc_id, object_id)
            : String.valueOf(doc_id);
}

private void updateDocuments() {
Expand Down
44 changes: 24 additions & 20 deletions src/main/java/de/komoot/photon/nominatim/ImportThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
import de.komoot.photon.PhotonDoc;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
class ImportThread {
private static final int PROGRESS_INTERVAL = 50000;
private static final PhotonDoc FINAL_DOCUMENT = new PhotonDoc(0, null, 0, null, null);
private final BlockingQueue<PhotonDoc> documents = new LinkedBlockingDeque<>(20);
private static final NominatimResult FINAL_DOCUMENT = new NominatimResult(new PhotonDoc(0, null, 0, null, null));
private final BlockingQueue<NominatimResult> documents = new LinkedBlockingDeque<>(20);
private final AtomicLong counter = new AtomicLong();
private final Importer importer;
private final Thread thread;
Expand All @@ -31,22 +32,22 @@ public ImportThread(Importer importer) {
* @param docs Fully filled nominatim document.
*/
public void addDocument(NominatimResult docs) {
for (PhotonDoc doc : docs.getDocsWithHousenumber()) {
while (true) {
try {
documents.put(doc);
break;
} catch (InterruptedException e) {
log.warn("Thread interrupted while placing document in queue.");
// Restore interrupted state.
Thread.currentThread().interrupt();
}
}
if (counter.incrementAndGet() % PROGRESS_INTERVAL == 0) {
final double documentsPerSecond = 1000d * counter.longValue() / (System.currentTimeMillis() - startMillis);
log.info(String.format("imported %d documents [%.1f/second]", counter.longValue(), documentsPerSecond));
assert docs != null;
while (true) {
try {
documents.put(docs);
break;
} catch (InterruptedException e) {
log.warn("Thread interrupted while placing document in queue.");
// Restore interrupted state.
Thread.currentThread().interrupt();
}
}

if (counter.incrementAndGet() % PROGRESS_INTERVAL == 0) {
final double documentsPerSecond = 1000d * counter.longValue() / (System.currentTimeMillis() - startMillis);
log.info(String.format("imported %d documents [%.1f/second]", counter.longValue(), documentsPerSecond));
}
}

/**
Expand Down Expand Up @@ -74,12 +75,15 @@ private class ImportRunnable implements Runnable {
@Override
public void run() {
while (true) {
PhotonDoc doc;
try {
doc = documents.take();
if (doc == FINAL_DOCUMENT)
NominatimResult docs = documents.take();
if (docs == FINAL_DOCUMENT) {
break;
importer.add(doc);
}
int object_id = 0;
for (PhotonDoc doc : docs.getDocsWithHousenumber()) {
importer.add(doc, object_id++);
}
} catch (InterruptedException e) {
log.info("interrupted exception ", e);
// Restore interrupted state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void setImporter(Importer importer) {
}

public List<PhotonDoc> getByPlaceId(long placeId) {
List<NominatimResult> result = template.query(SELECT_COLS_PLACEX + " FROM placex WHERE place_id = ?",
List<NominatimResult> result = template.query(SELECT_COLS_PLACEX + " FROM placex WHERE place_id = ? and indexed_status = 0",
placeRowMapper, placeId);
if (result.size() == 0)
return null;
Expand All @@ -181,7 +181,7 @@ public List<PhotonDoc> getByPlaceId(long placeId) {

public List<PhotonDoc> getInterpolationsByPlaceId(long placeId) {
List<NominatimResult> result = template.query(selectOsmlineSql
+ " FROM location_property_osmline WHERE place_id = ?",
+ " FROM location_property_osmline WHERE place_id = ? and indexed_status = 0",
osmlineRowMapper, placeId);
if (result.size() == 0)
return null;
Expand Down
24 changes: 15 additions & 9 deletions src/main/java/de/komoot/photon/nominatim/NominatimResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.linearref.LengthIndexedLine;
import de.komoot.photon.PhotonDoc;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.regex.Pattern;
Expand All @@ -13,6 +14,7 @@
* A Nominatim result consisting of the basic PhotonDoc for the object
* and a map of attached house numbers together with their respective positions.
*/
@Slf4j
class NominatimResult {
private PhotonDoc doc;
private Map<String, Point> housenumbers;
Expand Down Expand Up @@ -144,20 +146,24 @@ public void addHouseNumbersFromInterpolation(long first, long last, String inter
* @param geom Geometry of the interpolation line.
*/
public void addHouseNumbersFromInterpolation(long first, long last, long step, Geometry geom) {
if (last <= first || (last - first) > 1000)
if (last < first || (last - first) > 1000)
return;

if (housenumbers == null)
housenumbers = new HashMap<>();

LengthIndexedLine line = new LengthIndexedLine(geom);
double si = line.getStartIndex();
double ei = line.getEndIndex();
double lstep = (ei - si) / (double) (last - first);

GeometryFactory fac = geom.getFactory();
for (long num = 1; first + num <= last; num += step) {
housenumbers.put(String.valueOf(num + first), fac.createPoint(line.extractPoint(si + lstep * num)));
if (last == first) {
housenumbers.put(String.valueOf(first), geom.getCentroid());
} else {
LengthIndexedLine line = new LengthIndexedLine(geom);
double si = line.getStartIndex();
double ei = line.getEndIndex();
double lstep = (ei - si) / (double) (last - first);

GeometryFactory fac = geom.getFactory();
for (long num = 0; first + num <= last; num += step) {
housenumbers.put(String.valueOf(num + first), fac.createPoint(line.extractPoint(si + lstep * num)));
}
}
}
}
87 changes: 55 additions & 32 deletions src/main/java/de/komoot/photon/nominatim/NominatimUpdater.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.jdbc.core.JdbcTemplate;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -94,22 +95,29 @@ private void update_from_placex() {
int deletedPlaces = 0;
for (UpdateRow place : getPlaces("placex")) {
long placeId = place.getPlaceId();
int object_id = -1;
boolean check_for_multidoc = true;

if (!place.isToDelete()) {
final List<PhotonDoc> updatedDocs = exporter.getByPlaceId(placeId);
if (updatedDocs != null && !updatedDocs.isEmpty() && updatedDocs.get(0).isUsefulForIndex()) {
check_for_multidoc = updatedDocs.get(0).getRankAddress() == 30;
++updatedPlaces;
for (PhotonDoc updatedDoc : updatedDocs) {
updater.create(updatedDoc, ++object_id);
}
}
}

// Always delete to catch some corner cases around places with exploded housenumbers.
updater.delete(placeId);

if (place.isToDelete()) {
deletedPlaces++;
continue;
if (object_id < 0) {
++deletedPlaces;
updater.delete(placeId, 0);
object_id = 0;
}

final List<PhotonDoc> updatedDocs = exporter.getByPlaceId(placeId);
if (updatedDocs != null) {
updatedPlaces++;
for (PhotonDoc updatedDoc : updatedDocs) {
if (updatedDoc.isUsefulForIndex()) {
updater.create(updatedDoc);
}
if (check_for_multidoc) {
while (updater.exists(placeId, ++object_id)) {
updater.delete(placeId, object_id);
}
}
}
Expand All @@ -126,41 +134,56 @@ private void update_from_interpolations() {
LOGGER.info("Starting interpolations");
int updatedInterpolations = 0;
int deletedInterpolations = 0;
int interpolationDocuments = 0;
for (UpdateRow place : getPlaces("location_property_osmline")) {
long placeId = place.getPlaceId();
int object_id = -1;

if (!place.isToDelete()) {
final List<PhotonDoc> updatedDocs = exporter.getInterpolationsByPlaceId(placeId);
if (updatedDocs != null) {
++updatedInterpolations;
for (PhotonDoc updatedDoc : updatedDocs) {
updater.create(updatedDoc, ++object_id);
}
}
}

updater.delete(placeId);
if (place.isToDelete()) {
deletedInterpolations++;
continue;
if (object_id < 0) {
++deletedInterpolations;
}

final List<PhotonDoc> updatedDocs = exporter.getInterpolationsByPlaceId(place.getPlaceId());
if (updatedDocs != null) {
updatedInterpolations++;
for (PhotonDoc updatedDoc : updatedDocs) {
updater.create(updatedDoc);
interpolationDocuments++;
}
while (updater.exists(placeId, ++object_id)) {
updater.delete(placeId, object_id);
}
}
LOGGER.info(String.format("%d interpolations created or updated, %d deleted, %d documents added or updated", updatedInterpolations,
deletedInterpolations, interpolationDocuments));

LOGGER.info(String.format("%d interpolations created or updated, %d deleted",
updatedInterpolations, deletedInterpolations));
}

private List<UpdateRow> getPlaces(String table) {
List<UpdateRow> results = template.query(exporter.getDataAdaptor().deleteReturning(
"DELETE FROM photon_updates WHERE rel = ?", "place_id, operation, indexed_date"),
(rs, rowNum) -> {
boolean isDelete = "DELETE".equals(rs.getString("operation"));
return new UpdateRow(rs.getLong("place_id"), isDelete, rs.getDate("indexed_date"));
}, new Object[]{table});

results.sort(Comparator.comparing(UpdateRow::getUpdateDate));
return new UpdateRow(rs.getLong("place_id"), isDelete, rs.getTimestamp("indexed_date"));
}, table);

// For each place only keep the newest item.
// Order doesn't really matter because updates of each place are independent now.
results.sort(Comparator.comparing(UpdateRow::getPlaceId).thenComparing(
Comparator.comparing(UpdateRow::getUpdateDate).reversed()));

ArrayList<UpdateRow> todo = new ArrayList<>();
long prev_id = -1;
for (UpdateRow row: results) {
if (row.getPlaceId() != prev_id) {
prev_id = row.getPlaceId();
todo.add(row);
}
}

return results;
return todo;
}


Expand Down
4 changes: 2 additions & 2 deletions src/test/java/de/komoot/photon/ApiIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class ApiIntegrationTest extends ESBaseTester {
public void setUp() throws Exception {
setUpES();
Importer instance = makeImporter();
instance.add(createDoc(13.38886, 52.51704, 1000, 1000, "place", "city").importance(0.6));
instance.add(createDoc(13.39026, 52.54714, 1001, 1001, "place", "town").importance(0.3));
instance.add(createDoc(13.38886, 52.51704, 1000, 1000, "place", "city").importance(0.6), 0);
instance.add(createDoc(13.39026, 52.54714, 1001, 1001, "place", "town").importance(0.3), 0);
instance.finish();
refresh();
}
Expand Down
Loading