Skip to content

Commit

Permalink
#18 - Refactor: Support Ebean 9.3.1 with ElasticSearch only support (…
Browse files Browse the repository at this point in the history
…comes with move to Java8)
  • Loading branch information
rbygrave committed Nov 25, 2016
1 parent 880374b commit 4e67f4e
Show file tree
Hide file tree
Showing 21 changed files with 256 additions and 172 deletions.
21 changes: 7 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,19 @@

<groupId>org.avaje.ebean</groupId>
<artifactId>ebean-elastic</artifactId>
<version>1.4.2-SNAPSHOT</version>
<version>1.5.1-SNAPSHOT</version>

<parent>
<groupId>org.avaje</groupId>
<artifactId>oss-parent</artifactId>
<version>1.1</version>
<artifactId>java8-parent</artifactId>
<version>1.2</version>
</parent>

<scm>
<developerConnection>scm:git:[email protected]:ebean-orm/ebean-elastic.git</developerConnection>
<tag>HEAD</tag>
</scm>

<properties>
<java.version>1.6</java.version>
</properties>

<dependencies>

<dependency>
Expand All @@ -32,15 +28,15 @@

<!-- Http to ElasticSearch -->
<dependency>
<groupId>com.squareup.okhttp</groupId>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>2.4.0</version>
<version>3.4.2</version>
</dependency>

<dependency>
<groupId>org.avaje.ebean</groupId>
<artifactId>ebean</artifactId>
<version>9.2.1</version>
<version>9.3.1-SNAPSHOT</version>
</dependency>

<!-- Test Dependencies -->
Expand Down Expand Up @@ -85,10 +81,7 @@
<extensions>true</extensions>
<configuration>
<tiles>
<tile>org.avaje.tile:java-compile:1.1</tile>
<tile>org.avaje.tile:dependency-tree:1.1</tile>
<tile>org.avaje.tile:pygments-doclet:1.1</tile>
<tile>org.avaje.ebean.tile:enhancement:1.1</tile>
<tile>org.avaje.ebean.tile:enhancement:1.2</tile>
</tiles>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ private void writeBulkHeader(JsonGenerator gen, Object idValue, String event) th
gen.writeStartObject();
gen.writeFieldName(event);
gen.writeStartObject();
gen.writeStringField("_id", idValue.toString());
if (idValue != null) {
// use elasticsearch generated id value
gen.writeStringField("_id", idValue.toString());
}
gen.writeStringField("_type", indexType);
gen.writeStringField("_index", indexName);
gen.writeEndObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import com.avaje.ebean.PagedList;
import com.avaje.ebean.PersistenceIOException;
import com.avaje.ebean.Query;
import com.avaje.ebean.QueryEachConsumer;
import com.avaje.ebean.QueryEachWhileConsumer;
import com.avaje.ebean.config.DocStoreConfig;
import com.avaje.ebean.plugin.BeanType;
import com.avaje.ebean.plugin.SpiServer;
Expand All @@ -24,6 +22,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* ElasticSearch based document store.
Expand All @@ -42,7 +42,7 @@ public class ElasticDocumentStore implements DocumentStore {

private final EIndexService indexService;

public ElasticDocumentStore(SpiServer server, ElasticUpdateProcessor updateProcessor, IndexMessageSender sender, JsonFactory jsonFactory) {
ElasticDocumentStore(SpiServer server, ElasticUpdateProcessor updateProcessor, IndexMessageSender sender, JsonFactory jsonFactory) {
this.server = server;
this.updateProcessor = updateProcessor;
this.queryService = new EQueryService(server, jsonFactory, sender);
Expand Down Expand Up @@ -155,26 +155,23 @@ private <T> void indexByQuery(final BeanType<T> desc, Query<T> query, final DocS

desc.docStore().applyPath(query);
query.setLazyLoadBatchSize(100);
query.findEach(new QueryEachConsumer<T>() {
@Override
public void accept(T bean) {
Object idValue = desc.getBeanId(bean);
try {
queryUpdate.store(idValue, bean);
} catch (Exception e) {
throw new PersistenceIOException("Error performing query update to doc store", e);
}
query.findEach(bean -> {
Object idValue = desc.getBeanId(bean);
try {
queryUpdate.store(idValue, bean);
} catch (Exception e) {
throw new PersistenceIOException("Error performing query update to doc store", e);
}
});
}

@Override
public <T> void findEach(DocQueryRequest<T> request, QueryEachConsumer<T> consumer) {
public <T> void findEach(DocQueryRequest<T> request, Consumer<T> consumer) {
queryService.findEach(request, consumer);
}

@Override
public <T> void findEachWhile(DocQueryRequest<T> request, QueryEachWhileConsumer<T> consumer) {
public <T> void findEachWhile(DocQueryRequest<T> request, Predicate<T> consumer) {
queryService.findEachWhile(request, consumer);
}

Expand All @@ -193,7 +190,7 @@ public <T> T find(DocQueryRequest<T> request) {
return queryService.findById(request);
}

public void onStartup() {
void onStartup() {

try {
DocStoreConfig docStoreConfig = server.getServerConfig().getDocStoreConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import com.avaje.ebean.plugin.BeanType;
import com.avaje.ebean.plugin.SpiServer;
import com.avaje.ebeanservice.docstore.api.DocStoreQueryUpdate;
import com.avaje.ebeanservice.docstore.api.DocStoreTransaction;
import com.avaje.ebeanservice.docstore.api.DocStoreUpdate;
import com.avaje.ebeanservice.docstore.api.DocStoreUpdateProcessor;
import com.avaje.ebeanservice.docstore.api.DocStoreUpdates;
import com.avaje.ebeanservice.elastic.bulk.BulkSender;
import com.avaje.ebeanservice.elastic.bulk.BulkTransaction;
import com.avaje.ebeanservice.elastic.bulk.BulkUpdate;
import com.avaje.ebeanservice.elastic.support.IndexMessageSender;
import com.avaje.ebeanservice.elastic.support.IndexQueueWriter;
Expand All @@ -19,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.persistence.PersistenceException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -47,6 +50,38 @@ public ElasticUpdateProcessor(SpiServer server, IndexQueueWriter queueWriter, Js
this.bulkSender = new BulkSender(jsonFactory, JsonConfig.Include.NON_EMPTY, defaultObjectMapper, messageSender);
}

@Override
public DocStoreTransaction createTransaction(int batchSize) {
try {
return new BulkTransaction(createBulkUpdate(batchSize));
} catch (IOException e) {
throw new PersistenceException("Error creating bulk transaction", e);
}
}

@Override
public void commit(DocStoreTransaction docStoreTxn) {
docStoreTxn.flush();
queue(docStoreTxn.queue());
}

private void queue(final DocStoreUpdates changesToQueue) {
if (changesToQueue != null) {
server.getBackgroundExecutor().execute(new Runnable() {
@Override
public void run() {
try {
logger.debug("queue wait for changes...");
Thread.sleep(1000);
process(changesToQueue, 0);
} catch (Exception e) {
logger.error("Error processing queued changes ", e);
}
}
});
}
}

/**
* Initialise communication with the queue.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public Map<String, Object> sendBulk(BulkBuffer buffer) throws IOException {
* Parse the returned JSON response into a Map.
*/
private Map<String, Object> parseBulkResponse(String response) throws IOException {

return EJson.parseObject(response);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.avaje.ebeanservice.elastic.bulk;

import com.avaje.ebeanservice.docstore.api.DocStoreTransaction;
import com.avaje.ebeanservice.docstore.api.DocStoreUpdateContext;
import com.avaje.ebeanservice.docstore.api.DocStoreUpdates;

public class BulkTransaction implements DocStoreTransaction {

private final BulkUpdate bulkUpdate;

private DocStoreUpdates queueUpdates;

public BulkTransaction(BulkUpdate bulkUpdate) {
this.bulkUpdate = bulkUpdate;
}

@Override
public DocStoreUpdateContext obtain() {
return bulkUpdate.obtain();
}

@Override
public DocStoreUpdates queue() {
if (queueUpdates == null) {
queueUpdates = new DocStoreUpdates();
}
return queueUpdates;
}

@Override
public void flush() {
bulkUpdate.flush();
}
}
33 changes: 21 additions & 12 deletions src/main/java/com/avaje/ebeanservice/elastic/bulk/BulkUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.avaje.ebeanservice.docstore.api.DocStoreUpdate;

import javax.persistence.PersistenceException;
import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -36,24 +37,32 @@ public void send(DocStoreUpdate event) throws IOException {
* This automatically manages the bulk buffer batch size and flushing.
* </p>
*/
public BulkBuffer obtain() throws IOException {
if (currentBuffer == null) {
return newBuffer();
public BulkBuffer obtain() {
try {
if (currentBuffer == null) {
return newBuffer();
}
if (++count > batchSize) {
flush();
return newBuffer();
}
return currentBuffer;
} catch (IOException e) {
throw new PersistenceException("Error obtaining a buffer for Bulk updates", e);
}
if (++count > batchSize) {
flush();
return newBuffer();
}
return currentBuffer;
}

/**
* Flush the current buffer sending the Bulk API request to ElasticSearch.
*/
public void flush() throws IOException {

if (currentBuffer != null) {
collectErrors(bulkSender.sendBulk(currentBuffer));
public void flush() {

try {
if (currentBuffer != null) {
collectErrors(bulkSender.sendBulk(currentBuffer));
}
} catch (IOException e) {
throw new PersistenceException("Error send Bulk updates", e);
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/main/java/com/avaje/ebeanservice/elastic/query/EQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ public class EQuery<T> {

protected final SpiQuery<T> query;

protected final BeanType<T> beanType;
final BeanType<T> beanType;

protected final JsonContext jsonContext;
private final JsonContext jsonContext;

protected final JsonReadOptions jsonOptions;
private final JsonReadOptions jsonOptions;

public EQuery(SpiQuery<T> query, JsonContext jsonContext, JsonReadOptions jsonOptions) {
EQuery(SpiQuery<T> query, JsonContext jsonContext, JsonReadOptions jsonOptions) {
this.query = query;
this.beanType = query.getBeanDescriptor();
this.jsonContext = jsonContext;
this.jsonOptions = jsonOptions;
}

public EQuery(BeanType<T> beanType, JsonContext jsonContext, JsonReadOptions options) {
EQuery(BeanType<T> beanType, JsonContext jsonContext, JsonReadOptions options) {
this.query = null;
this.beanType = beanType;
this.jsonContext = jsonContext;
Expand All @@ -38,20 +38,20 @@ public EQuery(BeanType<T> beanType, JsonContext jsonContext, JsonReadOptions opt
/**
* Create a bean parser for the given json.
*/
protected BeanSearchParser<T> createParser(JsonParser json) {
JsonBeanReader reader = createReader(json);
BeanSearchParser<T> createParser(JsonParser json) {
JsonBeanReader<T> reader = createReader(json);
return createParser(json, reader);
}

/**
* Create a bean reader for the given json.
*/
public JsonBeanReader<T> createReader(JsonParser json) {
JsonBeanReader<T> createReader(JsonParser json) {
return jsonContext.createBeanReader(beanType, json, jsonOptions);
}

private BeanSearchParser<T> createParser(JsonParser json, JsonBeanReader<T> reader) {
return new BeanSearchParser<T>(json, beanType, reader, query.getLazyLoadMany());
return new BeanSearchParser<>(json, beanType, reader, query.getLazyLoadMany());
}

}
Loading

0 comments on commit 4e67f4e

Please sign in to comment.