Flux 是管道,不是容器
常见误解
很多人初学 Reactor 时会有这样的误解:
1 2 3
| List<String> list = Arrays.asList("a", "b", "c"); Flux<String> flux = Flux.just("a", "b", "c");
|
这种理解在简单场景下没问题,但会导致无法理解复杂场景。
正确理解:Flux 是数据管道
管道 vs 容器
| 概念 |
容器(List) |
管道(Flux) |
| 数据存储 |
数据存在内存中 |
数据流经管道 |
| 数据时机 |
创建时就有 |
订阅后才开始流动 |
| 数据来源 |
已知、固定 |
可以是任何源头 |
| 数据数量 |
确定的 |
可能不确定 |
| 生命周期 |
一直存在 |
流完就结束 |
形象比喻
1 2 3 4 5 6 7 8 9 10 11 12 13
| 容器(List)像一个水桶: ┌─────────┐ │ 水水水 │ ← 水已经在桶里了 │ 水水水 │ └─────────┘
管道(Flux)像一根水管: ┌─────────┐ 水源 ════════════════│ 水管 │════════════════▶ 出水口 (上游) └─────────┘ (下游) ↑ 水流经这里 但不存储在这里
|
实际案例:调用大模型 API
场景描述
调用 OpenAI/Claude 等大模型 API,返回流式响应:
1 2 3 4 5 6 7 8 9 10
| Flux<String> response = webClient.post() .uri("https://api.openai.com/v1/chat/completions") .header("Authorization", "Bearer " + apiKey) .bodyValue(Map.of( "model", "gpt-4", "messages", List.of(Map.of("role", "user", "content", "你好")), "stream", true )) .retrieve() .bodyToFlux(String.class);
|
问题:Flux 里有多少数据?
答案:不知道,也不需要知道!
1 2 3 4 5 6
|
Flux<String> response = callLLM("请介绍一下自己");
|
数据流动过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| ┌─────────────────────────────────────────────────────────────────┐ │ 时间线 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ T0: 创建 Flux │ │ Flux<String> flux = callLLM(prompt); │ │ 此时:管道已建立,但没有数据流动 │ │ │ │ T1: 订阅 Flux │ │ flux.subscribe(chunk -> print(chunk)); │ │ 此时:向上游发送请求,开始等待数据 │ │ │ │ T2: 大模型开始回复 │ │ 上游发送 onNext("你") │ │ 下游收到 "你",打印 │ │ │ │ T3: 大模型继续回复 │ │ 上游发送 onNext("好") │ │ 下游收到 "好",打印 │ │ │ │ T4: 大模型继续回复 │ │ 上游发送 onNext(",我是AI助手") │ │ 下游收到,打印 │ │ │ │ T5: 大模型回复完毕 │ │ 上游发送 onComplete() │ │ 下游知道流结束了 │ │ │ └─────────────────────────────────────────────────────────────────┘
|
代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> chat(@RequestParam String prompt) { Flux<String> response = webClient.post() .uri("/v1/chat/completions") .bodyValue(buildRequest(prompt)) .retrieve() .bodyToFlux(ChatResponse.class) .map(r -> r.getChoices().get(0).getDelta().getContent()) .filter(Objects::nonNull); return response; }
|
Flux 的数据来源
Flux 的数据可以来自各种源头:
1. 静态数据(确定的)
1 2 3 4
| Flux.just(1, 2, 3); Flux.fromIterable(list); Flux.fromArray(array);
|
2. 生成规则(按规则产生)
1 2 3
| Flux.range(1, 100); Flux.interval(Duration.ofSeconds(1));
|
3. 外部数据源(不确定的)
1 2 3 4 5 6 7 8 9
| Flux<String> httpResponse = webClient.get().retrieve().bodyToFlux(String.class); Flux<Message> kafkaMessages = kafkaReceiver.receive(); Flux<String> websocketMessages = session.receive().map(WebSocketMessage::getPayloadAsText); Flux<String> fileLines = Flux.using( () -> Files.lines(path), Flux::fromStream, Stream::close );
|
4. 手动推送(Sinks)
1 2 3 4 5
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(); sink.tryEmitNext("消息1"); sink.tryEmitNext("消息2"); Flux<String> flux = sink.asFlux();
|
Flux 怎么知道数据结束?
三种终止信号
1 2 3 4 5
| flux.subscribe( data -> { }, error -> { }, () -> { } );
|
谁发送终止信号?
上游(生产者)决定何时结束:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Flux.just(1, 2, 3);
webClient.get().retrieve().bodyToFlux(String.class);
Flux.interval(Duration.ofSeconds(1));
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer(); sink.tryEmitNext("a"); sink.tryEmitNext("b"); sink.tryEmitComplete();
|
对比:什么时候用 Flux,什么时候用 Sinks
你是消费者 → 用 Flux
1 2 3 4 5 6 7 8 9
| Flux<String> response = callExternalAPI(); response.subscribe(data -> process(data));
Flux<User> users = userRepository.findAll();
Flux<String> lines = readFileLines(path);
|
特点:数据由别人产生,你只需要接收和处理。
你是生产者 → 用 Sinks
1 2 3 4 5 6 7 8 9 10 11 12 13
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
@EventListener public void onUserLogin(UserLoginEvent event) { sink.tryEmitNext("用户 " + event.getUsername() + " 登录了"); }
public Flux<String> getNotifications() { return sink.asFlux(); }
|
特点:数据由你产生,你需要主动推送。
对比表
| 场景 |
角色 |
使用 |
数据来源 |
| 调用 HTTP API |
消费者 |
Flux |
服务端返回 |
| 查询数据库 |
消费者 |
Flux |
数据库返回 |
| 读取消息队列 |
消费者 |
Flux |
队列推送 |
| WebSocket 推送 |
生产者 |
Sinks |
你主动发送 |
| 事件广播 |
生产者 |
Sinks |
你主动发送 |
| 封装回调 API |
生产者 |
Sinks |
回调触发时发送 |
深入理解:懒加载与订阅
Flux 是懒加载的
1 2 3 4 5 6 7 8 9 10 11 12
| Flux<String> flux = webClient.get().uri("/api").retrieve().bodyToFlux(String.class); System.out.println("Flux 创建完成");
flux.subscribe(data -> System.out.println("收到: " + data)); System.out.println("订阅完成");
|
每次订阅都是独立的
1 2 3 4 5 6 7 8 9
| Flux<String> flux = webClient.get().uri("/api/random").retrieve().bodyToFlux(String.class);
flux.subscribe(data -> System.out.println("订阅1: " + data));
flux.subscribe(data -> System.out.println("订阅2: " + data));
|
冷流 vs 热流
1 2 3 4 5 6 7 8 9 10 11 12 13
| Flux<Integer> cold = Flux.range(1, 3); cold.subscribe(i -> System.out.println("A: " + i)); cold.subscribe(i -> System.out.println("B: " + i));
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer(); Flux<Integer> hot = sink.asFlux();
hot.subscribe(i -> System.out.println("A: " + i)); sink.tryEmitNext(1); hot.subscribe(i -> System.out.println("B: " + i)); sink.tryEmitNext(2);
|
实战:大模型流式调用完整示例
Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @RestController @RequestMapping("/api/chat") public class ChatController { private final WebClient webClient; @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> chat(@RequestParam String prompt) { return webClient.post() .uri("https://api.openai.com/v1/chat/completions") .header("Authorization", "Bearer " + apiKey) .bodyValue(Map.of( "model", "gpt-4", "messages", List.of(Map.of("role", "user", "content", prompt)), "stream", true )) .retrieve() .bodyToFlux(String.class) .filter(line -> !line.equals("[DONE]")) .map(this::extractContent) .filter(Objects::nonNull) .doOnNext(chunk -> log.debug("发送: {}", chunk)) .doOnComplete(() -> log.info("流式响应完成")) .doOnError(e -> log.error("流式响应错误", e)); } private String extractContent(String line) { try { JsonNode node = objectMapper.readTree(line.substring(6)); return node.at("/choices/0/delta/content").asText(null); } catch (Exception e) { return null; } } }
|
前端接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| const eventSource = new EventSource('/api/chat?prompt=你好');
eventSource.onmessage = (event) => { document.getElementById('response').textContent += event.data; };
eventSource.onerror = () => { eventSource.close(); };
const response = await fetch('/api/chat?prompt=你好'); const reader = response.body.getReader(); const decoder = new TextDecoder();
while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); document.getElementById('response').textContent += chunk; }
|
总结
| 概念 |
说明 |
| Flux 是管道 |
数据流经管道,不存储在管道中 |
| 数据何时确定 |
取决于上游,可能创建时确定,也可能运行时才知道 |
| 数据数量 |
由上游决定,下游不需要提前知道 |
| 流何时结束 |
上游发送 onComplete() 或 onError() |
| 懒加载 |
创建 Flux 不执行,订阅时才执行 |
| Sinks 的作用 |
让你成为上游,主动推送数据 |
核心理解:
- Flux 不是”装了多少数据”,而是”数据从哪来、往哪去”
- 你不需要知道 Flux 里有多少数据,只需要处理流过来的每一个数据
- 这就是响应式编程的核心思想:对数据流做出响应
相关链接
- Sinks 详解
- Project Reactor 实现
- WebFlux - Reactor核心API