Reactive Web Applications Using Spring WebFlux

1. Introduction to Reactive Programming

Reactive programming is a term for applications that have the following characteristics:

  • Non-blocking applications
  • Event-driven and asynchronous
  • Require a small number of threads to scale vertically (i.e. within the JVM)

Like object-oriented, functional, or procedural programming, reactive programming is simply another programming paradigm. Its goal is to make programs responsive, resilient, and elastic.
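The non-blocking, event-driven style described above can be sketched with nothing more than the JDK's CompletableFuture. This is only an illustration of the idea, not Spring or Reactor code; the class and method names (NonBlockingSketch, fetchGreeting, greetingLength) are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class NonBlockingSketch {

    // Simulates a lookup that runs on a worker thread instead of the caller's thread.
    static CompletableFuture<String> fetchGreeting(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello, " + name);
    }

    // Declares what should happen once the value arrives; the caller is not blocked here.
    static CompletableFuture<Integer> greetingLength(String name) {
        return fetchGreeting(name).thenApply(String::length);
    }

    public static void main(String[] args) {
        greetingLength("Jack")
            .thenAccept(len -> System.out.println("Greeting length: " + len))
            .join(); // join() only keeps this demo JVM alive until the callback has run
    }
}
```

The caller registers callbacks (thenApply, thenAccept) rather than waiting for a result, which is the same shape of code that Mono and Flux encourage later in this article.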

2. Reactive Programming in Spring

The Spring Framework uses Reactor internally for its reactive support. Reactor is an implementation of the Reactive Streams specification, whose Publisher and Subscriber interfaces were also added to the JDK in Java 9 as java.util.concurrent.Flow. Reactor provides the following two data types:

  • Flux (it is a Stream which can emit 0 or more elements)
  • Mono (it is a Stream which can emit 0 or 1 element)

Spring exposes these types from its API, thereby making the application reactive.
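To make the publisher/subscriber contract behind Flux and Mono more concrete, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow types rather than Reactor. It assumes Java 9 or later (the rest of this article only requires Java 8), and the names FlowSketch, CollectingSubscriber, and publish are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowSketch {

    // Collects every item it receives and requests items one at a time (back-pressure).
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first item
        }
        @Override public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Publishes the given names (0..N elements, like a Flux) and returns what the subscriber saw.
    static List<String> publish(String... names) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String name : names) {
                publisher.submit(name);
            }
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish("Jack", "Justin", "Eric"));
    }
}
```

A Flux corresponds to a publisher that may call onNext any number of times before onComplete, while a Mono corresponds to one that calls onNext at most once.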

Spring 5 introduced a new module, WebFlux, which supports building reactive web applications over HTTP (REST) and WebSockets.

Spring WebFlux supports the following two programming models:

  • Functional Model
  • Annotation Model

In this article we will explore the Functional Model.

The following table compares the traditional Spring stack with WebFlux:

Traditional Stack              | Reactive Stack
Spring Web MVC                 | Spring WebFlux
Controller and Handler Mapping | Router Functions
Servlet API                    | HTTP / Reactive Streams
Servlet Containers             | Any servlet container with support for Servlet 3.1+, Tomcat 8.x, Jetty, Netty, Undertow

3. Use Case

The use case is an Employee Management system: a REST API built with Spring WebFlux that exposes CRUD operations on Employee.

Note: The DAO layer of the project is hard-coded (an in-memory map stands in for the database).

4. Software and Environment needed

  • Java: 1.8 or above
  • Maven: 3.3.9 or above
  • Eclipse Luna or above
  • Spring Boot: 2.0.0.M4
  • Spring Boot Starter WebFlux
  • Postman for Testing the Application

5. Flow of the Application

Spring 5 WebFlux’s Functional Model is an alternative to Spring MVC style annotations. In this model, router functions and handler functions are used to build the application. An HTTP request is routed by a router function (the alternative to annotations like @RequestMapping) and is handled by a handler function (the alternative to @Controller handler methods).

Each handler function takes a ServerRequest (org.springframework.web.reactive.function.server.ServerRequest) as its parameter and returns a Mono<ServerResponse> (org.springframework.web.reactive.function.server.ServerResponse). A multi-element result such as a Flux is written into the body of that single response.
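The relationship between router functions and handler functions can be sketched as a toy model in plain Java. This is not the Spring API: Request, Response, Handler, and Router are hypothetical stand-ins for ServerRequest, ServerResponse, HandlerFunction, and RouterFunction, and CompletableFuture plays the role that Mono plays in WebFlux.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

class RoutingSketch {

    // Hypothetical stand-ins for ServerRequest and ServerResponse.
    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) { this.method = method; this.path = path; }
    }

    static final class Response {
        final int status;
        final String body;
        Response(int status, String body) { this.status = status; this.body = body; }
    }

    // A handler maps a request to an asynchronous response.
    interface Handler extends Function<Request, CompletableFuture<Response>> {}

    // A router is an ordered set of (predicate, handler) pairs; the first match wins.
    static final class Router {
        private final Map<Predicate<Request>, Handler> routes = new LinkedHashMap<>();

        Router route(Predicate<Request> predicate, Handler handler) {
            routes.put(predicate, handler);
            return this;
        }

        CompletableFuture<Response> dispatch(Request request) {
            for (Map.Entry<Predicate<Request>, Handler> entry : routes.entrySet()) {
                if (entry.getKey().test(request)) {
                    return entry.getValue().apply(request);
                }
            }
            return CompletableFuture.completedFuture(new Response(404, "Not Found"));
        }
    }

    // Predicate factory, loosely modelled on the idea of RequestPredicates.GET.
    static Predicate<Request> get(String path) {
        return r -> r.method.equals("GET") && r.path.equals(path);
    }

    static Router employeeRouter() {
        return new Router()
            .route(get("/emp/controller/getDetails"),
                   req -> CompletableFuture.completedFuture(new Response(200, "[all employees]")));
    }

    public static void main(String[] args) {
        Response ok = employeeRouter()
            .dispatch(new Request("GET", "/emp/controller/getDetails"))
            .join();
        System.out.println(ok.status + " " + ok.body);
    }
}
```

The point of the sketch is the separation of concerns: routing is data (a predicate paired with a function), and a handler is an ordinary function from request to asynchronous response, so no annotations are involved.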

6. Code For the Use Case and Description

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.webflux</groupId>
	<artifactId>Demo_Spring_MVC_Web_Flux</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<repositories>
		<repository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</pluginRepository>
		<pluginRepository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</pluginRepository>
	</pluginRepositories>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.0.M4</version>
		<relativePath />
		<!-- lookup parent from repository -->
	</parent>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<!-- Configuring Java 8 for the Project -->
		<java.version>1.8</java.version>
	</properties>
      <!--Excluding Embedded tomcat to make use of the Netty Server-->
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<exclusions>
			   <exclusion>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-starter-tomcat</artifactId>
			   </exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

EmployeeDAO.java

package com.webflux.dao;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.webflux.bussiness.bean.Employee;
@Repository
public class EmployeeDAO {
	/**
	 * An in-memory Map stands in for the database.
	 */
	public static Map<Integer, Employee> mapOfEmloyeess = new LinkedHashMap<>();
	static int count = 10004;
	static {
		mapOfEmloyeess.put(10001, new Employee("Jack", 10001, 12345.6, 1001));
		mapOfEmloyeess.put(10002, new Employee("Justin", 10002, 12355.6, 1002));
		mapOfEmloyeess.put(10003, new Employee("Eric", 10003, 12445.6, 1003));
	}

	/**
	 * Returns all the existing employees as a Flux.
	 */
	public Flux<Employee> getAllEmployee() {
		// fromIterable lets the resulting Flux be subscribed to more than once
		return Flux.fromIterable(mapOfEmloyeess.values());
	}

	/**
	 * Gets the employee details by employee id.
	 * Returns a Mono with the data if the employee is found,
	 * otherwise returns null.
	 */
	public Mono<Employee> getEmployeeDetailsById(int id) {
		Mono<Employee> res = null;
		Employee emp = mapOfEmloyeess.get(id);
		if (emp != null) {
			res = Mono.just(emp);
		}
		return res;
	}

	/**
	 * Creates the employee details.
	 * Returns a Mono with the auto-generated id.
	 */
	public Mono<Integer> addEmployee(Employee employee) {
		count++;
		employee.setEmployeeId(count);
		mapOfEmloyeess.put(count, employee);
		return Mono.just(count);
	}

	/**
	 * Updates the employee details.
	 * Receives the Employee object and returns the updated details as a Mono.
	 */
	public Mono<Employee> updateEmployee(Employee employee) {
		mapOfEmloyeess.put(employee.getEmployeeId(), employee);
		return Mono.just(employee);
	}

	/**
	 * Deletes the employee details.
	 * Receives the employee id and returns the deleted employee details as a Mono.
	 * The caller must check that the id exists first: Mono.just() rejects null.
	 */
	public Mono<Employee> removeEmployee(int id) {
		Employee emp = mapOfEmloyeess.remove(id);
		return Mono.just(emp);
	}
}

Note that every method of EmployeeDAO returns either a Mono or a Flux, which makes the DAO layer reactive.
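The DAO depends on an Employee bean that is not listed in this article. A minimal sketch, inferred only from how the DAO uses it (a four-argument constructor plus getEmployeeId() and setEmployeeId()), could look like the following. The field names salary and departmentId are assumptions, since the article never names the third and fourth constructor arguments. In the project the class would be declared public in the package com.webflux.bussiness.bean; the package line and modifier are left out here to keep the sketch self-contained.

```java
// Assumed shape of the Employee bean used by EmployeeDAO and EmployeeHandler.
class Employee {

    private String name;
    private int employeeId;
    private double salary;     // hypothetical name for the third constructor argument
    private int departmentId;  // hypothetical name for the fourth constructor argument

    // A no-argument constructor lets JSON libraries create the bean before calling setters.
    Employee() {
    }

    Employee(String name, int employeeId, double salary, int departmentId) {
        this.name = name;
        this.employeeId = employeeId;
        this.salary = salary;
        this.departmentId = departmentId;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getEmployeeId() { return employeeId; }
    public void setEmployeeId(int employeeId) { this.employeeId = employeeId; }

    public double getSalary() { return salary; }
    public void setSalary(double salary) { this.salary = salary; }

    public int getDepartmentId() { return departmentId; }
    public void setDepartmentId(int departmentId) { this.departmentId = departmentId; }
}
```

Getters and setters for every field matter here because the handler serializes Employee objects to JSON and reads them back from request bodies.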

EmployeeHandler.java

package com.webflux.web.handler;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.webflux.bussiness.bean.Employee;
import com.webflux.dao.EmployeeDAO;
@Controller
public class EmployeeHandler {

	@Autowired
	private EmployeeDAO employeeDAO;
	
	
	/**
	 * Receives a ServerRequest.
	 * Invokes the method getAllEmployee() from EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns it.
	 */
	public Mono<ServerResponse> getEmployeeDetails(ServerRequest request) {
		Flux<Employee> res = employeeDAO.getAllEmployee();
		return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
				.body(res, Employee.class);
	}
	
	/**
	 * Receives a ServerRequest.
	 * Extracts the path variable (named id) from the request.
	 * Invokes the method getEmployeeDetailsById() from EmployeeDAO.
	 * If the DAO returns null, responds with a Bad Request and an appropriate message.
	 * Otherwise returns a Mono<ServerResponse> with the Employee data.
	 */
	public Mono<ServerResponse> getEmployeeDetailByEmployeeId(ServerRequest request) {
		// Extracts the path variable id from the request
		int id = Integer.parseInt(request.pathVariable("id"));
		Mono<Employee> employee = employeeDAO.getEmployeeDetailsById(id);
		Mono<ServerResponse> res = null;
		if (employee == null) {
			res = ServerResponse.badRequest()
					.body(fromObject("Please give a valid employee Id"));
		} else {
			// flatMap flattens Mono<Mono<ServerResponse>> into Mono<ServerResponse>
			res = employee.flatMap(x -> ServerResponse.ok().body(fromObject(x)));
		}
		return res;
	}

	/**
	 * Receives a ServerRequest.
	 * Uses BodyExtractors to extract the Employee data as a
	 * Mono<Employee> from the ServerRequest.
	 * Invokes the method addEmployee() of EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns it.
	 */
	public Mono<ServerResponse> addEmployee(ServerRequest request) {
		Mono<Employee> requestBodyMono = request.body(BodyExtractors.toMono(Employee.class));
		// Chain on the body instead of calling block(), so the handler never blocks a server thread
		return requestBodyMono
				.flatMap(employee -> employeeDAO.addEmployee(employee))
				.flatMap(id -> ServerResponse.ok()
						.body(fromObject("Employee Created with Id " + id)));
	}

	
	/**
	 * Receives a ServerRequest.
	 * Uses BodyExtractors to extract the Employee data as a
	 * Mono<Employee> from the ServerRequest.
	 * Finds the employee and updates the details by invoking updateEmployee() of
	 * EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns it.
	 */
	public Mono<ServerResponse> updateEmployee(ServerRequest request) {
		Mono<Employee> requestBodyMono = request.body(BodyExtractors.toMono(Employee.class));
		// Chain on the body instead of calling block(), so the handler never blocks a server thread
		return requestBodyMono.flatMap(employee -> {
			// The DAO returns null (not an empty Mono) when the id is unknown
			if (employeeDAO.getEmployeeDetailsById(employee.getEmployeeId()) == null) {
				return ServerResponse.badRequest()
						.body(fromObject("Please give valid employee details to update"));
			}
			Mono<Employee> emp = employeeDAO.updateEmployee(employee);
			// flatMap flattens Mono<Mono<ServerResponse>> into Mono<ServerResponse>
			return emp.flatMap(x -> ServerResponse.ok().body(fromObject(x)));
		});
	}

	/**
	 * Receives a ServerRequest.
	 * Extracts the path variable (named id) from the request.
	 * Finds the employee and deletes the details by invoking removeEmployee() of
	 * EmployeeDAO.
	 * Prepares a Mono<ServerResponse> and returns it.
	 */
	public Mono<ServerResponse> deleteEmployee(ServerRequest request) {
		int myId = Integer.parseInt(request.pathVariable("id"));
		Mono<ServerResponse> res = null;
		if (employeeDAO.getEmployeeDetailsById(myId) == null) {
			res = ServerResponse.badRequest()
					.body(fromObject("Please give valid employee details to delete"));
		} else {
			Mono<Employee> employee = employeeDAO.removeEmployee(myId);
			// flatMap flattens Mono<Mono<ServerResponse>> into Mono<ServerResponse>
			res = employee.flatMap(x -> ServerResponse.ok().body(fromObject(x)));
		}
		return res;
	}
}

Note that every method of the handler returns a Mono<ServerResponse>, which makes the presentation layer reactive.

Note: A handler method must accept a ServerRequest and return a Mono<ServerResponse>.

RouterConfiguration.java

package com.webflux.web.router.config;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.webflux.web.handler.EmployeeHandler;
/**
 * Router is a configuration class.
 * It links the incoming requests with appropriate HTTP methods to the
 * respective method of the EmployeeHandler.
 * Method references are used for the mapping.
 */
@Configuration
public class RouterConfiguration {
    @Autowired
    EmployeeHandler employeeHandler;
    
    @Bean
    public RouterFunction<ServerResponse> monoRouterFunction() {
        RouterFunction<ServerResponse> routerFunction = RouterFunctions
            .route(GET("/emp/controller/getDetails")
                .and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::getEmployeeDetails)

            .andRoute(GET("/emp/controller/getDetailsById/{id}")
                .and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::getEmployeeDetailByEmployeeId)

            .andRoute(POST("/emp/controller/addEmp")
                .and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::addEmployee)

            .andRoute(PUT("/emp/controller/updateEmp")
                .and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::updateEmployee)

            .andRoute(DELETE("/emp/controller/deleteEmp/{id}")
                .and(accept(MediaType.APPLICATION_JSON)),
                employeeHandler::deleteEmployee);
        return routerFunction;
    }
  
}

ApplicationBootUp.java

package com.webflux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ApplicationBootUp {
	public static void main(String[] args) {
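		// Pass the command-line arguments through so that options such as --server.port are honoured
		SpringApplication.run(ApplicationBootUp.class, args);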
	}

}

The application.properties file contains only the server port: server.port=8090.

The application can be built and started with the command mvn clean install spring-boot:run and tested using the Postman client.
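As an alternative to Postman, the read and delete endpoints can also be exercised from code. The sketch below uses the JDK's java.net.http client, so it assumes Java 11 or later; it also assumes the application is running locally on port 8090 as configured in application.properties. The class name EmployeeApiRequests and its methods are invented for the example. Only requests without a body are shown, because the JSON field names of Employee are not given in this article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class EmployeeApiRequests {

    // Assumed base URL: localhost plus server.port=8090 and the route prefix from RouterConfiguration.
    static final String BASE = "http://localhost:8090/emp/controller";

    static HttpRequest getAll() {
        return HttpRequest.newBuilder(URI.create(BASE + "/getDetails"))
            .header("Accept", "application/json")
            .GET()
            .build();
    }

    static HttpRequest getById(int id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/getDetailsById/" + id))
            .header("Accept", "application/json")
            .GET()
            .build();
    }

    static HttpRequest deleteById(int id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/deleteEmp/" + id))
            .header("Accept", "application/json")
            .DELETE()
            .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response =
                client.send(getAll(), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically means the application is not running on localhost:8090
            System.out.println("Request failed: " + e.getMessage());
        }
    }
}
```

The Accept header is set on every request because each route in RouterConfiguration is combined with accept(MediaType.APPLICATION_JSON).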

7. Download the Eclipse Project

You can download the full source code of this example here: SpringWebFlux

Mohpreet Singh

Mohpreet is a Java developer and trainer. He is passionate about learning new technologies, gaining detailed exposure to them, and publishing related content.