From ecab5530f9cb36c0fb3e871a1a53ad58829a809a Mon Sep 17 00:00:00 2001 From: Marco Brandizi Date: Tue, 2 Jul 2024 20:01:41 +0100 Subject: [PATCH] Adding Neo4jReactorUtils. --- .../neo4j/utils/Neo4jReactorUtils.java | 77 +++++++++++++++++++ revision_history.md | 1 + 2 files changed, 78 insertions(+) create mode 100644 neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java diff --git a/neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java b/neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java new file mode 100644 index 0000000..9d311bb --- /dev/null +++ b/neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java @@ -0,0 +1,77 @@ +package uk.ac.rothamsted.neo4j.utils; + +import java.util.function.Function; + +import org.neo4j.driver.Driver; +import org.neo4j.driver.Record; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveSession; +import org.neo4j.driver.reactivestreams.ReactiveTransactionContext; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Utilities to work with the Project Reactor integration into Neo4j. + * + * TODO: write tests and examples. + * + * @author Marco Brandizi + *
Date:
29 Jun 2024
+ * + */ +public class Neo4jReactorUtils +{ + /** + * Helper to process a Neo4j read query in a reactive style. + * + * In a nutshell, prepares a {@link Flux} that pushes an R object for each record + * returned by the query in the callback. Records are mapped to R by the recordMapper + * function. + * + * This is a template based on the approach described + * + * here + * here. + * + * @param callBack this is passed to + * {@link ReactiveSession#executeRead(org.neo4j.driver.reactivestreams.ReactiveTransactionCallback)} + * and it's where you should run your Cypher query and produce a {@link ReactiveResult}, from which + * we do downstream processing. Typically, this is done via {@link ReactiveTransactionContext#run(String)} + * and its variants (see the examples above) + * + * @param recordMapper We use {@link ReactiveResult#records()} to get the records that the callBack's + * returns and then we map them to R objects. The mapping is based on {@link Mono#flatMapMany(Function)}, + * so it's a flux-to-flux mapping (again, see the linked examples). + * + * @param neoDriver obviously, you need a Neo4j driver to talk to. + * + * @return a reactive {@link Flux} of objects, where each object correspond to a Cypher + * record, in the way explained above. + * + */ + public static Flux reactiveRead ( + Function> callBack, + Function recordMapper, + Driver neoDriver ) + { + return Flux.usingWhen ( + // The reactive session is generated when a subscriber comes... + Mono.fromSupplier ( () -> neoDriver.session ( ReactiveSession.class ) ), + // ...and it's used in the closure, to spawn a Flux of Rs + rsession -> + rsession.executeRead ( tx -> + // This yields a ReactiveResult + Mono.fromDirect ( callBack.apply ( tx ) ) + // which is mapped onto its records + .flatMapMany ( ReactiveResult::records ) + // and then records are mapped by our custom mapper, the result is Flux + .map ( recordMapper ) + ) // executeRead() + , // usingWhen(), closure publisher + ReactiveSession::close // usingWhen(), flux cleanup + ); // usingWhen () + } + +} diff --git a/revision_history.md b/revision_history.md index dd12362..79ca54c 100644 --- a/revision_history.md +++ b/revision_history.md @@ -5,6 +5,7 @@ ## 5.1.1-SNAPSHOT * `XNeo4jDriver` added. * `GenericNeo4jException` deprecated (see comments) +* `Neo4jReactorUtils` ## 5.1 * Code cleaning and improvement.