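WebFlux Core Components
Annotation-Based Programming Model
WebFlux supports an annotation-based programming model similar to that of Spring MVC, using @Controller and @RestController.
Basic Controller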
    @RestController
    @RequestMapping("/api/users")
    public class UserController {

        private final UserService userService;

        public UserController(UserService userService) {
            this.userService = userService;
        }

        @GetMapping
        public Flux<User> getAllUsers() {
            return userService.findAll();
        }

        @GetMapping("/{id}")
        public Mono<User> getUserById(@PathVariable String id) {
            return userService.findById(id);
        }

        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public Mono<User> createUser(@RequestBody User user) {
            return userService.save(user);
        }

        @PutMapping("/{id}")
        public Mono<User> updateUser(@PathVariable String id, @RequestBody User user) {
            return userService.update(id, user);
        }

        @DeleteMapping("/{id}")
        @ResponseStatus(HttpStatus.NO_CONTENT)
        public Mono<Void> deleteUser(@PathVariable String id) {
            return userService.deleteById(id);
        }
    }
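Reactive Return Types
| Return type | Description |
| --- | --- |
| Mono<T> | Returns a single object (or none) |
| Flux<T> | Returns multiple objects as a stream |
| Mono<Void> | No response body; signals completion only |
| Mono<ResponseEntity<T>> | Custom response (status, headers, and body) |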
WebClient - Reactive HTTP Client
WebClient is the non-blocking HTTP client provided by WebFlux, intended as a replacement for RestTemplate.
Creating a WebClient
    // The three declarations below are alternatives; use one of them.

    // Option 1: default configuration, no base URL
    WebClient client = WebClient.create();

    // Option 2: with a base URL
    WebClient client = WebClient.create("http://localhost:8080");

    // Option 3: builder with default headers and a request-logging filter
    WebClient client = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer token")
            .filter(ExchangeFilterFunction.ofRequestProcessor(
                    request -> {
                        log.info("Request: {} {}", request.method(), request.url());
                        return Mono.just(request);
                    }
            ))
            .build();
GET Requests
    // Fetch a single object
    Mono<User> user = webClient.get()
            .uri("/api/users/{id}", userId)
            .retrieve()
            .bodyToMono(User.class);

    // Fetch a collection as a stream
    Flux<User> users = webClient.get()
            .uri("/api/users")
            .retrieve()
            .bodyToFlux(User.class);

    // Alternative to the previous call: add query parameters with the URI builder
    Flux<User> users = webClient.get()
            .uri(uriBuilder -> uriBuilder
                    .path("/api/users")
                    .queryParam("page", 0)
                    .queryParam("size", 10)
                    .build())
            .retrieve()
            .bodyToFlux(User.class);
POST Requests
    // Send an object that is already in memory
    Mono<User> createdUser = webClient.post()
            .uri("/api/users")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(newUser)
            .retrieve()
            .bodyToMono(User.class);

    // Alternative: send a body produced by a publisher (userMono is a Mono<User>)
    Mono<User> createdUser = webClient.post()
            .uri("/api/users")
            .body(userMono, User.class)
            .retrieve()
            .bodyToMono(User.class);
Error Handling
    // NotFoundException and ServerException are application-defined exceptions.
    // HttpStatusCode is available from Spring Framework 6 onward.
    Mono<User> user = webClient.get()
            .uri("/api/users/{id}", userId)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError,
                    response -> Mono.error(new NotFoundException("User not found")))
            .onStatus(HttpStatusCode::is5xxServerError,
                    response -> Mono.error(new ServerException("Server error")))
            .bodyToMono(User.class);
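The two onStatus handlers above each map an entire status family to one exception, so a 400 or 403 response would also be reported as "User not found". Assuming status handlers are consulted in registration order and the first match wins (which is how retrieve() appears to behave in current Spring versions), a specific check can be registered ahead of a family-wide one. The plain-Java sketch below models only that first-match rule; `StatusRule`, `StatusRules.classify`, and the messages are hypothetical names for illustration, not Spring API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.IntPredicate;

// Hypothetical stand-in for one onStatus(predicate, exceptionFunction) registration.
final class StatusRule {
    final IntPredicate matches;
    final String error;

    StatusRule(IntPredicate matches, String error) {
        this.matches = matches;
        this.error = error;
    }
}

class StatusRules {
    // Rules are checked in list order; the specific 404 rule comes before
    // the family-wide 4xx rule so that it is not shadowed.
    static final List<StatusRule> RULES = Arrays.asList(
            new StatusRule(code -> code == 404, "User not found"),
            new StatusRule(code -> code >= 400 && code < 500, "Client error"),
            new StatusRule(code -> code >= 500 && code < 600, "Server error"));

    // Returns the error message of the first matching rule,
    // or an empty Optional when no rule matches (for example, 200).
    static Optional<String> classify(int statusCode) {
        return RULES.stream()
                .filter(rule -> rule.matches.test(statusCode))
                .map(rule -> rule.error)
                .findFirst();
    }
}
```

With WebClient, the same ordering would correspond to registering an onStatus check for 404 before the is4xxClientError check.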
Accessing the Full Response with exchangeToMono()
    // exchange() has been deprecated since Spring Framework 5.3;
    // exchangeToMono() and exchangeToFlux() are its replacements.
    Mono<ResponseEntity<User>> response = webClient.get()
            .uri("/api/users/{id}", userId)
            .exchangeToMono(clientResponse -> {
                if (clientResponse.statusCode().is2xxSuccessful()) {
                    return clientResponse.toEntity(User.class);
                } else {
                    return clientResponse.createException().flatMap(Mono::error);
                }
            });
ServerRequest and ServerResponse
These two types are used when programming functional endpoints.
ServerRequest
    // Path variable
    String id = request.pathVariable("id");

    // Query parameter (an empty Optional when absent)
    Optional<String> name = request.queryParam("name");

    // Request header
    String contentType = request.headers().contentType()
            .map(MediaType::toString)
            .orElse("unknown");

    // Request body, as a single object or as a stream
    Mono<User> user = request.bodyToMono(User.class);
    Flux<User> users = request.bodyToFlux(User.class);
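Because request.queryParam(...) returns an Optional<String>, a handler usually has to convert the value and supply a default itself. A minimal sketch of that step in plain Java follows. `QueryParams.pageOrDefault` is a hypothetical helper, and falling back to the default on malformed or negative input is an assumption; an application might prefer to answer with 400 Bad Request instead.

```java
import java.util.Optional;

class QueryParams {
    // Parses an optional page number, falling back to defaultPage when the
    // parameter is absent, not an integer, or negative.
    static int pageOrDefault(Optional<String> raw, int defaultPage) {
        return raw.map(String::trim)
                .flatMap(QueryParams::parseInt)
                .filter(page -> page >= 0)
                .orElse(defaultPage);
    }

    // Converts text to an integer, returning empty instead of throwing.
    private static Optional<Integer> parseInt(String text) {
        try {
            return Optional.of(Integer.parseInt(text));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

In a handler function this could be called as `QueryParams.pageOrDefault(request.queryParam("page"), 0)`.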
ServerResponse
    // 200 OK with an in-memory body
    ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(user);

    // 200 OK with a Mono body
    ServerResponse.ok().body(userMono, User.class);

    // 200 OK with a Flux body
    ServerResponse.ok().body(userFlux, User.class);

    // 201 Created with a Location header
    ServerResponse.created(URI.create("/api/users/" + id))
            .bodyValue(user);

    // 204 No Content
    ServerResponse.noContent().build();

    // Error responses
    ServerResponse.notFound().build();
    ServerResponse.badRequest().bodyValue("Invalid request");
Exception Handling
Using @ExceptionHandler
    // ErrorResponse here is an application-defined DTO,
    // not org.springframework.web.ErrorResponse.
    @RestControllerAdvice
    public class GlobalExceptionHandler {

        @ExceptionHandler(NotFoundException.class)
        @ResponseStatus(HttpStatus.NOT_FOUND)
        public Mono<ErrorResponse> handleNotFound(NotFoundException ex) {
            return Mono.just(new ErrorResponse(ex.getMessage()));
        }

        @ExceptionHandler(Exception.class)
        @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
        public Mono<ErrorResponse> handleException(Exception ex) {
            return Mono.just(new ErrorResponse("Internal server error"));
        }
    }
Using WebExceptionHandler
    @Component
    @Order(-2)
    public class GlobalWebExceptionHandler implements WebExceptionHandler {

        @Override
        public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
            if (ex instanceof NotFoundException) {
                // Handled here: set the status and complete the response
                exchange.getResponse().setStatusCode(HttpStatus.NOT_FOUND);
                return exchange.getResponse().setComplete();
            }
            // Not handled: pass the error on to the next handler
            return Mono.error(ex);
        }
    }
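The @Order(-2) annotation matters because exception handlers are tried in ascending order value, and a handler that returns the error hands it to the next one. Spring Boot registers its default error handler at order -1, so -2 places a custom handler ahead of it. The sketch below models only this chain-of-responsibility rule in plain Java; `ErrorHandler`, `HandlerChain`, and the integer status codes are hypothetical stand-ins rather than Spring types, and `IllegalArgumentException` in the usage note merely plays the role of NotFoundException.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical handler: returns a status code when it handles the error,
// or an empty Optional to pass the error to the next handler.
interface ErrorHandler {
    int order();
    Optional<Integer> handle(Throwable ex);
}

class HandlerChain {
    private final List<ErrorHandler> handlers;

    HandlerChain(List<ErrorHandler> handlers) {
        // Lower order values run first.
        this.handlers = new ArrayList<>(handlers);
        this.handlers.sort(Comparator.comparingInt(ErrorHandler::order));
    }

    // Returns the status chosen by the first handler that handles the error.
    Optional<Integer> resolve(Throwable ex) {
        for (ErrorHandler handler : handlers) {
            Optional<Integer> status = handler.handle(ex);
            if (status.isPresent()) {
                return status;
            }
        }
        return Optional.empty();
    }

    // Builds a handler that handles only the given exception type.
    static ErrorHandler of(int order, Class<? extends Throwable> type, int status) {
        return new ErrorHandler() {
            public int order() { return order; }
            public Optional<Integer> handle(Throwable ex) {
                return type.isInstance(ex) ? Optional.of(status) : Optional.empty();
            }
        };
    }
}
```

Registering `HandlerChain.of(-1, Throwable.class, 500)` together with `HandlerChain.of(-2, IllegalArgumentException.class, 404)` resolves an IllegalArgumentException to 404, because the handler at -2 is consulted first; any other exception falls through to the catch-all at -1.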
Filters (WebFilter)
    @Component
    public class LoggingFilter implements WebFilter {

        // Assumes SLF4J; Lombok's @Slf4j would work as well
        private static final Logger log = LoggerFactory.getLogger(LoggingFilter.class);

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
            long startTime = System.currentTimeMillis();

            return chain.filter(exchange)
                    .doFinally(signalType -> {
                        long duration = System.currentTimeMillis() - startTime;
                        log.info("Request {} {} completed in {}ms",
                                exchange.getRequest().getMethod(),
                                exchange.getRequest().getPath(),
                                duration);
                    });
        }
    }
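doFinally runs its callback whether the chain completes, fails, or is cancelled, which is why it suits request timing. For readers more familiar with CompletableFuture, the JDK-only sketch below shows the same idea with whenComplete. It is an analogy rather than Reactor code, it does not cover cancellation, and `TimedCall` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

class TimedCall {
    // Wraps an asynchronous call and reports its duration in milliseconds
    // on both success and failure, leaving the outcome unchanged.
    static <T> CompletableFuture<T> timed(Supplier<CompletableFuture<T>> call,
                                          LongConsumer onDurationMillis) {
        long start = System.nanoTime();
        return call.get().whenComplete((result, error) ->
                onDurationMillis.accept((System.nanoTime() - start) / 1_000_000));
    }
}
```

The callback receives the elapsed time regardless of whether the wrapped future completed normally or exceptionally, mirroring how the filter above logs every request.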
Related Links
- Previous: Reactor Core API
- Next: Functional Endpoints