流合并与操作符链

流合并与操作符链

水利系统比喻

在学习 Reactor 时,可以用水利系统来理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌──────────────────────────────────────────────────────────────┐
│ Reactor 水利系统 │
├──────────────────────────────────────────────────────────────┤
│ │
│ 水源类型: │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ just() │ │ interval │ │ HTTP │ │ Sinks │ │
│ │ 矿泉水瓶 │ │ 定时滴水 │ │ 外部水管 │ │ 你的水龙头│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ └──────┬──────┴──────┬──────┴──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ merge / concat / zip (合并水管) │ │
│ └─────────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ filter / map / take (处理水管) │ │
│ └─────────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ subscribe (水池) │ │
│ └─────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
概念 比喻 说明
Sinks 水龙头 你控制开关,决定什么时候放水
Flux 水管 只负责传输,不管水从哪来、有多少
操作符 处理水管 filter=过滤器,map=转换器,take=限流阀
subscribe 水池 接收最终的水

一、操作符链(水管接水管)

基本概念

每个操作符都会创建一个新的 Flux,形成链式调用:

1
2
3
4
5
Flux.range(1, 10)       // 水源:1到10
.filter(i -> i > 5) // 过滤:只留大于5的
.map(i -> i * 2) // 转换:每个乘2
.take(3) // 限流:只取3个
.subscribe(); // 水池:接收

数据流动过程

1
2
3
4
5
6
┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐
│ range │───▶│ filter │───▶│ map │───▶│ take │───▶│ 订阅者 │
1,2...10│ │ >5 │ │ *2 │ │ 3个 │ │ │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘
10个 → 5个 → 5个 → 3个 → 收到3
(6,7,8,9,10) (6,7,8,9,10) (12,14,16,18,20) (12,14,16)

常用操作符分类

类型 操作符 作用
过滤 filter, take, skip, distinct 筛选数据
转换 map, flatMap, concatMap 转换数据
组合 merge, concat, zip 合并多个流
错误处理 onErrorReturn, retry 处理异常
副作用 doOnNext, doOnError, doOnComplete 执行额外操作

二、流合并(多个水龙头合并)

1. merge - 实时混合

特点:谁有数据谁先输出,不保证顺序

1
2
3
4
5
Flux<String> flux1 = Flux.just("A1", "A2").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("B1", "B2").delayElements(Duration.ofMillis(80));

Flux.merge(flux1, flux2).subscribe(System.out::println);
// 可能输出: B1, A1, B2, A2(谁快谁先)
1
2
3
flux1 ──A1────A2────┐
├──▶ merge ──▶ B1, A1, B2, A2
flux2 ──B1──B2──────┘

适用场景:实时数据混合、不关心顺序

2. concat - 顺序连接

特点:第一个流完成后,才开始第二个流

1
2
Flux.concat(flux1, flux2).subscribe(System.out::println);
// 输出: A1, A2, B1, B2(先A全部,再B全部)
1
2
3
flux1 ──A1──A2──┐
├──▶ concat ──▶ A1, A2, B1, B2
flux2 ──B1──B2──┘(等A完成再开始)

适用场景:需要保证顺序、分页加载

3. zip - 一对一配对

特点:从每个流各取一个元素,组合成一对

1
2
3
4
5
6
Flux<String> names = Flux.just("张三", "李四", "王五");
Flux<Integer> ages = Flux.just(25, 30, 28);

Flux.zip(names, ages, (name, age) -> name + ": " + age)
.subscribe(System.out::println);
// 输出: 张三: 25, 李四: 30, 王五: 28
1
2
3
names ──张三────李四────王五────┐
├──▶ zip ──▶ (张三:25), (李四:30), (王五:28)
ages ──25──────30──────28─────┘

适用场景:数据配对、并行请求结果组合

4. combineLatest - 组合最新值

特点:任一流有新值时,与其他流的最新值组合

1
2
3
4
5
6
7
Flux<String> flux1 = Flux.interval(Duration.ofMillis(100)).map(i -> "A" + i);
Flux<String> flux2 = Flux.interval(Duration.ofMillis(150)).map(i -> "B" + i);

Flux.combineLatest(flux1, flux2, (a, b) -> a + "+" + b)
.take(5)
.subscribe(System.out::println);
// 输出: A0+B0, A1+B0, A1+B1, A2+B1, A3+B1...

适用场景:实时数据组合、表单联动

合并方式对比

方式 执行顺序 输出顺序 适用场景
merge 并行 谁快谁先 实时混合
concat 串行 按流顺序 顺序处理
zip 并行 配对输出 数据组合
combineLatest 并行 最新组合 实时联动

三、实战:外部流 + 本地流合并

场景描述

调用外部 API(如大模型),同时本地也能推送消息,两个流合并后返回给前端。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class ChatService {

private final WebClient webClient;

// 系统通知的水龙头
private final Sinks.Many<String> notificationSink =
Sinks.many().multicast().onBackpressureBuffer();

/**
* 获取合并后的消息流
* 用户既能收到大模型回复,也能收到系统通知
*/
public Flux<String> getChatStream(String prompt) {
// 1. 外部流:大模型回复
Flux<String> llmResponse = webClient.post()
.uri("/v1/chat/completions")
.bodyValue(buildRequest(prompt))
.retrieve()
.bodyToFlux(String.class)
.map(this::extractContent)
.filter(Objects::nonNull);

// 2. 本地流:系统通知
Flux<String> notifications = notificationSink.asFlux()
.map(msg -> "[系统] " + msg);

// 3. 合并两个流
return Flux.merge(llmResponse, notifications);
}

/**
* 发送系统通知(可以在任何地方调用)
*/
public void sendNotification(String message) {
notificationSink.tryEmitNext(message);
}
}

效果演示

1
2
3
4
5
6
7
8
用户订阅 getChatStream("你好")

├── 大模型: "你"
├── 大模型: "好"
├── 系统通知: "[系统] 服务器将在5分钟后维护" ← 随时插入
├── 大模型: ",我是AI助手"
├── 系统通知: "[系统] 新功能上线" ← 随时插入
└── 大模型: "..."

图示

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────┐
│ 外部 API │
│ (大模型返回) │──── llmResponse ─────┐
└─────────────────┘ │
├──▶ Flux.merge() ──▶ 前端
┌─────────────────┐ │
│ 本地 Sinks │ │
│ (系统通知) │──── notifications ───┘
└─────────────────┘

sendNotification("消息")

四、生产环境注意事项

1. 背压处理

问题:上游产生数据太快,下游处理不过来

1
2
3
4
5
6
7
8
9
10
11
12
// [错误] 可能导致内存溢出
Flux.interval(Duration.ofMillis(1)) // 每毫秒一个
.subscribe(data -> {
Thread.sleep(100); // 处理很慢
});

// [正确] 添加背压策略
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop(dropped -> log.warn("丢弃: {}", dropped))
.subscribe(data -> {
Thread.sleep(100);
});

背压策略选择

策略 说明 适用场景
onBackpressureBuffer() 缓冲 数据不能丢,内存够用
onBackpressureBuffer(100) 有限缓冲 控制内存使用
onBackpressureDrop() 丢弃 实时数据,可以丢
onBackpressureLatest() 只保留最新 状态更新
onBackpressureError() 抛异常 不允许背压

2. 错误处理

问题:一个流出错会导致整个合并流终止

1
2
3
4
5
6
7
8
9
10
11
// [错误] llmResponse 出错会导致整个流终止
Flux.merge(llmResponse, notifications);

// [正确] 单独处理每个流的错误
Flux<String> safeLlmResponse = llmResponse
.onErrorResume(e -> {
log.error("大模型调用失败", e);
return Flux.just("[错误] 大模型暂时不可用");
});

Flux.merge(safeLlmResponse, notifications);

错误处理策略

1
2
3
4
5
6
7
8
9
10
11
12
// 返回默认值
.onErrorReturn("默认值")

// 切换到备用流
.onErrorResume(e -> backupFlux)

// 重试
.retry(3)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))

// 记录错误但继续
.onErrorContinue((e, data) -> log.error("处理失败: {}", data, e))

3. 超时处理

1
2
3
4
5
6
7
8
9
10
// [错误] 外部 API 可能永远不返回
Flux<String> response = webClient.get().retrieve().bodyToFlux(String.class);

// [正确] 设置超时
Flux<String> response = webClient.get()
.retrieve()
.bodyToFlux(String.class)
.timeout(Duration.ofSeconds(30))
.onErrorResume(TimeoutException.class, e ->
Flux.just("[超时] 请求超时,请重试"));

4. 资源清理

1
2
3
4
5
6
7
8
9
// [错误] Sinks 没有正确关闭
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
// 应用关闭时,sink 还在运行

// [正确] 在适当时机关闭
@PreDestroy
public void cleanup() {
sink.tryEmitComplete();
}

5. 线程安全

1
2
3
4
5
6
// Sinks 默认是线程安全的
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 如果确定单线程,可以用 unsafe 版本提升性能
Sinks.Many<String> unsafeSink = Sinks.unsafe().many().multicast().onBackpressureBuffer();
// [注意] 多线程下使用 unsafe 会出问题!

6. 避免阻塞

1
2
3
4
5
6
7
8
9
10
11
// [错误] 在响应式流中阻塞
flux.map(data -> {
Thread.sleep(1000); // 阻塞!
return process(data);
});

// [正确] 使用 boundedElastic 调度器处理阻塞操作
flux.flatMap(data ->
Mono.fromCallable(() -> blockingProcess(data))
.subscribeOn(Schedulers.boundedElastic())
);

五、最佳实践

1. 操作符链设计

1
2
3
4
5
6
7
8
9
10
11
12
13
// [推荐] 清晰的操作符链
return webClient.get()
.uri("/api/users")
.retrieve()
.bodyToFlux(User.class)
.filter(user -> user.isActive()) // 1. 先过滤
.map(user -> user.getName()) // 2. 再转换
.distinct() // 3. 去重
.take(10) // 4. 限制数量
.timeout(Duration.ofSeconds(5)) // 5. 超时控制
.onErrorResume(e -> Flux.empty()) // 6. 错误处理
.doOnNext(name -> log.debug("处理: {}", name)) // 7. 日志
.subscribeOn(Schedulers.boundedElastic()); // 8. 调度器

2. Sinks 使用规范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class EventBus {

// [推荐] 使用 multicast + onBackpressureBuffer
private final Sinks.Many<Event> eventSink =
Sinks.many().multicast().onBackpressureBuffer();

public void publish(Event event) {
// [推荐] 检查发送结果
Sinks.EmitResult result = eventSink.tryEmitNext(event);
if (result.isFailure()) {
log.warn("事件发送失败: {}, 原因: {}", event, result);
}
}

public Flux<Event> subscribe() {
return eventSink.asFlux();
}
}

3. 合并流的错误隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// [推荐] 每个流独立处理错误
public Flux<Message> getMergedStream() {
Flux<Message> stream1 = getStream1()
.onErrorResume(e -> {
log.error("stream1 错误", e);
return Flux.empty();
});

Flux<Message> stream2 = getStream2()
.onErrorResume(e -> {
log.error("stream2 错误", e);
return Flux.empty();
});

return Flux.merge(stream1, stream2);
}

4. 日志与监控

1
2
3
4
5
6
7
8
9
10
flux
.doOnSubscribe(s -> log.info("开始订阅"))
.doOnNext(data -> log.debug("处理数据: {}", data))
.doOnError(e -> log.error("发生错误", e))
.doOnComplete(() -> log.info("处理完成"))
.doOnCancel(() -> log.warn("订阅被取消"))
.doFinally(signal -> {
// 记录指标
metrics.recordStreamEnd(signal);
});

5. 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
void testMergedStream() {
// 使用 StepVerifier 测试
Flux<String> merged = Flux.merge(
Flux.just("A"),
Flux.just("B")
);

StepVerifier.create(merged)
.expectNextCount(2)
.verifyComplete();
}

@Test
void testWithVirtualTime() {
// 测试时间相关的流
StepVerifier.withVirtualTime(() ->
Flux.interval(Duration.ofSeconds(1)).take(3)
)
.thenAwait(Duration.ofSeconds(3))
.expectNext(0L, 1L, 2L)
.verifyComplete();
}

六、常见问题

Q1: merge 和 concat 怎么选?

  • merge:需要实时混合,不关心顺序
  • concat:需要保证顺序,一个完成再处理下一个

Q2: 为什么我的 Sinks 发送的数据没人收到?

检查:

  1. 是否有订阅者?(multicast 模式下,没订阅者数据会丢失)
  2. 是否在订阅之前就发送了?(热流特性)
  3. 考虑使用 replay() 模式

Q3: 流合并后,一个出错整个都停了?

每个流单独添加错误处理:

1
2
3
4
Flux.merge(
stream1.onErrorResume(e -> Flux.empty()),
stream2.onErrorResume(e -> Flux.empty())
);

Q4: 内存一直涨?

检查:

  1. 是否有无限流没有 take() 限制?
  2. 是否背压处理不当?添加 onBackpressureXxx()
  3. 是否有流没有正确完成?检查 tryEmitComplete()

相关链接

  • Flux 是管道不是容器
  • Sinks 详解
  • Reactor API 参考
  • WebFlux - Reactor核心API

流合并与操作符链
https://zmmmmy.github.io/2026/01/10/流合并与操作符链/
作者
ZhiMy
发布于
2026年1月10日
许可协议