Project Reactor 实现 Project Reactor 简介 Project Reactor 是 Spring 官方的 Reactive Streams 实现 ,是 Spring WebFlux 的基础。
核心特点
完全非阻塞 :基于 JDK 8 函数式 API
背压支持 :完整实现 Reactive Streams 规范
丰富的操作符 :提供 100+ 操作符
与 Spring 深度集成 :WebFlux、Spring Data R2DBC 等
Reactor 对 Reactive Streams 的实现 Publisher 的实现 Reactor 提供了两个 Publisher 实现:
1 2 3 4 5 6 7 8 9 public abstract class Mono <T> implements Publisher <T> { }public abstract class Flux <T> implements Publisher <T> { }
为什么分成 Mono 和 Flux? 1 2 3 4 5 6 Mono<User> findById (Long id) ; Flux<User> findAll () ; Publisher<User> findById (Long id) ;
Subscriber 的实现 Reactor 提供了 BaseSubscriber 作为基础实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Flux.range(1 , 10 ) .subscribe(new BaseSubscriber <Integer>() { @Override protected void hookOnSubscribe (Subscription subscription) { request(3 ); } @Override protected void hookOnNext (Integer value) { System.out.println("收到: " + value); request(1 ); } @Override protected void hookOnComplete () { System.out.println("完成" ); } @Override protected void hookOnError (Throwable throwable) { System.err.println("错误: " + throwable); } });
背压策略实现 Reactor 提供了多种背压策略:
1. request 控制(默认) 1 2 3 4 5 6 7 8 9 10 11 12 13 Flux.range(1 , 1000 ) .subscribe(new BaseSubscriber <Integer>() { @Override protected void hookOnSubscribe (Subscription s) { request(10 ); } @Override protected void hookOnNext (Integer value) { request(1 ); } });
2. onBackpressureBuffer - 缓冲 1 2 3 4 5 6 Flux.range(1 , 1000 ) .onBackpressureBuffer(100 ) .subscribe(value -> { Thread.sleep(100 ); System.out.println(value); });
3. onBackpressureDrop - 丢弃 1 2 3 4 5 6 7 Flux.range(1 , 1000 ) .onBackpressureDrop(dropped -> System.out.println("丢弃: " + dropped)) .subscribe(value -> { Thread.sleep(100 ); System.out.println(value); });
4. onBackpressureLatest - 只保留最新 1 2 3 4 5 6 Flux.range(1 , 1000 ) .onBackpressureLatest() .subscribe(value -> { Thread.sleep(100 ); System.out.println(value); });
5. onBackpressureError - 抛异常 1 2 3 4 5 6 Flux.range(1 , 1000 ) .onBackpressureError() .subscribe(value -> { Thread.sleep(100 ); System.out.println(value); });
Subscription 的实现 Reactor 内部使用多种 Subscription 实现:
1 2 3 4 5 6 7 8 9 10 Flux.range(1 , 10 ) .subscribe(new BaseSubscriber <Integer>() { @Override protected void hookOnSubscribe (Subscription subscription) { System.out.println(subscription.getClass()); request(1 ); } });
常见的 Subscription 实现
类名
说明
FluxRange$RangeSubscription
range() 操作的订阅
FluxMap$MapSubscriber
map() 操作的订阅
FluxFilter$FilterSubscriber
filter() 操作的订阅
MonoJust$MonoJustSubscription
Mono.just() 的订阅
Processor 的实现(已废弃) Reactor 3.4 之前提供了 Processor 实现,现在推荐使用 Sinks :
1 2 3 4 5 6 7 8 9 10 11 12 13 Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(); Flux<String> flux = sink.asFlux(); sink.tryEmitNext("Hello" ); sink.tryEmitNext("World" ); sink.tryEmitComplete();
Sinks 的类型 1 2 3 4 5 6 7 8 9 10 11 Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer(); Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer(); Sinks.Many<String> replay = Sinks.many().replay().all(); Sinks.One<String> one = Sinks.one();
操作符链的背压传递 1 2 3 4 5 Flux.range(1 , 1000 ) .map(i -> i * 2 ) .filter(i -> i % 3 == 0 ) .take(10 ) .subscribe(System.out::println);
背压信号从下游向上游传递 :
1 2 3 4 5 6 7 8 9 Subscriber.request (10 ) ↓ take 向上 request (10 ) ↓filter 向上 request (10 ) ↓ map 向上 request (10 ) ↓ range 开始生产数据
调试背压 1 2 3 4 Flux.range(1 , 100 ) .log() .map(i -> i * 2 ) .subscribe(System.out::println);
输出示例:
1 2 3 4 5 6 INFO - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)INFO - | request(unbounded)INFO - | onNext(1)INFO - | onNext(2).. .INFO - | onComplete()
相关链接
上一篇:Reactive Streams 规范
下一篇:源码解析
WebFlux - Reactor核心API