实战案例

实战案例

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
webflux-demo/
├── src/main/java/com/example/demo/
│   ├── DemoApplication.java
│   ├── client/
│   │   └── UserClient.java
│   ├── config/
│   │   └── RouterConfig.java
│   ├── controller/
│   │   └── UserController.java
│   ├── handler/
│   │   └── UserHandler.java
│   ├── model/
│   │   └── User.java
│   ├── repository/
│   │   └── UserRepository.java
│   └── service/
│       └── UserService.java
├── src/main/resources/
│   ├── application.yml
│   └── schema.sql
├── src/test/java/com/example/demo/
│   └── UserControllerTest.java
└── pom.xml

Maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the webflux-demo sample application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Spring Boot 3.2.0 parent; the dependencies below declare no <version> of their own
     and rely on the versions managed by this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
</parent>

<!-- Coordinates of this project. -->
<groupId>com.example</groupId>
<artifactId>webflux-demo</artifactId>
<version>1.0.0</version>

<dependencies>
<!-- WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- R2DBC (reactive database access) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

<!-- H2 database R2DBC driver (for development and testing); only needed at runtime -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<scope>runtime</scope>
</dependency>

<!-- Lombok; marked optional so it is not passed on to projects depending on this one -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- Testing: Spring Boot test starter plus Reactor's test support (StepVerifier) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.example.demo.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

import java.time.LocalDateTime;

/**
 * User entity mapped to the {@code users} table.
 *
 * <p>Lombok supplies the accessors ({@code @Data}) as well as a no-args and an
 * all-args constructor, so the class body only declares the fields.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table("users")
public class User {

// Identifier property; the users.id column in schema.sql is auto-incremented.
@Id
private Long id;
// Display name (NOT NULL in schema.sql).
private String name;
// E-mail address (UNIQUE and NOT NULL in schema.sql).
private String email;
// Creation timestamp; UserService.save sets it before persisting.
private LocalDateTime createdAt;
// Last-modification timestamp; UserService sets it on save and on update.
private LocalDateTime updatedAt;
}

Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.example.demo.repository;

import com.example.demo.model.User;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;

/**
 * Reactive repository for {@link User} rows keyed by {@code Long} id.
 *
 * <p>The basic CRUD operations come from {@link ReactiveCrudRepository}; the methods
 * declared here add lookups by e-mail, by name fragment and by creation time.
 */
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

// Finds the user with the given e-mail; the query is derived from the method name.
// At most one row is expected because users.email is UNIQUE in schema.sql.
Mono<User> findByEmail(String email);

// Finds all users whose name contains the given fragment (derived query).
Flux<User> findByNameContaining(String name);

// Finds users created strictly after the given instant, using the explicit SQL below.
// NOTE(review): the :since placeholder is presumably bound by parameter name, so the
// parameter must keep the name "since" - confirm before renaming.
@Query("SELECT * FROM users WHERE created_at > :since")
Flux<User> findRecentUsers(LocalDateTime since);
}

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.example.demo.service;

import com.example.demo.model.User;
import com.example.demo.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;

/**
 * Service layer for {@link User} operations.
 *
 * <p>Delegates persistence to {@link UserRepository} and maintains the
 * {@code createdAt}/{@code updatedAt} timestamps on writes.
 */
@Service
@RequiredArgsConstructor
public class UserService {

    private final UserRepository userRepository;

    /**
     * Returns all users.
     *
     * @return a stream of every stored user
     */
    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    /**
     * Looks up a user by id.
     *
     * @param id the user id
     * @return the matching user, or an empty {@code Mono} if none exists
     */
    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }

    /**
     * Looks up a user by e-mail address.
     *
     * @param email the e-mail address to search for
     * @return the matching user, or an empty {@code Mono} if none exists
     */
    public Mono<User> findByEmail(String email) {
        return userRepository.findByEmail(email);
    }

    /**
     * Stamps the given user with the current time and persists it.
     *
     * <p>The timestamps are written onto the passed-in instance when this method is
     * called, before the returned {@code Mono} is subscribed.
     *
     * @param user the user to store; its {@code createdAt} and {@code updatedAt} are overwritten
     * @return the persisted user
     */
    public Mono<User> save(User user) {
        // Read the clock once so createdAt and updatedAt are exactly equal on a new row.
        LocalDateTime now = LocalDateTime.now();
        user.setCreatedAt(now);
        user.setUpdatedAt(now);
        return userRepository.save(user);
    }

    /**
     * Copies name and e-mail from {@code user} onto the stored user with the given id
     * and refreshes its {@code updatedAt}.
     *
     * @param id   the id of the user to update
     * @param user carrier of the new name and e-mail; its other fields are ignored
     * @return the updated user, or an empty {@code Mono} if no user has that id
     */
    public Mono<User> update(Long id, User user) {
        return userRepository.findById(id)
                .flatMap(existingUser -> {
                    existingUser.setName(user.getName());
                    existingUser.setEmail(user.getEmail());
                    existingUser.setUpdatedAt(LocalDateTime.now());
                    return userRepository.save(existingUser);
                });
    }

    /**
     * Deletes the user with the given id.
     *
     * @param id the user id
     * @return a {@code Mono} that completes when the delete has finished
     */
    public Mono<Void> deleteById(Long id) {
        return userRepository.deleteById(id);
    }
}

Controller(注解式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.example.demo.controller;

import com.example.demo.model.User;
import com.example.demo.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

/**
 * Annotation-based REST endpoints for {@link User} resources under {@code /api/users}.
 *
 * <p>Every handler passes the request on to {@link UserService} and returns the
 * reactive publisher it gets back.
 */
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {

    private final UserService userService;

    /** Lists all users. */
    @GetMapping
    public Flux<User> getAllUsers() {
        return this.userService.findAll();
    }

    /**
     * Fetches one user.
     *
     * @param id the id taken from the request path
     */
    @GetMapping("/{id}")
    public Mono<User> getUserById(@PathVariable Long id) {
        return this.userService.findById(id);
    }

    /**
     * Creates a user from the request body and answers with status 201 Created.
     *
     * @param user the user deserialized from the request body
     */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@RequestBody User user) {
        return this.userService.save(user);
    }

    /**
     * Updates the user identified by the path with the data in the request body.
     *
     * @param id   the id taken from the request path
     * @param user the new data deserialized from the request body
     */
    @PutMapping("/{id}")
    public Mono<User> updateUser(@PathVariable Long id, @RequestBody User user) {
        return this.userService.update(id, user);
    }

    /**
     * Deletes a user and answers with status 204 No Content.
     *
     * @param id the id taken from the request path
     */
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteUser(@PathVariable Long id) {
        return this.userService.deleteById(id);
    }

    /**
     * Server-sent-events example: streams all users as {@code text/event-stream},
     * delaying each element by one second.
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        Duration pause = Duration.ofSeconds(1);
        Flux<User> users = this.userService.findAll();
        return users.delayElements(pause);
    }
}

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# application.yml
spring:
  # Reactive connection to an in-memory H2 database named "testdb".
  r2dbc:
    url: r2dbc:h2:mem:///testdb
    username: sa
    password:

  # Run schema.sql from the classpath on every startup.
  sql:
    init:
      mode: always
      schema-locations: classpath:schema.sql

# Log the SQL issued through Spring's R2DBC support.
logging:
  level:
    org.springframework.r2dbc: DEBUG

数据库初始化脚本

1
2
3
4
5
6
7
8
9
10
11
-- schema.sql
-- Creates the users table if it is missing and inserts two sample rows.
CREATE TABLE IF NOT EXISTS users (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
-- Both timestamps default to the insertion time when not supplied.
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Seed data; id and the two timestamps are left to their column defaults.
INSERT INTO users (name, email) VALUES ('张三', 'zhangsan@example.com');
INSERT INTO users (name, email) VALUES ('李四', 'lisi@example.com');

WebClient 调用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.example.demo.client;

import com.example.demo.model.User;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link WebClient}-based client for the user REST API.
 *
 * <p>All requests are sent relative to the fixed base URL configured in the constructor.
 */
@Component
public class UserClient {

    private static final String BASE_URL = "http://localhost:8080/api";
    private static final String USERS_URI = "/users";
    private static final String USER_BY_ID_URI = "/users/{id}";

    private final WebClient webClient;

    /**
     * Builds the underlying client from the supplied builder.
     *
     * @param builder the builder used to create the {@link WebClient}
     */
    public UserClient(WebClient.Builder builder) {
        this.webClient = builder.baseUrl(BASE_URL).build();
    }

    /**
     * Requests the full user list.
     *
     * @return the users decoded from the response body
     */
    public Flux<User> getAllUsers() {
        WebClient.ResponseSpec response = webClient.get()
                .uri(USERS_URI)
                .retrieve();
        return response.bodyToFlux(User.class);
    }

    /**
     * Requests a single user.
     *
     * @param id the id expanded into the request path
     * @return the user decoded from the response body
     */
    public Mono<User> getUserById(Long id) {
        WebClient.ResponseSpec response = webClient.get()
                .uri(USER_BY_ID_URI, id)
                .retrieve();
        return response.bodyToMono(User.class);
    }

    /**
     * Posts a user to the API.
     *
     * @param user the user sent as the request body
     * @return the user decoded from the response body
     */
    public Mono<User> createUser(User user) {
        WebClient.ResponseSpec response = webClient.post()
                .uri(USERS_URI)
                .bodyValue(user)
                .retrieve();
        return response.bodyToMono(User.class);
    }
}

单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.example.demo;

import com.example.demo.model.User;
import com.example.demo.service.UserService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.test.StepVerifier;

/**
 * Integration tests for the user endpoints and {@link UserService}.
 *
 * <p>The count-based tests expect exactly the two users seeded by {@code schema.sql},
 * so any test that inserts data removes it again to keep the tests independent of
 * execution order.
 */
@SpringBootTest
@AutoConfigureWebTestClient
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @Autowired
    private UserService userService;

    /** GET /api/users returns the two seeded users. */
    @Test
    void getAllUsers() {
        webTestClient.get()
                .uri("/api/users")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class)
                .hasSize(2);
    }

    /** GET /api/users/1 returns the first seeded user. */
    @Test
    void getUserById() {
        webTestClient.get()
                .uri("/api/users/1")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .jsonPath("$.name").isEqualTo("张三");
    }

    /** POST /api/users answers 201 and echoes the created user. */
    @Test
    void createUser() {
        User newUser = new User();
        newUser.setName("王五");
        newUser.setEmail("wangwu@example.com");

        try {
            webTestClient.post()
                    .uri("/api/users")
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(newUser)
                    .exchange()
                    .expectStatus().isCreated()
                    .expectBody()
                    .jsonPath("$.name").isEqualTo("王五");
        } finally {
            // Remove the inserted row again: the database is shared by all tests in this
            // class, and a leftover third user would break the size-2 assertions whenever
            // this test runs before them.
            userService.findByEmail(newUser.getEmail())
                    .flatMap(saved -> userService.deleteById(saved.getId()))
                    .block();
        }
    }

    /** The service emits the two seeded users and then completes. */
    @Test
    void testServiceWithStepVerifier() {
        StepVerifier.create(userService.findAll())
                .expectNextCount(2)
                .verifyComplete();
    }
}

性能优化建议

  1. 合理使用调度器:无法避免的阻塞式 I/O 操作放到 boundedElastic 上执行,CPU 密集型任务使用 parallel
  2. 避免阻塞调用:确保整个调用链都是非阻塞的
  3. 使用连接池:配置合适的数据库连接池大小
  4. 启用压缩:对响应数据启用 Gzip 压缩
  5. 监控指标:集成 Micrometer 监控响应时间和吞吐量

相关链接

  • 上一篇:函数式端点
  • 返回:Webflux简介

实战案例
https://zmmmmy.github.io/2026/01/10/实战案例/
作者
ZhiMy
发布于
2026年1月10日
许可协议