Reactor 核心 API
什么是 Project Reactor
Project Reactor 是 Spring WebFlux 默认使用的响应式库,它实现了 Reactive Streams 规范,提供了两个核心类型:Mono 和 Flux。
Mono - 单值容器
Mono<T> 表示0个或1个元素的异步序列。
创建 Mono
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.empty();
Mono<String> mono3 = Mono.error(new RuntimeException("Error"));
Mono<String> mono4 = Mono.fromSupplier(() -> "Lazy Value");
Mono<String> mono5 = Mono.fromCallable(() -> blockingCall());
Mono<String> mono6 = Mono.fromFuture(completableFuture);
|
Mono 常用操作
1 2 3 4 5 6 7 8 9
| Mono.just("Hello") .map(s -> s + " World") .flatMap(s -> anotherMono(s)) .filter(s -> s.length() > 5) .defaultIfEmpty("Default") .switchIfEmpty(anotherMono) .doOnNext(s -> log.info(s)) .doOnError(e -> log.error(e)) .subscribe();
|
Flux - 多值容器
Flux<T> 表示0到N个元素的异步序列。
创建 Flux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Flux<Integer> flux4 = Flux.range(1, 10);
Flux<String> flux5 = Flux.empty();
Flux<Long> flux6 = Flux.interval(Duration.ofSeconds(1));
Flux<String> flux7 = Flux.fromStream(Stream.of("A", "B", "C"));
|
Flux 常用操作
1 2 3 4 5 6 7 8 9
| Flux.range(1, 10) .map(i -> i * 2) .filter(i -> i > 5) .take(5) .skip(2) .distinct() .sort() .collectList() .subscribe();
|
核心操作符分类
转换操作符
| 操作符 |
说明 |
示例 |
map |
同步转换 |
flux.map(i -> i * 2) |
flatMap |
异步转换(不保序) |
flux.flatMap(i -> getMono(i)) |
flatMapSequential |
异步转换(保序) |
flux.flatMapSequential(...) |
concatMap |
顺序异步转换 |
flux.concatMap(...) |
switchMap |
切换到新流 |
flux.switchMap(...) |
过滤操作符
1 2 3 4 5 6 7 8 9 10 11 12
| Flux.range(1, 100) .filter(i -> i % 2 == 0) .take(10) .takeLast(5) .takeWhile(i -> i < 50) .skip(5) .skipWhile(i -> i < 10) .distinct() .distinctUntilChanged() .elementAt(5) .single() .subscribe();
|
组合操作符
1 2 3 4 5 6 7 8 9 10 11
| Flux.merge(flux1, flux2, flux3);
Flux.concat(flux1, flux2, flux3);
Flux.zip(flux1, flux2, (a, b) -> a + b);
Flux.combineLatest(flux1, flux2, (a, b) -> a + b);
|
错误处理操作符
1 2 3 4 5 6 7 8 9 10 11
| Flux.just(1, 2, 3) .map(i -> { if (i == 2) throw new RuntimeException("Error"); return i; }) .onErrorReturn(-1) .onErrorResume(e -> Flux.just(0)) .onErrorMap(e -> new CustomException()) .retry(3) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .subscribe();
|
调度器(Scheduler)
调度器用于控制操作在哪个线程上执行。
常用调度器
| 调度器 |
说明 |
适用场景 |
Schedulers.immediate() |
当前线程 |
默认 |
Schedulers.single() |
单一线程 |
顺序执行 |
Schedulers.parallel() |
并行线程池 |
CPU 密集型 |
Schedulers.boundedElastic() |
弹性线程池 |
I/O 密集型 |
切换线程
1 2 3 4 5
| Flux.range(1, 10) .publishOn(Schedulers.parallel()) .map(i -> i * 2) .subscribeOn(Schedulers.boundedElastic()) .subscribe();
|
subscribeOn:影响订阅和数据源的执行线程
publishOn:影响后续操作符的执行线程
冷流与热流
冷流(Cold)
1 2 3 4
| Flux<Integer> cold = Flux.range(1, 5); cold.subscribe(i -> System.out.println("Subscriber 1: " + i)); cold.subscribe(i -> System.out.println("Subscriber 2: " + i));
|
热流(Hot)
- 所有订阅者共享同一个数据流
- 订阅后只能收到之后发出的数据
1 2 3 4 5 6 7 8 9 10 11
| Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer(); Flux<Integer> hot = sink.asFlux();
hot.subscribe(i -> System.out.println("Subscriber 1: " + i)); sink.tryEmitNext(1); sink.tryEmitNext(2);
hot.subscribe(i -> System.out.println("Subscriber 2: " + i)); sink.tryEmitNext(3);
|
相关链接
- 上一篇:响应式编程基础
- 下一篇:WebFlux核心组件