Sinks 详解
什么是 Sinks
Sinks 是 Reactor 提供的”手动发送数据”的工具,让你可以在代码中主动 push 数据到响应式流中。
核心问题:Mono/Flux 的局限
1 2 3 4
| Mono.just("hello"); Flux.fromIterable(list); Flux.interval(Duration.ofSeconds(1));
|
但有些场景需要”主动”发送数据:
- 接收外部事件(WebSocket、消息队列)
- 用户交互触发
- 异步回调结果
- 多个地方产生数据,汇聚到一个流
Sinks 的作用
1 2 3 4 5 6 7 8 9
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
sink.tryEmitNext("消息1"); sink.tryEmitNext("消息2");
sink.asFlux().subscribe(System.out::println);
|
Sinks 与 Mono/Flux 的关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ┌─────────────────────────────────────────────────────────────┐ │ 数据生产方式 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Mono/Flux (被动) Sinks (主动) │ │ ───────────────── ───────────── │ │ 数据在创建时确定 数据可以随时发送 │ │ just(), fromIterable() tryEmitNext() │ │ interval(), range() tryEmitComplete() │ │ │ │ ↓ 转换 ↓ │ │ │ │ sink.asFlux() → Flux<T> │ │ sink.asMono() → Mono<T> │ │ │ │ 最终都是 Flux/Mono,订阅者无感知差异 │ └─────────────────────────────────────────────────────────────┘
|
本质:Sinks 是 Flux/Mono 的数据源,它生产数据,Flux/Mono 传递数据。
Sinks 的三种类型
1. Sinks.One - 单值(对应 Mono)
只能发送 0 或 1 个值,然后完成。
1 2 3 4 5 6 7 8
| Sinks.One<String> sink = Sinks.one();
sink.tryEmitValue("结果");
Mono<String> mono = sink.asMono(); mono.subscribe(System.out::println);
|
使用场景:异步操作的单个结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public Mono<User> fetchUserAsync(Long id) { Sinks.One<User> sink = Sinks.one(); asyncClient.getUser(id, new Callback<User>() { @Override public void onSuccess(User user) { sink.tryEmitValue(user); } @Override public void onError(Exception e) { sink.tryEmitError(e); } }); return sink.asMono(); }
|
2. Sinks.Empty - 无值(对应 Mono)
只发送完成或错误信号,不发送值。
1 2 3 4 5 6 7 8 9 10 11 12
| Sinks.Empty<Void> sink = Sinks.empty();
sink.tryEmitEmpty();
Mono<Void> mono = sink.asMono(); mono.subscribe( v -> {}, e -> {}, () -> System.out.println("完成了") );
|
使用场景:只关心操作是否完成,不关心返回值
1 2 3 4 5 6 7 8
| public Mono<Void> waitForInit() { Sinks.Empty<Void> sink = Sinks.empty(); initService.onReady(() -> sink.tryEmitEmpty()); return sink.asMono(); }
|
3. Sinks.Many - 多值(对应 Flux)
可以发送 0 到 N 个值,这是最常用的类型。
1 2 3 4 5 6 7 8 9 10 11 12
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
sink.tryEmitNext("消息1"); sink.tryEmitNext("消息2"); sink.tryEmitNext("消息3"); sink.tryEmitComplete();
Flux<String> flux = sink.asFlux(); flux.subscribe(System.out::println);
|
Sinks.Many 的三种模式
1. Unicast(单播)- 只能有一个订阅者
1 2 3 4 5 6 7 8 9 10
| Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> flux = sink.asFlux();
flux.subscribe(v -> System.out.println("订阅者1: " + v));
flux.subscribe(v -> System.out.println("订阅者2: " + v));
|
特点:
- 只允许一个订阅者
- 内存效率高(不需要维护多个订阅者状态)
- 适合点对点通信
2. Multicast(多播)- 多个订阅者共享
1 2 3 4 5 6 7 8 9 10 11 12
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> flux = sink.asFlux();
flux.subscribe(v -> System.out.println("订阅者1: " + v)); flux.subscribe(v -> System.out.println("订阅者2: " + v));
sink.tryEmitNext("Hello");
|
特点:
- 允许多个订阅者
- 热流:新订阅者只能收到订阅后的数据
- 适合广播场景
1 2 3 4 5 6 7 8 9 10
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
sink.tryEmitNext("消息1");
Flux<String> flux = sink.asFlux(); flux.subscribe(v -> System.out.println("收到: " + v));
sink.tryEmitNext("消息2");
|
3. Replay(重放)- 新订阅者可收到历史数据
1 2 3 4 5 6 7 8 9 10 11
| Sinks.Many<String> sink = Sinks.many().replay().all();
sink.tryEmitNext("消息1"); sink.tryEmitNext("消息2");
sink.asFlux().subscribe(v -> System.out.println("收到: " + v));
|
Replay 的变体:
1 2 3 4 5 6 7 8 9 10 11
| Sinks.many().replay().all();
Sinks.many().replay().limit(10);
Sinks.many().replay().latest();
Sinks.many().replay().limit(Duration.ofMinutes(5));
|
三种模式对比
| 模式 |
订阅者数量 |
历史数据 |
适用场景 |
| Unicast |
1个 |
无 |
点对点、队列消费 |
| Multicast |
多个 |
无(热流) |
事件广播、实时推送 |
| Replay |
多个 |
有(可配置) |
状态共享、缓存 |
发送数据的方法
tryEmitXxx vs emitXxx
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("data"); if (result.isFailure()) { System.out.println("发送失败: " + result); }
sink.emitNext("data", (signalType, emitResult) -> { return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED; });
|
EmitResult 的类型
| 结果 |
说明 |
OK |
发送成功 |
FAIL_TERMINATED |
Sink 已终止 |
FAIL_OVERFLOW |
背压溢出 |
FAIL_CANCELLED |
已取消 |
FAIL_NON_SERIALIZED |
并发冲突(多线程同时发送) |
FAIL_ZERO_SUBSCRIBER |
没有订阅者(unicast) |
常用的 EmitFailureHandler
1 2 3 4 5
| Sinks.EmitFailureHandler.FAIL_FAST
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100))
|
实战示例
示例1:WebSocket 消息推送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Component public class WebSocketHandler { private final Sinks.Many<String> messageSink = Sinks.many().multicast().onBackpressureBuffer(); public Flux<String> getMessageStream() { return messageSink.asFlux(); } public void broadcast(String message) { messageSink.tryEmitNext(message); } }
@Bean public HandlerMapping webSocketMapping(WebSocketHandler handler) { return new SimpleUrlHandlerMapping(Map.of( "/ws", session -> session.send( handler.getMessageStream() .map(session::textMessage) ) ), -1); }
|
示例2:事件总线
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component public class EventBus { private final Sinks.Many<Event> eventSink = Sinks.many().multicast().onBackpressureBuffer(); public void publish(Event event) { eventSink.tryEmitNext(event); } public <T extends Event> Flux<T> subscribe(Class<T> eventType) { return eventSink.asFlux() .filter(eventType::isInstance) .cast(eventType); } }
eventBus.subscribe(UserCreatedEvent.class) .subscribe(event -> System.out.println("用户创建: " + event.getUserId()));
eventBus.publish(new UserCreatedEvent(123L));
|
示例3:异步任务结果收集
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public Flux<Result> executeTasksAsync(List<Task> tasks) { Sinks.Many<Result> sink = Sinks.many().unicast().onBackpressureBuffer(); AtomicInteger remaining = new AtomicInteger(tasks.size()); for (Task task : tasks) { CompletableFuture.runAsync(() -> { Result result = task.execute(); sink.tryEmitNext(result); if (remaining.decrementAndGet() == 0) { sink.tryEmitComplete(); } }); } return sink.asFlux(); }
|
示例4:缓存最新状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class StatusCache { private final Sinks.Many<SystemStatus> statusSink = Sinks.many().replay().latest(); public void updateStatus(SystemStatus status) { statusSink.tryEmitNext(status); } public Mono<SystemStatus> getCurrentStatus() { return statusSink.asFlux().next(); } public Flux<SystemStatus> watchStatus() { return statusSink.asFlux(); } }
|
Sinks vs 旧版 Processor
Reactor 3.4 之前使用 Processor,现在已废弃:
| 旧版 Processor |
新版 Sinks |
DirectProcessor |
Sinks.many().multicast().directBestEffort() |
UnicastProcessor |
Sinks.many().unicast() |
ReplayProcessor |
Sinks.many().replay() |
EmitterProcessor |
Sinks.many().multicast() |
为什么废弃 Processor?
- Processor 同时是 Publisher 和 Subscriber,职责不清
- 线程安全问题难以处理
- Sinks 提供更清晰的 API 和更好的线程安全保证
线程安全
Sinks 默认是线程安全的,但有性能开销。
1 2 3 4 5
| Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<String> unsafeSink = Sinks.unsafe().many().multicast().onBackpressureBuffer();
|
注意:unsafe 版本在多线程下会出问题,只在确定单线程时使用。
总结
| 概念 |
说明 |
| Sinks 是什么 |
手动发送数据到响应式流的工具 |
| 与 Mono/Flux 关系 |
Sinks 是数据源,通过 asFlux()/asMono() 转换 |
| Sinks.One |
发送 0-1 个值,对应 Mono |
| Sinks.Empty |
只发送完成/错误信号,对应 Mono |
| Sinks.Many |
发送 0-N 个值,对应 Flux |
| Unicast |
单订阅者 |
| Multicast |
多订阅者,热流 |
| Replay |
多订阅者,可重放历史 |
相关链接
- Reactor API 参考
- Project Reactor 实现
- WebFlux 核心组件