Project Reactor实现

Project Reactor 实现

Project Reactor 简介

Project Reactor 是 Spring 官方的 Reactive Streams 实现,是 Spring WebFlux 的基础。

核心特点

  • 完全非阻塞:基于 JDK 8 函数式 API
  • 背压支持:完整实现 Reactive Streams 规范
  • 丰富的操作符:提供 100+ 操作符
  • 与 Spring 深度集成:WebFlux、Spring Data R2DBC 等

Reactor 对 Reactive Streams 的实现

Publisher 的实现

Reactor 提供了两个 Publisher 实现:

1
2
3
4
5
6
7
8
9
// Mono - 0或1个元素的 Publisher
public abstract class Mono<T> implements Publisher<T> {
// ...
}

// Flux - 0到N个元素的 Publisher
public abstract class Flux<T> implements Publisher<T> {
// ...
}

为什么分成 Mono 和 Flux?

1
2
3
4
5
6
// 语义更清晰
Mono<User> findById(Long id); // 明确:最多返回1个
Flux<User> findAll(); // 明确:可能返回多个

// 对比原始 Publisher
Publisher<User> findById(Long id); // 不清楚:1个还是多个?

Subscriber 的实现

Reactor 提供了 BaseSubscriber 作为基础实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Flux.range(1, 10)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 订阅时请求第一批数据
request(3);
}

@Override
protected void hookOnNext(Integer value) {
System.out.println("收到: " + value);
// 每收到一个,再请求一个(控制背压)
request(1);
}

@Override
protected void hookOnComplete() {
System.out.println("完成");
}

@Override
protected void hookOnError(Throwable throwable) {
System.err.println("错误: " + throwable);
}
});

背压策略实现

Reactor 提供了多种背压策略:

1. request 控制(默认)

1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.range(1, 1000)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription s) {
request(10); // 只请求10个
}

@Override
protected void hookOnNext(Integer value) {
// 处理完一个再请求一个
request(1);
}
});

2. onBackpressureBuffer - 缓冲

1
2
3
4
5
6
Flux.range(1, 1000)
.onBackpressureBuffer(100) // 缓冲100个
.subscribe(value -> {
Thread.sleep(100); // 慢消费
System.out.println(value);
});

3. onBackpressureDrop - 丢弃

1
2
3
4
5
6
7
Flux.range(1, 1000)
.onBackpressureDrop(dropped ->
System.out.println("丢弃: " + dropped))
.subscribe(value -> {
Thread.sleep(100);
System.out.println(value);
});

4. onBackpressureLatest - 只保留最新

1
2
3
4
5
6
Flux.range(1, 1000)
.onBackpressureLatest() // 只保留最新的
.subscribe(value -> {
Thread.sleep(100);
System.out.println(value);
});

5. onBackpressureError - 抛异常

1
2
3
4
5
6
Flux.range(1, 1000)
.onBackpressureError() // 背压时抛出异常
.subscribe(value -> {
Thread.sleep(100);
System.out.println(value);
});

Subscription 的实现

Reactor 内部使用多种 Subscription 实现:

1
2
3
4
5
6
7
8
9
10
// 查看实际的 Subscription 类型
Flux.range(1, 10)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println(subscription.getClass());
// 输出: reactor.core.publisher.FluxRange$RangeSubscription
request(1);
}
});

常见的 Subscription 实现

类名 说明
FluxRange$RangeSubscription range() 操作的订阅
FluxMap$MapSubscriber map() 操作的订阅
FluxFilter$FilterSubscriber filter() 操作的订阅
MonoJust$MonoJustSubscription Mono.just() 的订阅

Processor 的实现(已废弃)

Reactor 3.4 之前提供了 Processor 实现,现在推荐使用 Sinks

1
2
3
4
5
6
7
8
9
10
11
12
13
// 旧方式(已废弃)
// DirectProcessor<String> processor = DirectProcessor.create();

// 新方式:使用 Sinks
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

// 作为 Publisher
Flux<String> flux = sink.asFlux();

// 发送数据
sink.tryEmitNext("Hello");
sink.tryEmitNext("World");
sink.tryEmitComplete();

Sinks 的类型

1
2
3
4
5
6
7
8
9
10
11
// 单播:只能有一个订阅者
Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer();

// 多播:多个订阅者共享
Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer();

// 重放:新订阅者可以收到历史数据
Sinks.Many<String> replay = Sinks.many().replay().all();

// 单值
Sinks.One<String> one = Sinks.one();

操作符链的背压传递

1
2
3
4
5
Flux.range(1, 1000)           // 源 Publisher
.map(i -> i * 2) // 中间操作符
.filter(i -> i % 3 == 0) // 中间操作符
.take(10) // 中间操作符
.subscribe(System.out::println); // 最终 Subscriber

背压信号从下游向上游传递

1
2
3
4
5
6
7
8
9
Subscriber.request(10)

take 向上 request(10)

filter 向上 request(10) // 可能请求更多,因为有过滤

map 向上 request(10)

range 开始生产数据

调试背压

1
2
3
4
Flux.range(1, 100)
.log() // 打印所有信号,包括 request
.map(i -> i * 2)
.subscribe(System.out::println);

输出示例:

1
2
3
4
5
6
INFO  - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
INFO - | request(unbounded)
INFO - | onNext(1)
INFO - | onNext(2)
...
INFO - | onComplete()

相关链接

  • 上一篇:Reactive Streams 规范
  • 下一篇:源码解析
  • WebFlux - Reactor核心API

Project Reactor实现
https://zmmmmy.github.io/2026/01/10/Project Reactor实现/
作者
ZhiMy
发布于
2026年1月10日
许可协议