流合并与操作符链
水利系统比喻
在学习 Reactor 时,可以用水利系统来理解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ┌──────────────────────────────────────────────────────────────┐ │ Reactor 水利系统 │ ├──────────────────────────────────────────────────────────────┤ │ │ │ 水源类型: │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ just() │ │ interval │ │ HTTP │ │ Sinks │ │ │ │ 矿泉水瓶 │ │ 定时滴水 │ │ 外部水管 │ │ 你的水龙头│ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ └──────┬──────┴──────┬──────┴──────┬──────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ merge / concat / zip (合并水管) │ │ │ └─────────────────┬───────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ filter / map / take (处理水管) │ │ │ └─────────────────┬───────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────┐ │ │ │ subscribe (水池) │ │ │ └─────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────┘
|
| 概念 |
比喻 |
说明 |
| Sinks |
水龙头 |
你控制开关,决定什么时候放水 |
| Flux |
水管 |
只负责传输,不管水从哪来、有多少 |
| 操作符 |
处理水管 |
filter=过滤器,map=转换器,take=限流阀 |
| subscribe |
水池 |
接收最终的水 |
一、操作符链(水管接水管)
基本概念
每个操作符都会创建一个新的 Flux,形成链式调用:
1 2 3 4 5
| Flux.range(1, 10) .filter(i -> i > 5) .map(i -> i * 2) .take(3) .subscribe();
|
数据流动过程
1 2 3 4 5 6
| ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ range │───▶│ filter │───▶│ map │───▶│ take │───▶│ 订阅者 │ │1,2...10│ │ >5 │ │ *2 │ │ 3个 │ │ │ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ 10个 → 5个 → 5个 → 3个 → 收到3个 (6,7,8,9,10) (6,7,8,9,10) (12,14,16,18,20) (12,14,16)
|
常用操作符分类
| 类型 |
操作符 |
作用 |
| 过滤 |
filter, take, skip, distinct |
筛选数据 |
| 转换 |
map, flatMap, concatMap |
转换数据 |
| 组合 |
merge, concat, zip |
合并多个流 |
| 错误处理 |
onErrorReturn, retry |
处理异常 |
| 副作用 |
doOnNext, doOnError, doOnComplete |
执行额外操作 |
二、流合并(多个水龙头合并)
1. merge - 实时混合
特点:谁有数据谁先输出,不保证顺序
1 2 3 4 5
| Flux<String> flux1 = Flux.just("A1", "A2").delayElements(Duration.ofMillis(100)); Flux<String> flux2 = Flux.just("B1", "B2").delayElements(Duration.ofMillis(80));
Flux.merge(flux1, flux2).subscribe(System.out::println);
|
1 2 3
| flux1 ──A1────A2────┐ ├──▶ merge ──▶ B1, A1, B2, A2 flux2 ──B1──B2──────┘
|
适用场景:实时数据混合、不关心顺序
2. concat - 顺序连接
特点:第一个流完成后,才开始第二个流
1 2
| Flux.concat(flux1, flux2).subscribe(System.out::println);
|
1 2 3
| flux1 ──A1──A2──┐ ├──▶ concat ──▶ A1, A2, B1, B2 flux2 ──B1──B2──┘(等A完成再开始)
|
适用场景:需要保证顺序、分页加载
3. zip - 一对一配对
特点:从每个流各取一个元素,组合成一对
1 2 3 4 5 6
| Flux<String> names = Flux.just("张三", "李四", "王五"); Flux<Integer> ages = Flux.just(25, 30, 28);
Flux.zip(names, ages, (name, age) -> name + ": " + age) .subscribe(System.out::println);
|
1 2 3
| names ──张三────李四────王五────┐ ├──▶ zip ──▶ (张三:25), (李四:30), (王五:28) ages ──25──────30──────28─────┘
|
适用场景:数据配对、并行请求结果组合
4. combineLatest - 组合最新值
特点:任一流有新值时,与其他流的最新值组合
1 2 3 4 5 6 7
| Flux<String> flux1 = Flux.interval(Duration.ofMillis(100)).map(i -> "A" + i); Flux<String> flux2 = Flux.interval(Duration.ofMillis(150)).map(i -> "B" + i);
Flux.combineLatest(flux1, flux2, (a, b) -> a + "+" + b) .take(5) .subscribe(System.out::println);
|
适用场景:实时数据组合、表单联动
合并方式对比
| 方式 |
执行顺序 |
输出顺序 |
适用场景 |
merge |
并行 |
谁快谁先 |
实时混合 |
concat |
串行 |
按流顺序 |
顺序处理 |
zip |
并行 |
配对输出 |
数据组合 |
combineLatest |
并行 |
最新组合 |
实时联动 |
三、实战:外部流 + 本地流合并
场景描述
调用外部 API(如大模型),同时本地也能推送消息,两个流合并后返回给前端。
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Component public class ChatService { private final WebClient webClient; private final Sinks.Many<String> notificationSink = Sinks.many().multicast().onBackpressureBuffer();
public Flux<String> getChatStream(String prompt) { Flux<String> llmResponse = webClient.post() .uri("/v1/chat/completions") .bodyValue(buildRequest(prompt)) .retrieve() .bodyToFlux(String.class) .map(this::extractContent) .filter(Objects::nonNull); Flux<String> notifications = notificationSink.asFlux() .map(msg -> "[系统] " + msg); return Flux.merge(llmResponse, notifications); }
public void sendNotification(String message) { notificationSink.tryEmitNext(message); } }
|
效果演示
1 2 3 4 5 6 7 8
| 用户订阅 getChatStream("你好") │ ├── 大模型: "你" ├── 大模型: "好" ├── 系统通知: "[系统] 服务器将在5分钟后维护" ← 随时插入 ├── 大模型: ",我是AI助手" ├── 系统通知: "[系统] 新功能上线" ← 随时插入 └── 大模型: "..."
|
图示
1 2 3 4 5 6 7 8 9 10 11
| ┌─────────────────┐ │ 外部 API │ │ (大模型返回) │──── llmResponse ─────┐ └─────────────────┘ │ ├──▶ Flux.merge() ──▶ 前端 ┌─────────────────┐ │ │ 本地 Sinks │ │ │ (系统通知) │──── notifications ───┘ └─────────────────┘ ↑ sendNotification("消息")
|
四、生产环境注意事项
1. 背压处理
问题:上游产生数据太快,下游处理不过来
1 2 3 4 5 6 7 8 9 10 11 12
| Flux.interval(Duration.ofMillis(1)) .subscribe(data -> { Thread.sleep(100); });
Flux.interval(Duration.ofMillis(1)) .onBackpressureDrop(dropped -> log.warn("丢弃: {}", dropped)) .subscribe(data -> { Thread.sleep(100); });
|
背压策略选择:
| 策略 |
说明 |
适用场景 |
onBackpressureBuffer() |
缓冲 |
数据不能丢,内存够用 |
onBackpressureBuffer(100) |
有限缓冲 |
控制内存使用 |
onBackpressureDrop() |
丢弃 |
实时数据,可以丢 |
onBackpressureLatest() |
只保留最新 |
状态更新 |
onBackpressureError() |
抛异常 |
不允许背压 |
2. 错误处理
问题:一个流出错会导致整个合并流终止
1 2 3 4 5 6 7 8 9 10 11
| Flux.merge(llmResponse, notifications);
Flux<String> safeLlmResponse = llmResponse .onErrorResume(e -> { log.error("大模型调用失败", e); return Flux.just("[错误] 大模型暂时不可用"); });
Flux.merge(safeLlmResponse, notifications);
|
错误处理策略:
1 2 3 4 5 6 7 8 9 10 11 12
| .onErrorReturn("默认值")
.onErrorResume(e -> backupFlux)
.retry(3) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorContinue((e, data) -> log.error("处理失败: {}", data, e))
|
3. 超时处理
1 2 3 4 5 6 7 8 9 10
| Flux<String> response = webClient.get().retrieve().bodyToFlux(String.class);
Flux<String> response = webClient.get() .retrieve() .bodyToFlux(String.class) .timeout(Duration.ofSeconds(30)) .onErrorResume(TimeoutException.class, e -> Flux.just("[超时] 请求超时,请重试"));
|
4. 资源清理
1 2 3 4 5 6 7 8 9
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
@PreDestroy public void cleanup() { sink.tryEmitComplete(); }
|
5. 线程安全
1 2 3 4 5 6
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<String> unsafeSink = Sinks.unsafe().many().multicast().onBackpressureBuffer();
|
6. 避免阻塞
1 2 3 4 5 6 7 8 9 10 11
| flux.map(data -> { Thread.sleep(1000); return process(data); });
flux.flatMap(data -> Mono.fromCallable(() -> blockingProcess(data)) .subscribeOn(Schedulers.boundedElastic()) );
|
五、最佳实践
1. 操作符链设计
1 2 3 4 5 6 7 8 9 10 11 12 13
| return webClient.get() .uri("/api/users") .retrieve() .bodyToFlux(User.class) .filter(user -> user.isActive()) .map(user -> user.getName()) .distinct() .take(10) .timeout(Duration.ofSeconds(5)) .onErrorResume(e -> Flux.empty()) .doOnNext(name -> log.debug("处理: {}", name)) .subscribeOn(Schedulers.boundedElastic());
|
2. Sinks 使用规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class EventBus { private final Sinks.Many<Event> eventSink = Sinks.many().multicast().onBackpressureBuffer(); public void publish(Event event) { Sinks.EmitResult result = eventSink.tryEmitNext(event); if (result.isFailure()) { log.warn("事件发送失败: {}, 原因: {}", event, result); } } public Flux<Event> subscribe() { return eventSink.asFlux(); } }
|
3. 合并流的错误隔离
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public Flux<Message> getMergedStream() { Flux<Message> stream1 = getStream1() .onErrorResume(e -> { log.error("stream1 错误", e); return Flux.empty(); }); Flux<Message> stream2 = getStream2() .onErrorResume(e -> { log.error("stream2 错误", e); return Flux.empty(); }); return Flux.merge(stream1, stream2); }
|
4. 日志与监控
1 2 3 4 5 6 7 8 9 10
| flux .doOnSubscribe(s -> log.info("开始订阅")) .doOnNext(data -> log.debug("处理数据: {}", data)) .doOnError(e -> log.error("发生错误", e)) .doOnComplete(() -> log.info("处理完成")) .doOnCancel(() -> log.warn("订阅被取消")) .doFinally(signal -> { metrics.recordStreamEnd(signal); });
|
5. 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Test void testMergedStream() { Flux<String> merged = Flux.merge( Flux.just("A"), Flux.just("B") ); StepVerifier.create(merged) .expectNextCount(2) .verifyComplete(); }
@Test void testWithVirtualTime() { StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3) ) .thenAwait(Duration.ofSeconds(3)) .expectNext(0L, 1L, 2L) .verifyComplete(); }
|
六、常见问题
Q1: merge 和 concat 怎么选?
- merge:需要实时混合,不关心顺序
- concat:需要保证顺序,一个完成再处理下一个
Q2: 为什么我的 Sinks 发送的数据没人收到?
检查:
- 是否有订阅者?(multicast 模式下,没订阅者数据会丢失)
- 是否在订阅之前就发送了?(热流特性)
- 考虑使用
replay() 模式
Q3: 流合并后,一个出错整个都停了?
每个流单独添加错误处理:
1 2 3 4
| Flux.merge( stream1.onErrorResume(e -> Flux.empty()), stream2.onErrorResume(e -> Flux.empty()) );
|
Q4: 内存一直涨?
检查:
- 是否有无限流没有
take() 限制?
- 是否背压处理不当?添加
onBackpressureXxx()
- 是否有流没有正确完成?检查
tryEmitComplete()
相关链接
- Flux 是管道不是容器
- Sinks 详解
- Reactor API 参考
- WebFlux - Reactor核心API