Source Code Analysis

Mono.just() Source Analysis

Let's start with the simplest case, Mono.just():

```java
Mono.just("Hello")
    .subscribe(System.out::println);
```
1. Creating the Mono

```java
// Mono.java
public static <T> Mono<T> just(T data) {
    return onAssembly(new MonoJust<>(data));
}

// MonoJust.java: simply stores the value, rejecting null up front
final class MonoJust<T> extends Mono<T> {
    final T value;

    MonoJust(T value) {
        this.value = Objects.requireNonNull(value);
    }
}
```
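2. What Happens on Subscribe

```java
// Mono.java (simplified): wraps the lambda in a Subscriber and returns a Disposable
public final Disposable subscribe(Consumer<? super T> consumer) {
    return subscribeWith(new LambdaMonoSubscriber<>(consumer, ...));
}

// MonoJust.java: hands the subscriber a Subscription that already holds the value
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
```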
3. Delivering the Data

```java
static final class ScalarSubscription<T> implements Subscription {
    final CoreSubscriber<? super T> actual;
    final T value;
    volatile int once; // updated through ONCE, an AtomicIntegerFieldUpdater on this field

    @Override
    public void request(long n) {
        // Only the first valid request emits; later requests are ignored
        if (n > 0 && ONCE.compareAndSet(this, 0, 1)) {
            actual.onNext(value);
            actual.onComplete();
        }
    }
}
```
Flow summary:

```
Mono.just("Hello")
    ↓
new MonoJust("Hello")
    ↓
subscribe(consumer)
    ↓
onSubscribe(ScalarSubscription)
    ↓
request(Long.MAX_VALUE)
    ↓
onNext("Hello")
    ↓
onComplete()
```
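The flow above can be reproduced without Reactor on the classpath. The following is a minimal sketch built only on the JDK's `java.util.concurrent.Flow` interfaces; `ScalarDemo` and `JustPublisher` are hypothetical names invented for illustration, and the code is an analogue of the idea behind `MonoJust` and `ScalarSubscription`, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical JDK-only analogue of MonoJust + ScalarSubscription (not Reactor code).
final class ScalarDemo {

    // Holds a single value and emits it on the first positive request.
    static final class JustPublisher<T> implements Flow.Publisher<T> {
        final T value;

        JustPublisher(T value) {
            this.value = Objects.requireNonNull(value);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> actual) {
            AtomicBoolean once = new AtomicBoolean();
            actual.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    // Emit at most once, however often request() is called.
                    if (n > 0 && once.compareAndSet(false, true)) {
                        actual.onNext(value);
                        actual.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    once.set(true); // a cancelled subscription must not emit
                }
            });
        }
    }

    // Subscribes with unbounded demand and records every signal in arrival order.
    static <T> List<String> run(T value) {
        List<String> log = new ArrayList<>();
        new JustPublisher<T>(value).subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override public void onNext(T item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("Hello")); // [onSubscribe, onNext(Hello), onComplete]
    }
}
```

Nothing is emitted at construction time in this sketch; the value only moves once the subscriber calls `request`, which mirrors the ordering shown in the flow summary.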
Flux.map() Source Analysis

```java
Flux.just(1, 2, 3)
    .map(i -> i * 2)
    .subscribe(System.out::println);
```
1. Creating the FluxMap

```java
// Flux.java
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
    return onAssembly(new FluxMap<>(this, mapper));
}

// FluxMap.java: remembers the upstream source and the mapping function
final class FluxMap<T, R> extends FluxOperator<T, R> {
    final Function<? super T, ? extends R> mapper;

    FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) {
        super(source);
        this.mapper = mapper;
    }
}
```
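2. Wrapping at Subscription Time

```java
// FluxMap.java
@Override
public void subscribe(CoreSubscriber<? super R> actual) {
    // Wrap the downstream subscriber, then subscribe the wrapper to the upstream source
    source.subscribe(new MapSubscriber<>(actual, mapper));
}
```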
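3. Transforming the Data

```java
// Simplified: onSubscribe, onError, onComplete and cancel are omitted
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
    final CoreSubscriber<? super R> actual;
    final Function<? super T, ? extends R> mapper;
    Subscription s; // the upstream subscription

    @Override
    public void onNext(T t) {
        R v = mapper.apply(t); // apply the transformation
        actual.onNext(v);      // pass the result downstream
    }

    @Override
    public void request(long n) {
        s.request(n); // map is one-to-one, so demand passes through unchanged
    }
}
```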
Structure of the subscription chain:

```
FluxArray [1,2,3]
    ↓ subscribe
MapSubscriber (mapper: i -> i*2)
    ↓ subscribe
LambdaSubscriber (consumer: println)
```
Data flow:

```
FluxArray.onNext(1)
    ↓
MapSubscriber.onNext(1) → mapper.apply(1) = 2
    ↓
LambdaSubscriber.onNext(2) → println(2)
```
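To make the wrapping concrete, here is a small self-contained sketch of the same subscriber chain using the JDK's `java.util.concurrent.Flow` interfaces. The names `MapDemo` and `arraySource` are hypothetical, and this `MapSubscriber` is a simplified stand-in written for illustration rather than Reactor's class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical JDK-only analogue of the map() subscriber chain (not Reactor code).
final class MapDemo {

    // Sits between upstream and downstream: transforms items, forwards everything else.
    static final class MapSubscriber<T, R> implements Flow.Subscriber<T>, Flow.Subscription {
        final Flow.Subscriber<? super R> actual;
        final Function<? super T, ? extends R> mapper;
        Flow.Subscription s;

        MapSubscriber(Flow.Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.s = s;
            actual.onSubscribe(this); // downstream sees this wrapper as its Subscription
        }

        @Override public void onNext(T t) { actual.onNext(mapper.apply(t)); }
        @Override public void onError(Throwable t) { actual.onError(t); }
        @Override public void onComplete() { actual.onComplete(); }
        @Override public void request(long n) { s.request(n); }
        @Override public void cancel() { s.cancel(); }
    }

    // Minimal synchronous source that emits the given values as demand arrives.
    static Flow.Publisher<Integer> arraySource(int... values) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int index;
            boolean finished;

            @Override
            public void request(long n) {
                long emitted = 0;
                while (emitted < n && index < values.length && !finished) {
                    subscriber.onNext(values[index++]);
                    emitted++;
                }
                if (index == values.length && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { finished = true; }
        });
    }

    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        Flow.Subscriber<Integer> collector = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        };
        // The source only ever sees the MapSubscriber; the collector is hidden behind it.
        arraySource(1, 2, 3).subscribe(new MapSubscriber<Integer, Integer>(collector, i -> i * 2));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2, 4, 6]
    }
}
```

The source in this sketch is deliberately naive (single-threaded, no re-entrancy handling), which is enough here because the collector issues one unbounded request.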
Backpressure Propagation in the Source

```java
Flux.range(1, 100)
    .filter(i -> i % 2 == 0)
    .take(5)
    .subscribe(System.out::println);
```
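How request Propagates

```java
// TakeSubscriber: cap the downstream demand at the number of items still allowed
@Override
public void request(long n) {
    if (n > 0) {
        long toRequest = Math.min(n, remaining);
        s.request(toRequest);
    }
}

// FilterSubscriber: pass the demand through unchanged
@Override
public void request(long n) {
    s.request(n);
}
```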
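The Complete Backpressure Flow

```
LambdaSubscriber.request(Long.MAX_VALUE)
    ↓
TakeSubscriber.request(Long.MAX_VALUE)
    → actually requests min(MAX_VALUE, 5) = 5
    ↓
FilterSubscriber.request(5)
    → passed straight through (filter cannot know how many items it will drop)
    ↓
RangeSubscription.request(5)
    → starts producing 1, 2, 3, 4, 5
```

Because filter drops the odd values, it asks upstream for one replacement each time it rejects an item (a request(1) call in its onNext); without that, take(5) would never receive its five even numbers.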
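The request arithmetic is easier to follow when the source records every `request(n)` it receives. The sketch below models the same range, filter, take pipeline with the JDK's `java.util.concurrent.Flow` interfaces. All class names are hypothetical stand-ins, and the re-entrancy guard in `RangeSubscription` is an assumption of this sketch, not a copy of Reactor's logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical JDK-only model of request propagation (not Reactor's classes).
final class BackpressureDemo {

    // Emits next..end on demand and logs every request(n) that reaches the source.
    static final class RangeSubscription implements Flow.Subscription {
        final Flow.Subscriber<? super Integer> actual;
        final List<Long> requests;
        final int end;
        int next;
        long demand;
        boolean emitting;
        boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> actual, int start, int end, List<Long> requests) {
            this.actual = actual;
            this.next = start;
            this.end = end;
            this.requests = requests;
        }

        @Override
        public void request(long n) {
            requests.add(n);
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
            if (emitting) {
                return; // re-entrant call from onNext: the running loop picks up the new demand
            }
            emitting = true;
            while (demand > 0 && next <= end && !done) {
                demand--;
                actual.onNext(next++);
            }
            emitting = false;
            if (next > end && !done) {
                done = true;
                actual.onComplete();
            }
        }

        @Override public void cancel() { done = true; }
    }

    // Forwards demand unchanged and asks for one replacement per rejected item.
    static final class FilterSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        final Flow.Subscriber<? super T> actual;
        final Predicate<? super T> predicate;
        Flow.Subscription s;

        FilterSubscriber(Flow.Subscriber<? super T> actual, Predicate<? super T> predicate) {
            this.actual = actual;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) { this.s = s; actual.onSubscribe(this); }
        @Override public void onNext(T t) { if (predicate.test(t)) actual.onNext(t); else s.request(1); }
        @Override public void onError(Throwable t) { actual.onError(t); }
        @Override public void onComplete() { actual.onComplete(); }
        @Override public void request(long n) { s.request(n); }
        @Override public void cancel() { s.cancel(); }
    }

    // Caps upstream demand at the remaining count, then cancels upstream and completes.
    static final class TakeSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        final Flow.Subscriber<? super T> actual;
        long remaining;
        Flow.Subscription s;

        TakeSubscriber(Flow.Subscriber<? super T> actual, long n) { this.actual = actual; this.remaining = n; }

        @Override public void onSubscribe(Flow.Subscription s) { this.s = s; actual.onSubscribe(this); }
        @Override public void onNext(T t) {
            if (remaining == 0) return;
            remaining--;
            actual.onNext(t);
            if (remaining == 0) { s.cancel(); actual.onComplete(); }
        }
        @Override public void onError(Throwable t) { actual.onError(t); }
        @Override public void onComplete() { if (remaining > 0) actual.onComplete(); }
        @Override public void request(long n) { if (n > 0 && remaining > 0) s.request(Math.min(n, remaining)); }
        @Override public void cancel() { s.cancel(); }
    }

    static String run() {
        List<Long> requests = new ArrayList<>();
        List<Integer> items = new ArrayList<>();
        Flow.Subscriber<Integer> collector = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer i) { items.add(i); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        };
        TakeSubscriber<Integer> take = new TakeSubscriber<Integer>(collector, 5);
        FilterSubscriber<Integer> filter = new FilterSubscriber<Integer>(take, i -> i % 2 == 0);
        filter.onSubscribe(new RangeSubscription(filter, 1, 100, requests));
        return "requests=" + requests + " items=" + items;
    }

    public static void main(String[] args) {
        System.out.println(run()); // requests=[5, 1, 1, 1, 1, 1] items=[2, 4, 6, 8, 10]
    }
}
```

In this model the source sees one capped request of 5 followed by five single-item top-ups, one for each odd number the filter rejected, and it stops at 10 because the take stage cancels upstream once its quota is met.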
Async Boundaries and Schedulers

```java
Flux.range(1, 10)
    .publishOn(Schedulers.parallel())
    .map(i -> i * 2)
    .subscribe(System.out::println);
```
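How publishOn Is Implemented

```java
final class FluxPublishOn<T> extends FluxOperator<T, T> {
    final Scheduler scheduler;
    final int prefetch;

    // Simplified: demand tracking, error handling and the drain loop are omitted
    static final class PublishOnSubscriber<T> implements Subscriber<T>, Runnable {
        final CoreSubscriber<? super T> actual; // the downstream subscriber
        final Queue<T> queue;
        final Worker worker;

        @Override
        public void onNext(T t) {
            // Runs on the upstream thread: enqueue only, then hand off
            queue.offer(t);
            trySchedule();
        }

        void trySchedule() {
            // Schedule the drain task on the target thread
            worker.schedule(this);
        }

        @Override
        public void run() {
            // Runs on the scheduler's thread: dequeue and emit downstream
            T v = queue.poll();
            actual.onNext(v);
        }
    }
}
```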
Where the thread switch happens:

```
main thread:     range → publishOn's queue
                     ↓ (thread switch)
parallel thread: queue → map → subscribe
```
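The queue hand-off can be sketched with a plain `ExecutorService` standing in for the scheduler's worker. In the sketch below, `PublishOnDemo` and `HandOff` are hypothetical names, and the work-in-progress counter (`wip`) with its drain loop is an assumption based on the common queue-drain idiom; it illustrates the idea and is not taken from `FluxPublishOn`.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical JDK-only sketch of a publishOn-style hand-off (not Reactor code).
final class PublishOnDemo {

    // The producer thread enqueues; a task on the worker thread drains the queue.
    static final class HandOff<T> implements Runnable {
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
        final ExecutorService worker;
        final Consumer<? super T> downstream;

        HandOff(ExecutorService worker, Consumer<? super T> downstream) {
            this.worker = worker;
            this.downstream = downstream;
        }

        void onNext(T t) {
            queue.offer(t);
            trySchedule();
        }

        void trySchedule() {
            // Only the caller that moves wip from 0 to 1 submits a drain task.
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            do {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v); // executes on the worker thread
                }
                // Subtract the signals handled so far; non-zero means more arrived meanwhile.
                missed = wip.addAndGet(-missed);
            } while (missed != 0);
        }
    }

    static List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        HandOff<Integer> stage = new HandOff<Integer>(worker, v -> {
            // Stands in for the map + subscribe stages after the boundary.
            log.add(Thread.currentThread().getName() + ":" + (v * 2));
            done.countDown();
        });
        try {
            for (int i = 1; i <= 3; i++) {
                stage.onNext(i); // called on the current (producer) thread
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the worker");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [worker:2, worker:4, worker:6]
    }
}
```

Everything before `queue.offer` runs on the caller's thread and everything after `queue.poll` runs on the worker, which is the boundary the diagram above describes. The counter keeps at most one drain task active, so items stay in order.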
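Key Design Patterns

1. Decorator Pattern

Every operator decorates its upstream:

```java
// Conceptually what range().map().filter() assembles
Flux<Integer> source = Flux.range(1, 10);
Flux<Integer> mapped = new FluxMap<>(source, i -> i * 2);
Flux<Integer> filtered = new FluxFilter<>(mapped, i -> i > 5);
```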
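2. Chain of Responsibility Pattern

Subscribing builds a chain of Subscribers:

```java
// Pseudocode: mapper and predicate arguments are omitted
source.subscribe(
    new MapSubscriber(
        new FilterSubscriber(
            new LambdaSubscriber(consumer)
        )
    )
);
```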
3. Observer Pattern

Publisher-Subscriber is itself a variant of the observer pattern.
Related Links

Previous: Project Reactor Implementation
Reactive Streams Specification
WebFlux - Threading Model and Netty