diff --git a/neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java b/neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java
new file mode 100644
index 0000000..9d311bb
--- /dev/null
+++ b/neo4j-utils/src/main/java/uk/ac/rothamsted/neo4j/utils/Neo4jReactorUtils.java
@@ -0,0 +1,77 @@
+package uk.ac.rothamsted.neo4j.utils;
+
+import java.util.function.Function;
+
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.reactivestreams.ReactiveResult;
+import org.neo4j.driver.reactivestreams.ReactiveSession;
+import org.neo4j.driver.reactivestreams.ReactiveTransactionContext;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Utilities to work with the Project Reactor integration into Neo4j.
+ *
+ * TODO: write tests and examples.
+ *
+ * @author Marco Brandizi
+ *
- Date:
- 29 Jun 2024
+ *
+ */
+public class Neo4jReactorUtils
+{
+ /**
+ * Helper to process a Neo4j read query in a reactive style.
+ *
+ * In a nutshell, prepares a {@link Flux} that pushes an R object for each record
+ * returned by the query in the callback. Records are mapped to R by the recordMapper
+ * function.
+ *
+ * This is a template based on the approach described
+ *
+ * here
+ * here>.
+ *
+ * @param callBack this is passed to
+ * {@link ReactiveSession#executeRead(org.neo4j.driver.reactivestreams.ReactiveTransactionCallback)}
+ * and it's where you should run your Cypher query and produce a {@link ReactiveResult}, from which
+ * we do downstream processing. Typically, this is done via {@link ReactiveTransactionContext#run(String)}
+ * and its variants (see the examples above)
+ *
+ * @param recordMapper We use {@link ReactiveResult#records()} to get the records that the callBack's
+ * returns and then we map them to R objects. The mapping is based on {@link Mono#flatMapMany(Function)},
+ * so it's a flux-to-flux mapping (again, see the linked examples).
+ *
+ * @param neoDriver obviously, you need a Neo4j driver to talk to.
+ *
+ * @return a reactive {@link Flux} of objects, where each object correspond to a Cypher
+ * record, in the way explained above.
+ *
+ */
+ public static Flux reactiveRead (
+ Function> callBack,
+ Function recordMapper,
+ Driver neoDriver )
+ {
+ return Flux.usingWhen (
+ // The reactive session is generated when a subscriber comes...
+ Mono.fromSupplier ( () -> neoDriver.session ( ReactiveSession.class ) ),
+ // ...and it's used in the closure, to spawn a Flux of Rs
+ rsession ->
+ rsession.executeRead ( tx ->
+ // This yields a ReactiveResult
+ Mono.fromDirect ( callBack.apply ( tx ) )
+ // which is mapped onto its records
+ .flatMapMany ( ReactiveResult::records )
+ // and then records are mapped by our custom mapper, the result is Flux
+ .map ( recordMapper )
+ ) // executeRead()
+ , // usingWhen(), closure publisher
+ ReactiveSession::close // usingWhen(), flux cleanup
+ ); // usingWhen ()
+ }
+
+}
diff --git a/revision_history.md b/revision_history.md
index dd12362..79ca54c 100644
--- a/revision_history.md
+++ b/revision_history.md
@@ -5,6 +5,7 @@
## 5.1.1-SNAPSHOT
* `XNeo4jDriver` added.
* `GenericNeo4jException` deprecated (see comments)
+* `Neo4jReactorUtils`
## 5.1
* Code cleaning and improvement.