This article targeted an early version of Spring Boot 2.0, some content requires a fresh when you are using the latest Spring Boot.
Reactive or Reactive Streams is a hot topic in these days, you can see it in blog entries, presentations, or some online course.
What is Reactive Streams? From the official website of Reactive Streams:
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Currently, the JVM specification is completed, it includes a Java API(four simple interface), a textual Specification, a TCK and implementation examples. Check Reactive Streams for JVM for more details.
Reactor and RxJava2 have implemented this specification, and the upcoming Java 9 also adopted it in the new Flow API.
The Spring 5 embraces Reactive Streams. For Spring developers, it brings a complete new programming model. In this post, we will try to cover all reactive features in the Spring projects.
- Spring core framework added a new
spring-webflux
module, and provided built-in reactive programming support via Reactor and RxJava. - Spring Security 5 also added reactive feature.
- In Spring Data umbrella projects, a new
ReactiveSortingRepository
interface is added in Spring Data Commons. Redis, Mongo, Cassandra subprojects firstly got reactive supports. Unluckily due to the original JDBC is designated for blocking access, Spring Data JPA can not benefit from this feature. - Spring Session also began to add reactive features, an reactive variant for its
SessionRepository
is included in the latest 2.0.0.M3.
NOTE: At the moment I was writing this post, some Spring projects are still under active development, I will update the content and the sample codes against the final release version when they are ready. Please start the Github sample repository to get update in future.
- Create a Webflux application
- Spring Boot
- Reactive Data Operations
- Security for Webflux
- RouterFunction
- Client
- Test
- Kotlin
- Sample codes
- References
An example exceeds thousands of words. Let's begin to write some codes and enjoy the reactive programming brought by Spring 5.
As an example, I will reuse the same concept in my former Spring Boot sample codes which is a simple blog application.
In the following steps we will start with creating RESTful APIs for Post
.
Before writing some real codes, make sure you have installed the essential software:
- Oracle Java 8, https://java.oracle.com
- Apache Maven, https://maven.apache.org
- Gradle, http://www.gradle.org
- Your favorite IDE, including :
- NetBeans IDE
- Eclipse IDE (or base on Eclipse, eg. Spring ToolSuite is highly recommended)
- Intellij IDEA
- etc
NOTE: Do not forget to add path which includes java
and mvn
command into your system environment variable PATH .
Execute the following command to create a general web application from Maven archetype.
$ mvn archetype:generate -DgroupId=com.example
-DartifactId=demo
-DarchetypeArtifactId=maven-archetype-webapp
-DinteractiveMode=false
You can import the generated codes into your IDEs for further development.
Open pom.xml in your IDE editor, add some modifications:
- Add
spring-boot-starter-parent
as parent POM to manage the versions of all required dependencies for this project. - Add
spring-webflux
,jackson-databind
,reactor-core
as dependencies to get Spring Web Reactive support - Add
logback
as logging framework,jcl-over-slf4j
is a bridge for Spring jcl and slf4j. - Add Lombok to erase the tedious getters, setters, etc for a simple POJO class, check the Lombok project to get more information about Lombok, follow the official installation guide to get Lombok support in your IDE.
- You have to add spring milestone repositories in
repositories
andpluginRepositories
, because at the moment, they are still in active development, and not available in the official Maven public repository.
The final pom.xml looks like:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-reactive-sample-vanilla</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-reactive-sample-vanilla</name>
<description>Spring Webflux demo(without Spring Boot)</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.M3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
The project skeleton is ready, now let's add some codes to play reactive programming.
Create a new class named Post
, it includes three fields: id
, title
, content
.
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Post {
private Long id;
private String title;
private String content;
}
In the above codes, @Data
, @ToString
, @Builder
, @NoArgsConstructor
, @AllArgsConstructor
are from the Lombok project.
When you compile Post
, it will utilize Java compiler built-in Annotation Processing Tooling feature to add extra facilities into the final compiled classes, including:
- Getters and setters of the three fields, and overrides
equals
andhashCode
methods. - Overrides
toString
method. - A builder class for creating the Post instance more easily.
- A constructor with no arguments.
- A constructor with all fields as arguments.
Create a dummy repository named PostRepository
to retrieve posts from and save them back to a repository.
@Component
class PostRepository {
private static final Map<Long, Post> DATA = new HashMap<>();
private static long ID_COUNTER = 1L;
static {
Arrays.asList("First Post", "Second Post")
.stream()
.forEach((java.lang.String title) -> {
long id = ID_COUNTER++;
DATA.put(Long.valueOf(id), Post.builder().id(id).title(title).content("content of " + title).build());
}
);
}
Flux<Post> findAll() {
return Flux.fromIterable(DATA.values());
}
Mono<Post> findById(Long id) {
return Mono.just(DATA.get(id));
}
Mono<Post> createPost(Post post) {
long id = ID_COUNTER++;
post.setId(id);
DATA.put(id, post);
return Mono.just(post);
}
}
Currently we have not connect to any database, here we use a Map
backed data store instead. When we come to discuss the reactive features provided by Spring Data projects, we will replace it with a real Spring Data reactive implementation.
If you have used Spring Data before, you will find these APIs are every similiar with Repository
interface provided in Spring Data.
The main difference is in the current Repository class all methods return a Flux
or Mono
.
Flux
and Mono
are from Reactor, which powers the reactive support in Spring 5 by default.
Flux
means it could return lots of results in the stream.Mono
means it could return 0 to 1 result.
Create a controller class named PostController
to expose RESTful PAIs for Post
entity.
@RestController
@RequestMapping(value = "/posts")
class PostController {
private final PostRepository posts;
public PostController(PostRepository posts) {
this.posts = posts;
}
@GetMapping(value = "")
public Flux<Post> all() {
return this.posts.findAll();
}
@GetMapping(value = "/{id}")
public Mono<Post> get(@PathVariable(value = "id") Long id) {
return this.posts.findById(id);
}
@PostMapping(value = "")
public Mono<Post> create(Post post) {
return this.posts.createPost(post);
}
}
Create a @Configuration
class, add an @EnableWebFlux
annotation to activiate webflux support in this application.
@Configuration
@ComponentScan
@EnableWebFlux
class WebConfig {
}
Now we almost have done the programming work, let's try to bootstrap the application.
According to the official documention, in WebFlux framework section, there are some options to bootsrap a reactive web application.
WebFlux can run on Servlet containers with support for the Servlet 3.1 Non-Blocking IO API as well as on other async runtimes such as Netty and Undertow.
Create a general main class to run the application grammatically.
ApplicationContext context = new AnnotationConfigApplicationContext(WebConfig.class, SecurityConfig.class);
HttpHandler handler = DispatcherHandler.toHttpHandler(context);
// Tomcat and Jetty (also see notes below)
ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(handler);
Tomcat tomcatServer = new Tomcat();
tomcatServer.setHostname(DEFAULT_HOST);
tomcatServer.setPort(DEFAULT_PORT);
Context rootContext = tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));
Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
rootContext.addServletMapping("/", "httpHandlerServlet");
tomcatServer.start();
The above codes perform some tasks.
- Create a
HttpHandler
fromApplicationContext
. - Use
ServletHttpHandlerAdapter
to bridge the Servlet APIs to reactive basedHttpHandler
. - Start tomcat server.
Do not forget add the tomcat-embed-core
to project dependencies.
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
</dependency>
You can simply run this class in IDEs as others Java application projects.
If you want to package all dependencies into one jar and run the application in one line command java -jar filename
, maven-assembly-plugin can help this purpose.
Add maven-assembly-plugin
configuration into the pom.xml file.
<!-- Maven Assembly Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in mainfest make a executable jar -->
<archive>
<manifest>
<mainClass>com.example.demo.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
Eneter the project root folder, execute the following command:
mvn package
When it is done, switch to the target folder, besides the general jar, you will find an extra fat jar was generated, which filename is ended with jar-with-dependencies.jar.
spring-reactive-sample-vanilla-0.0.1-SNAPSHOT-jar-with-dependencies.jar
spring-reactive-sample-vanilla-0.0.1-SNAPSHOT.jar
Run the following command to run this application.
java -jar target/spring-reactive-sample-vanilla-0.0.1-SNAPSHOT-jar-with-dependencies.jar
When it is started, try to fetch posts.
#curl http://localhost:8080/posts
[{"id":1,"title":"First Post","content":"content of First Post"},{"id":2,"title":"Second Post","content":"content of Second Post"}]
To start a Jetty server, replace the bootstrap codes with the following:
ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(handler);
Server server = new Server(DEFAULT_PORT);
ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setErrorHandler(null);
contextHandler.setContextPath("");
contextHandler.addServlet(new ServletHolder(servlet), "/");
server.setHandler(contextHandler);
server.start();
server.join();
Replace tomcat-embed-core
with the following jetty related dependencies.
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
Similiarly, you can run the application directly in your IDEs.
Alternatively, you can run the application in Reactor Netty, or JBoss Undertow.
For Reactor Netty, replace the above bootstraping codes with:
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create(DEFAULT_HOST, DEFAULT_PORT).newHandler(adapter).block();
And add reactor-netty
in your project dependencies.
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
For Undertow, replace the bootstraping codes with:
UndertowHttpHandlerAdapter undertowAdapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(DEFAULT_PORT, DEFAULT_HOST).setHandler(undertowAdapter).build();
server.start();
And add undertow-core
in your project dependencies.
<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-core</artifactId>
</dependency>
If you are stick on traditional web applications, and want to package it into a war file and deploy it into an existing servlet container, Spring 5 provides a AbstractAnnotationConfigDispatcherHandlerInitializer
to archive this purpose. It is a standard Spring ApplicationInitializer
implementation which can be scanned by Spring container when servlet container starts up.
Replace the above bootstraping class with:
public class AppIntializer extends AbstractAnnotationConfigDispatcherHandlerInitializer {
@Override
protected Class<?>[] getConfigClasses() {
return new Class[]{
WebConfig.class,
SecurityConfig.class
};
}
}
Next change the project packaging from jar to war in pom.xml.
<packaging>war</packaging>
And add serlvet-api
to your project dependencies.
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
Now you can run this application on a IDE managed Servlet 3.1 Container directly.
Or package the project into a war format and deploy it into a servlet 3.1 based container(tomcat, jetty) manually.
Alternatively, if you want to run this application via mvn
command in the development stage. cargo-maven2-plugin
can archive this purpose.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<configuration>
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.cargo</groupId>
<artifactId>cargo-maven2-plugin</artifactId>
<configuration>
<container>
<containerId>tomcat8x</containerId>
<type>embedded</type>
</container>
<configuration>
<properties>
<cargo.servlet.port>9000</cargo.servlet.port>
<cargo.logging>high</cargo.logging>
</properties>
</configuration>
</configuration>
</plugin>
Run the following command to package and deploy it into an embedded tomcat controlled by cargo.
mvn verify cargo:run
Currently Spring Boot 2.0 is still in active development. The final Spring Boot 2.0 will target the latest Spring technology stack, including Spring 5, Spring Security 5, Spring Session 2 etc.
Open browser and navigate to http://start.spring.io.
In the Spring Boot Initializer page.
- Select Spring Boot version as 2.0.0.M3 or 2.0.0.SNAPSHOT.
- In the dependencies box, type reactive, it will display all reactive options in a dropdown menu. Select Ractive Web to add
spring-webflux
into project dependencies. You can also add other items as you like, such as Reactive MongoDb, Reactive Redis etc. - Click Generate project button or hint ALT+NETER keys to generate a project skeleton as a zip file for downloading.
Download and extract it into your disc, import the source codes into your IDEs.
As you see, in the pom.xml, new Spring Boot strater spring-boot-starter-webflux
is added.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
And the spring-boot-maven-plugin
is added in the initial pom.xml.
Spring Boot starter spring-boot-starter-webflux
will handle the spring-webflux
related dependencies and enable webflux support automatically.
Compare to the former vanilla version,
- No need explicit
WebConfig
, Spring Boot configures it automatically. - The former bootstraping class or
ApplicationInitializer
is no use now, the Spring Boot built-in@SpringBootApplication
annotated class hands over the application bootstrap.
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
By default, Spring Boot will use Reactor Netty to run a webflux application. No need extra configuration for it.
Starts up application via:
mvn spring-boot:run
If you want to use Apache Tomcat as target runtime environment, just exclude spring-boot-starter-reactor-netty
from spring-boot-starter-webflux
, and add spring-boot-starter-tomcat
into project dependencies.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
You can use Jetty to replace the default Reactor Netty.
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-webflux</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
Similiarly, you can use Undertow as target runtime.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
The next generation of Spring Data aslo adds Reactive Streams support.
At the moment, Data Redis, Data MongoDB and Data Cassandra will be the first-class citizen to get basic reactive support.
Spring Data Mongo provides reactive variants of MongoTemplate
and MongoRepository
, aka ReactiveMongoTemplate
and ReactiveMongoRepository
which have reactive capablities.
Add the following into project dependencies.
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
Create a @Configuration
class to configure Mongo and enable Reactive support.
@EnableReactiveMongoRepositories(basePackageClasses = {MongoConfig.class})
public class MongoConfig extends AbstractReactiveMongoConfiguration {
@Value("${mongo.uri}")
String mongoUri;
@Override
public MongoClient mongoClient() {
return MongoClients.create(mongoUri);
}
@Override
protected String getDatabaseName() {
return "blog";
}
}
Create a new Post
MongoDB document class.
@Document
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Post {
@Id
private String id;
private String title;
private String content;
}
@Document
declares it as a MongoDB document.@Id
indicates it is the identifier field ofPost
document.
Delcares a PostRepository
interface to extend Sprign Data MongoDB specific ReactiveMongoRepository
.
interface PostRepository extends ReactiveMongoRepository<Post, String> {
}
Configure MongoDB connection in the appliation.yml file.
spring:
data:
mongodb:
uri: mongodb://localhost:27017/blog
Before starting up your application, make sure there is a running MongoDB instance in your local system.
NOTE: If you have not installed it, go to Mongo download page and get a copy of MongoDB, and install it into your system.
Alternatively, if you are familiar with Docker, it is simple to start a MongoDB instance via Docker Compose file.
version: '3.3' # specify docker-compose version
# Define the services/containers to be run
services:
redis:
image: redis
ports:
- "6379:6379"
mongodb:
image: mongo
volumes:
- mongodata:/data/db
ports:
- "27017:27017"
command: --smallfiles --rest
# command: --smallfiles --rest --auth
volumes:
mongodata:
Execute the following command to start a Mongo instance in a Docker container.
docker-compose up mongodb
When the Mongo service is started, it is ready for bootstraping the application.
mvn spring-boot:run
If you are using Spring Boot, the configuration can be simplified. Just add spring-boot-starter-data-mongodb-reactive
into the project dependencies.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
No need extra configuration class, Spring Boot will enable reactive support for MongoDB in this project. ReactiveMongoTemplate
and ReactiveMongoRepository
will be configured automatically.
Spring Data Mongo supports data auditing as Spring Data JPA, it can set the current user and created/last modified timestamp to a field automatically.
Add EnableMongoAuditing
to application class to activiate auditing for MongoDB.
@EnableMongoAuditing
public class DemoApplication {}
In Post
document, add a new field createdDate
, annotated it with @CreatedDate
, it will fill the createdDate with current date when inserting it into MongoDB.
@CreatedDate
private LocalDateTime createdDate;
Add some test datas into MongoDB when it starts up.
@Component
@Slf4j
class DataInitializr implements CommandLineRunner {
private final PostRepository posts;
public DataInitializr(PostRepository posts) {
this.posts = posts;
}
@Override
public void run(String[] args) {
log.info("start data initialization ...");
this.posts
.deleteAll()
.thenMany(
Flux
.just("Post one", "Post two")
.flatMap(
title -> this.posts.save(Post.builder().title(title).content("content of " + title).build())
)
)
.log()
.subscribe(
null,
null,
() -> log.info("done initialization...")
);
}
}
Use a CommandLineRunner
to make sure the run
method is executed after the application is started.
Execute mvn spring-boot:run
to start up the application now, then we can test if the data is initialized successfully.
curl -v http://localhost:8080/posts
* timeout on name lookup is not supported
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /posts HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.54.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/json;charset=UTF-8
< Cache-Control: no-cache, no-store, max-age=0, must-revalidate
< Pragma: no-cache
< Expires: 0
< X-Content-Type-Options: nosniff
< X-Frame-Options: DENY
< X-XSS-Protection: 1 ; mode=block
<
[{"id":"599149d53c44062e08c58b86","title":"Post one","content":"content of Post one","createdDate":[2017,8,14,14,57,25,71000000]},{"id":"599149d53c44062e08c58b87","title":"Post two","content":"content of Post two","createdDate":[2017,8,14,14,57,25,173000000]}]* Connection #0 to host localhost left intact
As you see, the data is initialized and createdDate is inserted automatically.
Spring Data Redis provides a reactive variant of RedisConnectionFactory
aka ReactiveRedisConnectionFactory
which return a ReactiveConnection
.
Add the following into your project dependencies.
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
NOTE: You have to use lettuce
as redis driver to get reactive support in spring-data-redis
, and add commons-pool2
to support Redis connection pool.
Create a @Configuration
class to configure Mongo and enable Reactive support for Redis.
@EnableRedisRepositories
public class RedisConfig {
@Autowired
RedisConnectionFactory factory;
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory();
}
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory){
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory){
return new StringRedisTemplate(connectionFactory);
}
@PreDestroy
public void flushTestDb() {
factory.getConnection().flushDb();
}
}
LettuceConnectionFactory
implements RedisConnectionFactory
and ReactiveRedisConnectionFactory
interfaces, when a LettuceConnectionFactory
is declared, RedisConnectionFactory
and ReactiveRedisConnectionFactory
are also registered as beans.
In your beans, you can inject a ReactiveRedisConnectionFactory
and get a reactive connection.
@Inject ReactiveRedisConnectionFactory factory;
ReactiveRedisConnection conn = factory.getReactiveConnection();
ReactiveConnection
provides some reactive methods for redis operations.
For example, create a favorites list for posts.
conn.setCommands()
.sAdd(
ByteBuffer.wrap("users:user:favorites".getBytes()),
this.posts.findAll()
.stream()
.map(p -> p.getId().getBytes())
.map(ByteBuffer::wrap)
.collect(Collectors.toList())
)
.log()
.subscribe(null, null, ()-> log.info("added favirates..."));
And show my favorites in the controller.
@RestController()
@RequestMapping(value = "/favorites")
class FavoriteController {
private final ReactiveRedisConnectionFactory factory;
public FavoriteController(ReactiveRedisConnectionFactory factory) {
this.factory = factory;
}
@GetMapping("")
public Mono<List<String>> all() {
return this.factory.getReactiveConnection()
.setCommands()
.sMembers(ByteBuffer.wrap("users:user:favorites".getBytes()))
.map(FavoriteController::toString)
.collectList();
}
private static String toString(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return new String(bytes);
}
}
For Spring Boot applications, the configuration can be simplified. Just add spring-boot-starter-data-redis-reactive
into the project dependencies.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
Spring boot provides auto-configuration for redis, and registers ReactiveRedisConnectionFactory
for you automatically.
Declare Post
as a redis hash data, add @RedisHash("posts")
to Post
POJO.
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@RedisHash("posts")
class Post {
@Id
private String id;
private String title;
private String content;
}
Let's have a look at the PostRepository
.
interface PostRepository extends KeyValueRepository<Post, String> {
@Override
public List<Post> findAll();
}
KeyValueRepository
is from spring-data-keyvalue
, which is a generic Map based Repository implementation.
private void initPosts() {
this.posts.deleteAll();
Stream.of("Post one", "Post two").forEach(
title -> this.posts.save(Post.builder().id(UUID.randomUUID().toString()).title(title).content("content of " + title).build())
);
}
NOTE: Unlike Spring Data Mongo, Spring Data Redis does not provides a variant for RedisTemplate
and Repository
.
Spring Data Cassandra also embraces reactive support.
Firstly add the following dependencies into your project.
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
</dependency>
Create a @Configuration
class to configure Cassandra and enable reactive support.
@Configuration
@EnableReactiveCassandraRepositories(basePackageClasses = {CassandraConfig.class})
public class CassandraConfig extends AbstractReactiveCassandraConfiguration {
@Value("${cassandra.keyspace-name}")
String keySpace;
@Value("${cassandra.contact-points}")
String contactPoints;
@Override
protected List<CreateKeyspaceSpecification> getKeyspaceCreations() {
CreateKeyspaceSpecification specification = CreateKeyspaceSpecification.createKeyspace(keySpace)
.ifNotExists()
.with(KeyspaceOption.DURABLE_WRITES, true);
//.withNetworkReplication(DataCenterReplication.dcr("foo", 1), DataCenterReplication.dcr("bar", 2));
return Arrays.asList(specification);
}
@Override
protected List<DropKeyspaceSpecification> getKeyspaceDrops() {
return Arrays.asList(DropKeyspaceSpecification.dropKeyspace(keySpace));
}
@Override
protected String getKeyspaceName() {
return keySpace;
}
@Override
protected String getContactPoints() {
return contactPoints;
}
@Override
public SchemaAction getSchemaAction() {
return SchemaAction.RECREATE;
}
}
getKeyspaceCreations
configures how to create the keyspace when Cassandra is started, here we create the keyspace if it does not existed.
getSchemaAction
specifies the action of schema generation.
Next add Table
annotation to the Post
entity.
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table("posts")
class Post {
@PrimaryKey()
@Builder.Default
private String id = UUID.randomUUID().toString();
private String title;
private String content;
}
Add @PrimaryKey
on id
field, it indicates id
is the primary key of posts
table.
Unlike Mongo, in Cassandra, you have to fill the id
field manually before it is inserted.
Next change the former PostRepository
to the following:
interface PostRepository extends ReactiveCassandraRepository<Post, String>{}
Cassandra has a reactive variant for Repository
, as the above CassandraRepository
.
If you are using Spring Boot, just add spring-boot-starter-data-cassandra-reactive
into your project dependencies.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
</dependency>
No need extra configuration, Spring Boot will configure Cassandra for you and registers related beans for you.
As former Mongo example, it is easy to erase the existing data and import some initial data when the application is started up.
public void init() {
log.info("start data initialization ...");
this.posts
.deleteAll()
.thenMany(
Flux
.just("Post one", "Post two")
.flatMap(
title -> this.posts.save(Post.builder().title(title).content("content of " + title).build())
)
)
.log()
.subscribe(
null,
null,
() -> log.info("done initialization...")
);
}
Aligned with the reactive feature introduced in Spring 5, Spring Security 5 added a new module named spring-secuirty-webflux
.
Add the following into your project dependencies.
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-webflux</artifactId>
</dependency>
Create a configuration class, add @EnableWebFluxSecurity
annotation to enable Spring security for Webflux.
@EnableWebFluxSecurity
class SecurityConfig {
@Bean
SecurityWebFilterChain springWebFilterChain(HttpSecurity http) throws Exception {
return http
.authorizeExchange()
.pathMatchers(HttpMethod.GET, "/posts/**").permitAll()
.pathMatchers(HttpMethod.DELETE, "/posts/**").hasRole("ADMIN")
//.pathMatchers("/users/{user}/**").access(this::currentUserMatchesPath)
.anyExchange().authenticated()
.and()
.build();
}
private Mono<AuthorizationDecision> currentUserMatchesPath(Mono<Authentication> authentication, AuthorizationContext context) {
return authentication
.map( a -> context.getVariables().get("user").equals(a.getName()))
.map( granted -> new AuthorizationDecision(granted));
}
@Bean
public MapUserDetailsRepository userDetailsRepository() {
UserDetails rob = User.withUsername("test").password("test123").roles("USER").build();
UserDetails admin = User.withUsername("admin").password("admin123").roles("USER","ADMIN").build();
return new MapUserDetailsRepository(rob, admin);
}
}
- Use
@EnableWebFluxSecurity
annotation to enable Security forspring-webflux
based application. SecurityWebFilterChain
bean is a must to configure the details of Spring Security.HttpSecurity
is fromspring-secuirty-webflux
, similar with the general version, but handleWebExhange
instead of Servlet basedWebRequest
.- A new
UserDetailsRepository
interface is introduced which is aligned with Reactor APIs. By default, an in-memoryMap
based implementationMapUserDetailsRepository
is provided, you can customsize yourself by implementing theUserDetailsRepository
interface.
Starts up the application and verify the Spring Security configuratoin work as expected.
mvn spring-boot:run
After it is started, try to add a new post without authentication:
#curl -v -X POST http://localhost:8080/posts -H "Content-Type:application/json" -d "{\"title\":\"My Post\",\"content\":\"content of My Post\"}"
Note: Unnecessary use of -X or --request, POST is already inferred.
* timeout on name lookup is not supported
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /posts HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.54.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 42
>
* upload completely sent off: 42 out of 42 bytes
< HTTP/1.1 401 Unauthorized
< WWW-Authenticate: Basic realm="Realm"
< Cache-Control: no-cache, no-store, max-age=0, must-revalidate
< Pragma: no-cache
< Expires: 0
< X-Content-Type-Options: nosniff
< X-Frame-Options: DENY
< X-XSS-Protection: 1 ; mode=block
< content-length: 0
<
* Connection #0 to host localhost left intact
The server side rejects the client request, and sends back a 401 error(401 Unauthorized).
Use the predefined user:password credentials to get authenticated and send the post request again.
curl -v -X POST http://localhost:8080/posts -u "user:password" -H "Content-Type:application/json" -d "{\"title\":\"My Post\",\"content\":\"content of My Post\"}"
Note: Unnecessary use of -X or --request, POST is already inferred.
* timeout on name lookup is not supported
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
* Server auth using Basic with user 'test'
> POST /posts HTTP/1.1
> Host: localhost:8080
> Authorization: Basic dGVzdDp0ZXN0MTIz
> User-Agent: curl/7.54.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 50
>
* upload completely sent off: 50 out of 50 bytes
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/json;charset=UTF-8
< Cache-Control: no-cache, no-store, max-age=0, must-revalidate
< Pragma: no-cache
< Expires: 0
< X-Content-Type-Options: nosniff
< X-Frame-Options: DENY
< X-XSS-Protection: 1 ; mode=block
< set-cookie: SESSION=b99124f7-c0a0-4507-b9be-34718af3d137; HTTPOnly
<
{"id":"59906f9d3c44060e044fb378","title":"My Post","content":"content of My Post","createdDate":[2017,8,13,23,26,21,392000000]}* Connection #0 to host localhost left intact
It is done secussfully, and returns the new created post.
For Spring Boot applciations, add it in the project dependencies aside with spring-boot-starter-security
.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-webflux</artifactId>
</dependency>
NOTE: Currently you have to add spring-security-webflux
explicitly, there is no specific starters for spring-security-webflux
.
Like traditional web mvc applications, you can use a @PreAuthorize("hasRole('ADMIN')")
annotation on your methods to prevent the execution of this method if the evaluation of the expression defined in the PreAuthorize
is false.
To enable the method level security, add an extra @EnableReactiveMethodSecurity
to your SecurityConfig class.
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
class SecurityConfig {
}
In your business codes, add @PreAuthorize("hasRole('ADMIN')")
annotation to your method.
@PreAuthorize("hasRole('ADMIN')")
Mono<Post> delete(Long id) {
Post deleted = data.get(id);
data.remove(id);
return Mono.just(deleted);
}
Spring Security provides a UserDetailsRepositoryResourceFactoryBean
which allow you load users from a properties file to create the UserDetailsRepository
for your applications.
@Bean
public UserDetailsRepositoryResourceFactoryBean userDetailsService() {
return UserDetailsRepositoryResourceFactoryBean
.fromResourceLocation("classpath:users.properties");
}
The contnet of users.properties is:
user=password,ROLE_USER
admin=password,ROLE_USER,ROLE_ADMIN
The key is username, the value is password, and it's roles.
As said before, you can easily implement your own UserDetailsRepository
.
Here let's use Mongo as backend store, create a User
document class which implements spring secuirty specific UserDetails
interface.
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Document
class User implements UserDetails {
@Id
private String id;
private String username;
private String password;
@Builder.Default()
private boolean active = true;
@Builder.Default()
private List<String> roles = new ArrayList<>();
@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
return AuthorityUtils.createAuthorityList(roles.toArray(new String[roles.size()]));
}
@Override
public boolean isAccountNonExpired() {
return active;
}
@Override
public boolean isAccountNonLocked() {
return active;
}
@Override
public boolean isCredentialsNonExpired() {
return active;
}
@Override
public boolean isEnabled() {
return active;
}
}
Create a generic purpose Repository
for User
, named UserRepository
.
public interface UserRepository extends ReactiveMongoRepository<User, String> {
Mono<User> findByUsername(String username);
}
Replace the UserDetailsRepository
bean declaration with the following, which connect to the real database.
@Bean
public UserDetailsRepository userDetailsRepository(UserRepository users) {
return (username) -> {
return users.findByUsername(username).cast(UserDetails.class);
};
}
spring-webflux
also provides DSL like syntax to define route rules for requests.
To enable the functional routes definition support, declare a RouterFunction
bean to replace the traditional Controller
class.
@Bean
public RouterFunction<ServerResponse> routes(PostHandler postHandler) {
return route(GET("/posts"), postHandler::all)
.andRoute(POST("/posts").and(contentType(APPLICATION_JSON)), postHandler::create)
.andRoute(GET("/posts/{id}"), postHandler::get);
}
A helper class RouterFunctions
can help create a route rule easily.
route
accepts a PredicateFunction
and HandlerFunction
, there is PredicateFunctions
which can help you build the predicaiton condition of the incomming request.
I would like extract the HandlerFunction
into a standalone class, here we put all handlers into a PostHandler
class.
@Component
public class PostHandler {
private final PostRepository posts;
public PostHandler(PostRepository posts) {
this.posts = posts;
}
public Mono<ServerResponse> all(ServerRequest req) {
return ServerResponse.ok().body(this.posts.findAll(), Post.class);
}
public Mono<ServerResponse> create(ServerRequest req) {
return req.body(BodyExtractors.toMono(Post.class))
.flatMap(post -> this.posts.save(post))
.flatMap(p -> ServerResponse.created(URI.create("/posts/" + p.getId())).build());
}
public Mono<ServerResponse> get(ServerRequest req) {
return this.posts.findById(Long.valueOf(req.pathVariable("id")))
.flatMap(post -> ServerResponse.ok().body(Mono.just(post), Post.class))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
A HandlerFunction
accepts a ServerRequest
as arguments and return a Mono<ServerResponse>
, it is easy to control the response defails, such as response body, status, etc.
Similiar with RestTemplate
and AsyncRestTemplate
, Spring 5 provides a WebClient
to shake hands with reactive driven APIs.
WebClient client = WebClient.create("http://localhost:8080");
client
.get()
.uri("/posts")
.exchange()
.flatMapMany(res -> res.bodyToFlux(Post.class))
.log()
.subscribe(post -> System.out.println("post: " + post));
Spring 5 provides a WebTestClient
to help you test reactive server side APIs. It is similar with WebClient
, but provides more facilities to interact with server in a test environment.
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
ApplicationContext context;
WebTestClient client;
@Before
public void setup() {
client = WebTestClient
.bindToApplicationContext(context)
.configureClient()
.baseUrl("http://localhost:8080/")
.build();
}
//...
}
Here we use bindToApplicationContext
to create a WebTestClient
for the whole application, Spring provides some other options, such as bindToController
, bindToRouterFunction
etc, which allow you test paritial APIs.
@Test
public void getAllPostsShouldBeOkWithAuthetication() {
client
.get()
.uri("/posts/")
.exchange()
.expectStatus().isOk();
}
WebTestClient
is more flexible than WebClient
, you can do some assertions directly, eg isOK()
in the above codes.
@Test
public void deletePostsNotAllowedWhenIsNotAdmin() {
client
.mutate().filter(basicAuthentication("test", "password")).build()
.delete()
.uri("/posts/1")
.exchange()
.expectStatus().isEqualTo(HttpStatus.FORBIDDEN);
}
WebTestClient
can add some mutation in the web exchange process, as shown in the above codes, adding HTTP Basic header and trying to get authentication.
Kotlin becomes more and more popular, especially Google announced it was the first-class citizen in Android development.
Spring 5 also brings Kotlin on board, and add a few improvements to integrate with Spring projects.
BeanDefinitionDSL
allow you declare beans in a fluent DSL file instead of XML configuration or Java annotation configuration.
The following is an exmaple of beans declaration which utilizes the Kotlin specific BeanDefinitionDSL
.
fun beans() = beans {
bean<ResourcePropertySource> {
ResourcePropertySource(EncodedResource(ClassPathResource("application.properties")))
}
bean {
PostHandler(it.ref())
}
bean {
Routes(it.ref())
}
bean<WebHandler>("webHandler") {
RouterFunctions.toWebHandler(
it.ref<Routes>().router(),
HandlerStrategies.builder().build()
//HandlerStrategies.builder().viewResolver(it.ref()).build()
)
}
bean("messageSource") {
ReloadableResourceBundleMessageSource().apply {
setBasename("messages")
setDefaultEncoding("UTF-8")
}
}
bean {
DataInitializr(it.ref(), it.ref())
}
bean {
PostRepository(it.ref())
}
bean { ReactiveMongoRepositoryFactory(it.ref()) }
bean {
ReactiveMongoTemplate(
SimpleReactiveMongoDatabaseFactory(
//ConnectionString(it.env.getProperty("mongo.uri"))
ConnectionString("mongodb://localhost:27017/blog")
)
)
}
bean<WebFilter>("springSecurityFilterChain") {
WebFilterChainFilter(Flux.just(it.ref()))
}
bean<SecurityWebFilterChain> {
it.ref<HttpSecurity>().authorizeExchange()
.pathMatchers(HttpMethod.GET, "/api/posts/**").permitAll()
.pathMatchers(HttpMethod.DELETE, "/api/posts/**").hasRole("ADMIN")
//.pathMatchers("/users/{user}/**").access(this::currentUserMatchesPath)
.anyExchange().authenticated()
.and()
.build()
}
bean<HttpSecurity>(scope = BeanDefinitionDsl.Scope.PROTOTYPE) {
HttpSecurity.http().apply {
httpBasic()
authenticationManager(UserDetailsRepositoryAuthenticationManager(it.ref()))
securityContextRepository(WebSessionSecurityContextRepository())
}
}
bean {
UserDetailsRepository { username -> it.ref<UserRepository>()
.findByUsername(username)
.map { (_, username, password, active, roles) ->
org.springframework.security.core.userdetails.User
.withUsername(username)
.password(password)
.accountExpired(!active)
.accountLocked(!active)
.credentialsExpired(!active)
.disabled(!active)
.authorities(roles.map(::SimpleGrantedAuthority).toList())
.build()
}
.cast(UserDetails::class.java)
}
}
bean {
UserRepository(it.ref())
}
profile("foo") {
bean<Foo>()
}
}
class Foo
RouterFunctionDSL
allow you write route rules in a more fluent style.
fun router() = router {
accept(MediaType.TEXT_HTML).nest {
GET("/") { ServerResponse.ok().render("index") }
GET("/sse") { ServerResponse.ok().render("sse") }
//GET("/users", postHandler::findAllView)
}
"/api".nest {
accept(MediaType.APPLICATION_JSON).nest {
GET("/posts", postHandler::all)
GET("/posts/{id}", postHandler::get)
}
accept(MediaType.TEXT_EVENT_STREAM).nest {
GET("/posts", postHandler::stream)
}
POST("/posts", postHandler::create)
PUT("/posts/{id}", postHandler::update)
DELETE("/posts/{id}", postHandler::delete)
}
resources("/**", ClassPathResource("static/"))
}
Please check out the Source codes for the complete Kotlin application.
The following table lits all sample codes related to this post. The sample codes of this post is hosted on my Github account, welcome to star and fork it.
name | description |
---|---|
vanilla | The initial application, includes basic spring-webflux feature, use a main class to start up the application |
vanilla-jetty | Same as vanilla, but use Jetty as target runtime |
vanilla-reactor-netty | Same as vanilla, but use Reactor Netty as target runtime |
vanilla-reactor-netty | Same as vanilla, but use Undertow as target runtime |
java9 | Same as vanilla, Java 9 Flow API support is not ready in Spring 5.0.0.REALESE, planned in 5.0.1, see issue SPR-16052 and the original discussion on stackoverflow |
rxjava | Same as vanilla, but use Rxjava instead of Reactor |
rxjava2 | Same as vanilla, but use Rxjava2 instead of Reactor |
war | Replace the manual bootstrap class in vanilla with Spring ApplicationInitializer , it can be packaged as a war file to be deployed into an external servlet container. |
routes | Use RouterFunction instead of controller in vanilla |
register-bean | Programmatic approach to register all beans in ApplicatonContext at application bootstrap |
data-mongo | Demonstration of Spring Data Mongo reactive support |
data-redis | Demonstration of Spring Data Redis reactive support |
data-cassandra | Demonstration of Spring Data Cassandra reactive support |
data-couchbase | Demonstration of Spring Data Couchbase reactive support |
security | Based on vanilla, add secuirty for spring webflux support |
security-user-properties | Same as secuirty, but use users.properties to store users |
security-method | Replace URI based configuration with method level constraints |
security-data-mongo | Based on data-mongo and security, replace with dummy users in hard codes with Mongo driven store |
multipart | Mutipart request handling and file uploading |
multipart-data-mongo | (PENDING)Multipart and file uploading, but data in Mongo via Spring Data Mongo, waitng for Reactive support for GridFsTemplate |
mvc-thymeleaf | Traditinal web mvc application, use Thymeleaf specific Reactive view resolver to render view |
mvc-freemarker | Traditinal web mvc application, use freemarker as template engine, currently it does not have a reactive view resolver |
sse | Server Send Event and json stream example |
websocket | Reactive Websocket example |
boot | Switch to Spring Boot to get autoconfiguration of spring-webflux , added extra Spring Data Mongo, Spring Secuirty support |
boot-jetty | Same as boot, but use Jetty as target runtime |
boot-tomcat | Same as boot, but use Tomcat as target runtime |
boot-undertow | Same as boot, but use Undertow as target runtime |
boot-routes | Use RouterFunction instead of the general Controller in boot |
boot-freemarker | Same as mvc-freemarker, but based on Spring Boot |
groovy | Same features as boot, but written in groovy |
client | Demonstration of WebClient to shake hands with backend reactive APIs |
kotlin | Same features as boot, but written in kotlin |
kotlin-gradle | Use kotlin functional approach to declare beans and bootstrap the application programatically |
session | (WIP)More features will be added here |
-
Reactive Streams, official Reactive Streams website
-
Understanding Reactive types, Spring.IO
-
The WebFlux framework, Spring Framework Reference Documentation
-
Reactor Core 3.0 becomes a unified Reactive Foundation on Java 8, Spring.IO
-
Reactive Spring, Spring.IO
-
Three parts of Notes on Reactive Programming by Dave Syer:
-
Kotlin extensions for MongoOperations and ReactiveMongoOperations