Sinks详解

Sinks 详解

什么是 Sinks

Sinks 是 Reactor 提供的”手动发送数据”的工具,让你可以在代码中主动 push 数据到响应式流中。

核心问题:Mono/Flux 的局限

1
2
3
4
// Mono/Flux 通常是"被动"的 - 数据在创建时就确定了
Mono.just("hello"); // 数据已确定
Flux.fromIterable(list); // 数据已确定
Flux.interval(Duration.ofSeconds(1)); // 数据按规则生成

但有些场景需要”主动”发送数据

  • 接收外部事件(WebSocket、消息队列)
  • 用户交互触发
  • 异步回调结果
  • 多个地方产生数据,汇聚到一个流

Sinks 的作用

1
2
3
4
5
6
7
8
9
// Sinks 让你可以"主动"发送数据
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 在任何地方、任何时候发送数据
sink.tryEmitNext("消息1");
sink.tryEmitNext("消息2");

// 订阅者可以收到这些数据
sink.asFlux().subscribe(System.out::println);

Sinks 与 Mono/Flux 的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌─────────────────────────────────────────────────────────────┐
│ 数据生产方式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Mono/Flux (被动) Sinks (主动) │
│ ───────────────── ───────────── │
│ 数据在创建时确定 数据可以随时发送 │
just(), fromIterable() tryEmitNext() │
interval(), range() tryEmitComplete() │
│ │
│ ↓ 转换 ↓ │
│ │
│ sink.asFlux() → Flux<T> │
│ sink.asMono() → Mono<T> │
│ │
│ 最终都是 Flux/Mono,订阅者无感知差异 │
└─────────────────────────────────────────────────────────────┘

本质:Sinks 是 Flux/Mono 的数据源,它生产数据,Flux/Mono 传递数据。


Sinks 的三种类型

1. Sinks.One - 单值(对应 Mono)

只能发送 0 或 1 个值,然后完成。

1
2
3
4
5
6
7
8
Sinks.One<String> sink = Sinks.one();

// 发送一个值
sink.tryEmitValue("结果");

// 转为 Mono
Mono<String> mono = sink.asMono();
mono.subscribe(System.out::println); // 输出: 结果

使用场景:异步操作的单个结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 示例:封装异步回调
public Mono<User> fetchUserAsync(Long id) {
Sinks.One<User> sink = Sinks.one();

// 假设这是一个回调式的 API
asyncClient.getUser(id, new Callback<User>() {
@Override
public void onSuccess(User user) {
sink.tryEmitValue(user); // 成功时发送值
}

@Override
public void onError(Exception e) {
sink.tryEmitError(e); // 失败时发送错误
}
});

return sink.asMono();
}

2. Sinks.Empty - 无值(对应 Mono

只发送完成或错误信号,不发送值。

1
2
3
4
5
6
7
8
9
10
11
12
Sinks.Empty<Void> sink = Sinks.empty();

// 发送完成信号
sink.tryEmitEmpty();

// 转为 Mono<Void>
Mono<Void> mono = sink.asMono();
mono.subscribe(
v -> {},
e -> {},
() -> System.out.println("完成了") // 输出: 完成了
);

使用场景:只关心操作是否完成,不关心返回值

1
2
3
4
5
6
7
8
// 示例:等待初始化完成
public Mono<Void> waitForInit() {
Sinks.Empty<Void> sink = Sinks.empty();

initService.onReady(() -> sink.tryEmitEmpty());

return sink.asMono();
}

3. Sinks.Many - 多值(对应 Flux)

可以发送 0 到 N 个值,这是最常用的类型。

1
2
3
4
5
6
7
8
9
10
11
12
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 发送多个值
sink.tryEmitNext("消息1");
sink.tryEmitNext("消息2");
sink.tryEmitNext("消息3");
sink.tryEmitComplete();

// 转为 Flux
Flux<String> flux = sink.asFlux();
flux.subscribe(System.out::println);
// 输出: 消息1, 消息2, 消息3

Sinks.Many 的三种模式

1. Unicast(单播)- 只能有一个订阅者

1
2
3
4
5
6
7
8
9
10
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

Flux<String> flux = sink.asFlux();

// 第一个订阅者
flux.subscribe(v -> System.out.println("订阅者1: " + v));

// 第二个订阅者 会报错!
flux.subscribe(v -> System.out.println("订阅者2: " + v));
// 抛出 IllegalStateException

特点:

  • 只允许一个订阅者
  • 内存效率高(不需要维护多个订阅者状态)
  • 适合点对点通信

2. Multicast(多播)- 多个订阅者共享

1
2
3
4
5
6
7
8
9
10
11
12
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

Flux<String> flux = sink.asFlux();

// 多个订阅者都可以收到数据
flux.subscribe(v -> System.out.println("订阅者1: " + v));
flux.subscribe(v -> System.out.println("订阅者2: " + v));

sink.tryEmitNext("Hello");
// 输出:
// 订阅者1: Hello
// 订阅者2: Hello

特点:

  • 允许多个订阅者
  • 热流:新订阅者只能收到订阅后的数据
  • 适合广播场景
1
2
3
4
5
6
7
8
9
10
// 热流特性演示
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

sink.tryEmitNext("消息1"); // 此时没有订阅者,消息丢失

Flux<String> flux = sink.asFlux();
flux.subscribe(v -> System.out.println("收到: " + v));

sink.tryEmitNext("消息2"); // 订阅者收到
// 输出: 收到: 消息2

3. Replay(重放)- 新订阅者可收到历史数据

1
2
3
4
5
6
7
8
9
10
11
// 重放所有历史数据
Sinks.Many<String> sink = Sinks.many().replay().all();

sink.tryEmitNext("消息1");
sink.tryEmitNext("消息2");

// 新订阅者可以收到之前的数据!
sink.asFlux().subscribe(v -> System.out.println("收到: " + v));
// 输出:
// 收到: 消息1
// 收到: 消息2

Replay 的变体:

1
2
3
4
5
6
7
8
9
10
11
// 重放所有
Sinks.many().replay().all();

// 重放最近 N 条
Sinks.many().replay().limit(10);

// 只重放最新一条
Sinks.many().replay().latest();

// 重放指定时间内的数据
Sinks.many().replay().limit(Duration.ofMinutes(5));

三种模式对比

模式 订阅者数量 历史数据 适用场景
Unicast 1个 点对点、队列消费
Multicast 多个 无(热流) 事件广播、实时推送
Replay 多个 有(可配置) 状态共享、缓存

发送数据的方法

tryEmitXxx vs emitXxx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 方式1: tryEmitXxx - 返回结果,不抛异常
Sinks.EmitResult result = sink.tryEmitNext("data");
if (result.isFailure()) {
// 处理失败
System.out.println("发送失败: " + result);
}

// 方式2: emitXxx - 可自定义失败处理
sink.emitNext("data", (signalType, emitResult) -> {
// 返回 true 表示重试,false 表示放弃
return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
});

EmitResult 的类型

结果 说明
OK 发送成功
FAIL_TERMINATED Sink 已终止
FAIL_OVERFLOW 背压溢出
FAIL_CANCELLED 已取消
FAIL_NON_SERIALIZED 并发冲突(多线程同时发送)
FAIL_ZERO_SUBSCRIBER 没有订阅者(unicast)

常用的 EmitFailureHandler

1
2
3
4
5
// 快速失败 - 失败就放弃
Sinks.EmitFailureHandler.FAIL_FAST

// 忙等待重试 - 适合并发场景
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100))

实战示例

示例1:WebSocket 消息推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
public class WebSocketHandler {

// 用于向所有连接的客户端广播消息
private final Sinks.Many<String> messageSink =
Sinks.many().multicast().onBackpressureBuffer();

// 获取消息流,供 WebSocket 订阅
public Flux<String> getMessageStream() {
return messageSink.asFlux();
}

// 发送消息给所有客户端
public void broadcast(String message) {
messageSink.tryEmitNext(message);
}
}

// WebSocket 配置
@Bean
public HandlerMapping webSocketMapping(WebSocketHandler handler) {
return new SimpleUrlHandlerMapping(Map.of(
"/ws", session -> session.send(
handler.getMessageStream()
.map(session::textMessage)
)
), -1);
}

示例2:事件总线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class EventBus {

private final Sinks.Many<Event> eventSink =
Sinks.many().multicast().onBackpressureBuffer();

// 发布事件
public void publish(Event event) {
eventSink.tryEmitNext(event);
}

// 订阅特定类型的事件
public <T extends Event> Flux<T> subscribe(Class<T> eventType) {
return eventSink.asFlux()
.filter(eventType::isInstance)
.cast(eventType);
}
}

// 使用
eventBus.subscribe(UserCreatedEvent.class)
.subscribe(event -> System.out.println("用户创建: " + event.getUserId()));

eventBus.publish(new UserCreatedEvent(123L));

示例3:异步任务结果收集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Flux<Result> executeTasksAsync(List<Task> tasks) {
Sinks.Many<Result> sink = Sinks.many().unicast().onBackpressureBuffer();

AtomicInteger remaining = new AtomicInteger(tasks.size());

for (Task task : tasks) {
CompletableFuture.runAsync(() -> {
Result result = task.execute();
sink.tryEmitNext(result);

if (remaining.decrementAndGet() == 0) {
sink.tryEmitComplete();
}
});
}

return sink.asFlux();
}

示例4:缓存最新状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class StatusCache {

// 使用 replay().latest() 缓存最新状态
private final Sinks.Many<SystemStatus> statusSink =
Sinks.many().replay().latest();

// 更新状态
public void updateStatus(SystemStatus status) {
statusSink.tryEmitNext(status);
}

// 获取当前状态(新订阅者立即收到最新状态)
public Mono<SystemStatus> getCurrentStatus() {
return statusSink.asFlux().next();
}

// 监听状态变化
public Flux<SystemStatus> watchStatus() {
return statusSink.asFlux();
}
}

Sinks vs 旧版 Processor

Reactor 3.4 之前使用 Processor,现在已废弃:

旧版 Processor 新版 Sinks
DirectProcessor Sinks.many().multicast().directBestEffort()
UnicastProcessor Sinks.many().unicast()
ReplayProcessor Sinks.many().replay()
EmitterProcessor Sinks.many().multicast()

为什么废弃 Processor?

  • Processor 同时是 Publisher 和 Subscriber,职责不清
  • 线程安全问题难以处理
  • Sinks 提供更清晰的 API 和更好的线程安全保证

线程安全

Sinks 默认是线程安全的,但有性能开销。

1
2
3
4
5
// 默认:线程安全
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 如果确定单线程使用,可以用 unsafe 版本提升性能
Sinks.Many<String> unsafeSink = Sinks.unsafe().many().multicast().onBackpressureBuffer();

注意unsafe 版本在多线程下会出问题,只在确定单线程时使用。


总结

概念 说明
Sinks 是什么 手动发送数据到响应式流的工具
与 Mono/Flux 关系 Sinks 是数据源,通过 asFlux()/asMono() 转换
Sinks.One 发送 0-1 个值,对应 Mono
Sinks.Empty 只发送完成/错误信号,对应 Mono
Sinks.Many 发送 0-N 个值,对应 Flux
Unicast 单订阅者
Multicast 多订阅者,热流
Replay 多订阅者,可重放历史

相关链接

  • Reactor API 参考
  • Project Reactor 实现
  • WebFlux 核心组件

Sinks详解
https://zmmmmy.github.io/2026/01/10/Sinks详解/
作者
ZhiMy
发布于
2026年1月10日
许可协议