Flux是管道不是容器

Flux 是管道,不是容器

常见误解

很多人初学 Reactor 时会有这样的误解:

1
2
3
// 错误理解:Flux 就像 List,是装数据的容器
List<String> list = Arrays.asList("a", "b", "c"); // 3个元素,确定的
Flux<String> flux = Flux.just("a", "b", "c"); // 也是3个元素,确定的?

这种理解在简单场景下没问题,但会导致无法理解复杂场景。


正确理解:Flux 是数据管道

管道 vs 容器

概念 容器(List) 管道(Flux)
数据存储 数据存在内存中 数据流经管道
数据时机 创建时就有 订阅后才开始流动
数据来源 已知、固定 可以是任何源头
数据数量 确定的 可能不确定
生命周期 一直存在 流完就结束

形象比喻

1
2
3
4
5
6
7
8
9
10
11
12
13
容器(List)像一个水桶:
┌─────────┐
│ 水水水 │ ← 水已经在桶里了
│ 水水水 │
└─────────┘

管道(Flux)像一根水管:
┌─────────┐
水源 ════════════════│ 水管 │════════════════▶ 出水口
(上游) └─────────┘ (下游)

水流经这里
但不存储在这里

实际案例:调用大模型 API

场景描述

调用 OpenAI/Claude 等大模型 API,返回流式响应:

1
2
3
4
5
6
7
8
9
10
Flux<String> response = webClient.post()
.uri("https://api.openai.com/v1/chat/completions")
.header("Authorization", "Bearer " + apiKey)
.bodyValue(Map.of(
"model", "gpt-4",
"messages", List.of(Map.of("role", "user", "content", "你好")),
"stream", true
))
.retrieve()
.bodyToFlux(String.class);

问题:Flux 里有多少数据?

答案:不知道,也不需要知道!

1
2
3
4
5
6
// 这个 Flux 创建时,大模型还没开始回复
// 数据量完全取决于大模型要说多少话
Flux<String> response = callLLM("请介绍一下自己");

// 可能返回 10 个 chunk,也可能返回 100 个
// Flux 不关心,它只负责把数据从上游传到下游

数据流动过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
┌─────────────────────────────────────────────────────────────────┐
│ 时间线 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ T0: 创建 Flux │
│ Flux<String> flux = callLLM(prompt); │
│ 此时:管道已建立,但没有数据流动 │
│ │
│ T1: 订阅 Flux │
│ flux.subscribe(chunk -> print(chunk)); │
│ 此时:向上游发送请求,开始等待数据 │
│ │
│ T2: 大模型开始回复 │
│ 上游发送 onNext("你") │
│ 下游收到 "你",打印 │
│ │
│ T3: 大模型继续回复 │
│ 上游发送 onNext("好") │
│ 下游收到 "好",打印 │
│ │
│ T4: 大模型继续回复 │
│ 上游发送 onNext(",我是AI助手") │
│ 下游收到,打印 │
│ │
│ T5: 大模型回复完毕 │
│ 上游发送 onComplete() │
│ 下游知道流结束了 │
│ │
└─────────────────────────────────────────────────────────────────┘

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chat(@RequestParam String prompt) {

// 1. 创建 Flux(此时没有数据)
Flux<String> response = webClient.post()
.uri("/v1/chat/completions")
.bodyValue(buildRequest(prompt))
.retrieve()
.bodyToFlux(ChatResponse.class)
.map(r -> r.getChoices().get(0).getDelta().getContent())
.filter(Objects::nonNull);

// 2. 返回 Flux(Spring WebFlux 会自动订阅并流式返回给前端)
return response;

// 3. 前端会逐字收到大模型的回复
}

Flux 的数据来源

Flux 的数据可以来自各种源头:

1. 静态数据(确定的)

1
2
3
4
// 数据在创建时就确定了
Flux.just(1, 2, 3);
Flux.fromIterable(list);
Flux.fromArray(array);

2. 生成规则(按规则产生)

1
2
3
// 按规则生成,数量可能无限
Flux.range(1, 100); // 1到100
Flux.interval(Duration.ofSeconds(1)); // 每秒一个,无限

3. 外部数据源(不确定的)

1
2
3
4
5
6
7
8
9
// 数据来自外部,数量不确定
Flux<String> httpResponse = webClient.get().retrieve().bodyToFlux(String.class);
Flux<Message> kafkaMessages = kafkaReceiver.receive();
Flux<String> websocketMessages = session.receive().map(WebSocketMessage::getPayloadAsText);
Flux<String> fileLines = Flux.using(
() -> Files.lines(path),
Flux::fromStream,
Stream::close
);

4. 手动推送(Sinks)

1
2
3
4
5
// 你自己决定什么时候发送什么数据
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
sink.tryEmitNext("消息1");
sink.tryEmitNext("消息2");
Flux<String> flux = sink.asFlux();

Flux 怎么知道数据结束?

三种终止信号

1
2
3
4
5
flux.subscribe(
data -> { }, // onNext: 收到数据
error -> { }, // onError: 发生错误,流终止
() -> { } // onComplete: 正常完成,流终止
);

谁发送终止信号?

上游(生产者)决定何时结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 示例1:静态数据 - 发完就结束
Flux.just(1, 2, 3);
// 内部:onNext(1), onNext(2), onNext(3), onComplete()

// 示例2:HTTP 响应 - 服务端关闭连接时结束
webClient.get().retrieve().bodyToFlux(String.class);
// 内部:onNext(chunk1), onNext(chunk2), ..., onComplete()

// 示例3:无限流 - 永不结束(除非出错或取消)
Flux.interval(Duration.ofSeconds(1));
// 内部:onNext(0), onNext(1), onNext(2), ... (永远继续)

// 示例4:手动控制 - 你决定何时结束
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.tryEmitNext("a");
sink.tryEmitNext("b");
sink.tryEmitComplete(); // 你决定结束

对比:什么时候用 Flux,什么时候用 Sinks

你是消费者 → 用 Flux

1
2
3
4
5
6
7
8
9
// 场景:调用别人的 API,接收数据
Flux<String> response = callExternalAPI();
response.subscribe(data -> process(data));

// 场景:从数据库查询
Flux<User> users = userRepository.findAll();

// 场景:读取文件
Flux<String> lines = readFileLines(path);

特点:数据由别人产生,你只需要接收和处理。

你是生产者 → 用 Sinks

1
2
3
4
5
6
7
8
9
10
11
12
13
// 场景:WebSocket 服务端,主动推送消息
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 在某个事件触发时发送数据
@EventListener
public void onUserLogin(UserLoginEvent event) {
sink.tryEmitNext("用户 " + event.getUsername() + " 登录了");
}

// 客户端订阅这个流
public Flux<String> getNotifications() {
return sink.asFlux();
}

特点:数据由你产生,你需要主动推送。

对比表

场景 角色 使用 数据来源
调用 HTTP API 消费者 Flux 服务端返回
查询数据库 消费者 Flux 数据库返回
读取消息队列 消费者 Flux 队列推送
WebSocket 推送 生产者 Sinks 你主动发送
事件广播 生产者 Sinks 你主动发送
封装回调 API 生产者 Sinks 回调触发时发送

深入理解:懒加载与订阅

Flux 是懒加载的

1
2
3
4
5
6
7
8
9
10
11
12
// 这行代码执行时,并没有真正调用 API
Flux<String> flux = webClient.get().uri("/api").retrieve().bodyToFlux(String.class);
System.out.println("Flux 创建完成");

// 只有订阅时,才真正发起请求
flux.subscribe(data -> System.out.println("收到: " + data));
System.out.println("订阅完成");

// 输出顺序:
// Flux 创建完成
// 订阅完成
// 收到: xxx(异步,稍后输出)

每次订阅都是独立的

1
2
3
4
5
6
7
8
9
Flux<String> flux = webClient.get().uri("/api/random").retrieve().bodyToFlux(String.class);

// 第一次订阅 - 发起第一次请求
flux.subscribe(data -> System.out.println("订阅1: " + data));

// 第二次订阅 - 发起第二次请求(独立的)
flux.subscribe(data -> System.out.println("订阅2: " + data));

// 两次订阅可能收到不同的数据!

冷流 vs 热流

1
2
3
4
5
6
7
8
9
10
11
12
13
// 冷流:每个订阅者都从头开始
Flux<Integer> cold = Flux.range(1, 3);
cold.subscribe(i -> System.out.println("A: " + i)); // A: 1, 2, 3
cold.subscribe(i -> System.out.println("B: " + i)); // B: 1, 2, 3

// 热流:订阅者共享数据流
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> hot = sink.asFlux();

hot.subscribe(i -> System.out.println("A: " + i));
sink.tryEmitNext(1); // A: 1
hot.subscribe(i -> System.out.println("B: " + i));
sink.tryEmitNext(2); // A: 2, B: 2(B 没收到 1)

实战:大模型流式调用完整示例

Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@RestController
@RequestMapping("/api/chat")
public class ChatController {

private final WebClient webClient;

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chat(@RequestParam String prompt) {
return webClient.post()
.uri("https://api.openai.com/v1/chat/completions")
.header("Authorization", "Bearer " + apiKey)
.bodyValue(Map.of(
"model", "gpt-4",
"messages", List.of(Map.of("role", "user", "content", prompt)),
"stream", true
))
.retrieve()
.bodyToFlux(String.class)
.filter(line -> !line.equals("[DONE]"))
.map(this::extractContent)
.filter(Objects::nonNull)
.doOnNext(chunk -> log.debug("发送: {}", chunk))
.doOnComplete(() -> log.info("流式响应完成"))
.doOnError(e -> log.error("流式响应错误", e));
}

private String extractContent(String line) {
// 解析 SSE 数据,提取内容
// data: {"choices":[{"delta":{"content":"你好"}}]}
try {
JsonNode node = objectMapper.readTree(line.substring(6));
return node.at("/choices/0/delta/content").asText(null);
} catch (Exception e) {
return null;
}
}
}

前端接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 使用 EventSource 接收 SSE
const eventSource = new EventSource('/api/chat?prompt=你好');

eventSource.onmessage = (event) => {
// 逐字显示
document.getElementById('response').textContent += event.data;
};

eventSource.onerror = () => {
eventSource.close();
};

// 或者使用 fetch
const response = await fetch('/api/chat?prompt=你好');
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
document.getElementById('response').textContent += chunk;
}

总结

概念 说明
Flux 是管道 数据流经管道,不存储在管道中
数据何时确定 取决于上游,可能创建时确定,也可能运行时才知道
数据数量 由上游决定,下游不需要提前知道
流何时结束 上游发送 onComplete()onError()
懒加载 创建 Flux 不执行,订阅时才执行
Sinks 的作用 让你成为上游,主动推送数据

核心理解

  • Flux 不是”装了多少数据”,而是”数据从哪来、往哪去”
  • 你不需要知道 Flux 里有多少数据,只需要处理流过来的每一个数据
  • 这就是响应式编程的核心思想:对数据流做出响应

相关链接

  • Sinks 详解
  • Project Reactor 实现
  • WebFlux - Reactor核心API

Flux是管道不是容器
https://zmmmmy.github.io/2026/01/10/Flux是管道不是容器/
作者
ZhiMy
发布于
2026年1月10日
许可协议