源码解析

源码解析

Mono.just() 源码分析

从最简单的 Mono.just() 开始:

1
2
Mono.just("Hello")
.subscribe(System.out::println);

1. 创建 Mono

1
2
3
4
5
6
7
8
9
10
11
12
13
// Mono.java
public static <T> Mono<T> just(T data) {
return onAssembly(new MonoJust<>(data));
}

// MonoJust.java - 持有数据的 Mono
final class MonoJust<T> extends Mono<T> {
final T value;

MonoJust(T value) {
this.value = Objects.requireNonNull(value);
}
}

2. 订阅时发生了什么

1
2
3
4
5
6
7
8
9
10
11
12
// Mono.java
public final void subscribe(Consumer<? super T> consumer) {
// 创建 LambdaMonoSubscriber 包装 consumer
subscribe(new LambdaMonoSubscriber<>(consumer, ...));
}

// MonoJust.java
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
// 直接调用 onSubscribe 和 onNext
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}

3. 数据传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Operators.java - ScalarSubscription
static final class ScalarSubscription<T> implements Subscription {
final CoreSubscriber<? super T> actual;
final T value;
volatile int once; // 确保只发送一次

@Override
public void request(long n) {
if (n > 0 && ONCE.compareAndSet(this, 0, 1)) {
actual.onNext(value); // 发送数据
actual.onComplete(); // 完成
}
}
}

流程总结

1
2
3
4
5
6
7
8
9
10
11
12
13
Mono.just("Hello")

new MonoJust("Hello")

subscribe(consumer)

onSubscribe(ScalarSubscription)

request(Long.MAX_VALUE) // LambdaSubscriber 默认请求无限

onNext("Hello")

onComplete()

Flux.map() 源码分析

1
2
3
Flux.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println);

1. 创建 FluxMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Flux.java
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
return onAssembly(new FluxMap<>(this, mapper));
}

// FluxMap.java
final class FluxMap<T, R> extends FluxOperator<T, R> {
final Function<? super T, ? extends R> mapper;

FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) {
super(source);
this.mapper = mapper;
}
}

2. 订阅时的包装

1
2
3
4
5
6
// FluxMap.java
@Override
public void subscribe(CoreSubscriber<? super R> actual) {
// 用 MapSubscriber 包装下游订阅者
source.subscribe(new MapSubscriber<>(actual, mapper));
}

3. 数据转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// FluxMap.java - MapSubscriber
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
final CoreSubscriber<? super R> actual; // 下游
final Function<? super T, ? extends R> mapper;
Subscription s; // 上游的 Subscription

@Override
public void onNext(T t) {
R v = mapper.apply(t); // 转换数据
actual.onNext(v); // 传给下游
}

@Override
public void request(long n) {
s.request(n); // 背压信号向上传递
}
}

订阅链结构

1
2
3
4
5
FluxArray [1,2,3]
subscribe
MapSubscriber (mapper: i -> i*2)
subscribe
LambdaSubscriber (consumer: println)

数据流向

1
2
3
4
5
FluxArray.onNext(1)

MapSubscriber.onNext(1) → mapper.apply(1) = 2

LambdaSubscriber.onNext(2) → println(2)

背压传递源码

1
2
3
4
Flux.range(1, 100)
.filter(i -> i % 2 == 0)
.take(5)
.subscribe(System.out::println);

request 的传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// FluxTake.java - TakeSubscriber
@Override
public void request(long n) {
if (n > 0) {
// 向上游请求,但不超过剩余需要的数量
long toRequest = Math.min(n, remaining);
s.request(toRequest);
}
}

// FluxFilter.java - FilterSubscriber
@Override
public void request(long n) {
// filter 可能需要请求更多,因为有些会被过滤掉
s.request(n);
}

完整的背压流程

1
2
3
4
5
6
7
8
9
10
LambdaSubscriber.request(Long.MAX_VALUE)

TakeSubscriber.request(Long.MAX_VALUE)
→ 实际请求 min(MAX_VALUE, 5) = 5

FilterSubscriber.request(5)
→ 直接传递(filter 不知道会过滤多少)

RangeSubscription.request(5)
→ 开始生产 1,2,3,4,5

异步边界与调度器

1
2
3
4
Flux.range(1, 10)
.publishOn(Schedulers.parallel())
.map(i -> i * 2)
.subscribe(System.out::println);

publishOn 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// FluxPublishOn.java
final class FluxPublishOn<T> extends FluxOperator<T, T> {
final Scheduler scheduler;
final int prefetch; // 预取数量

// PublishOnSubscriber 内部有队列
static final class PublishOnSubscriber<T> implements Subscriber<T>, Runnable {
final Queue<T> queue; // 缓冲队列
final Worker worker; // 调度器工作线程

@Override
public void onNext(T t) {
queue.offer(t); // 放入队列
trySchedule(); // 调度执行
}

void trySchedule() {
worker.schedule(this); // 在另一个线程执行 run()
}

@Override
public void run() {
// 在调度器线程中执行
T v = queue.poll();
actual.onNext(v);
}
}
}

线程切换点

1
2
3
main 线程: range → publishOn 的 queue
↓ (线程切换)
parallel 线程: queue → map → subscribe

关键设计模式

1. 装饰器模式

每个操作符都是对上游的装饰:

1
2
3
Flux<Integer> source = Flux.range(1, 10);
Flux<Integer> mapped = new FluxMap<>(source, i -> i * 2);
Flux<Integer> filtered = new FluxFilter<>(mapped, i -> i > 5);

2. 责任链模式

订阅时形成 Subscriber 链:

1
2
3
4
5
6
7
source.subscribe(
new MapSubscriber(
new FilterSubscriber(
new LambdaSubscriber(consumer)
)
)
);

3. 观察者模式

Publisher-Subscriber 本身就是观察者模式的变体。

相关链接

  • 上一篇:Project Reactor 实现
  • Reactive Streams 规范
  • WebFlux - 线程模型与Netty

源码解析
https://zmmmmy.github.io/2026/01/10/源码解析/
作者
ZhiMy
发布于
2026年1月10日
许可协议