Reactor核心API

Reactor 核心 API

什么是 Project Reactor

Project Reactor 是 Spring WebFlux 默认使用的响应式库,它实现了 Reactive Streams 规范,提供了两个核心类型:MonoFlux

Mono - 单值容器

Mono<T> 表示0个或1个元素的异步序列。

创建 Mono

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建包含值的 Mono
Mono<String> mono1 = Mono.just("Hello");

// 创建空的 Mono
Mono<String> mono2 = Mono.empty();

// 创建错误的 Mono
Mono<String> mono3 = Mono.error(new RuntimeException("Error"));

// 延迟创建
Mono<String> mono4 = Mono.fromSupplier(() -> "Lazy Value");

// 从 Callable 创建
Mono<String> mono5 = Mono.fromCallable(() -> blockingCall());

// 从 Future 创建
Mono<String> mono6 = Mono.fromFuture(completableFuture);

Mono 常用操作

1
2
3
4
5
6
7
8
9
Mono.just("Hello")
.map(s -> s + " World") // 转换
.flatMap(s -> anotherMono(s)) // 异步转换
.filter(s -> s.length() > 5) // 过滤
.defaultIfEmpty("Default") // 默认值
.switchIfEmpty(anotherMono) // 空时切换
.doOnNext(s -> log.info(s)) // 副作用
.doOnError(e -> log.error(e)) // 错误处理
.subscribe(); // 订阅触发执行

Flux - 多值容器

Flux<T> 表示0到N个元素的异步序列。

创建 Flux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 从多个值创建
Flux<String> flux1 = Flux.just("A", "B", "C");

// 从数组创建
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});

// 从集合创建
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));

// 创建范围
Flux<Integer> flux4 = Flux.range(1, 10); // 1到10

// 创建空 Flux
Flux<String> flux5 = Flux.empty();

// 定时发射
Flux<Long> flux6 = Flux.interval(Duration.ofSeconds(1));

// 从 Stream 创建
Flux<String> flux7 = Flux.fromStream(Stream.of("A", "B", "C"));

Flux 常用操作

1
2
3
4
5
6
7
8
9
Flux.range(1, 10)
.map(i -> i * 2) // 转换每个元素
.filter(i -> i > 5) // 过滤
.take(5) // 只取前5个
.skip(2) // 跳过前2个
.distinct() // 去重
.sort() // 排序
.collectList() // 收集为 Mono<List>
.subscribe();

核心操作符分类

转换操作符

操作符 说明 示例
map 同步转换 flux.map(i -> i * 2)
flatMap 异步转换(不保序) flux.flatMap(i -> getMono(i))
flatMapSequential 异步转换(保序) flux.flatMapSequential(...)
concatMap 顺序异步转换 flux.concatMap(...)
switchMap 切换到新流 flux.switchMap(...)

过滤操作符

1
2
3
4
5
6
7
8
9
10
11
12
Flux.range(1, 100)
.filter(i -> i % 2 == 0) // 过滤偶数
.take(10) // 取前10个
.takeLast(5) // 取后5个
.takeWhile(i -> i < 50) // 满足条件时取
.skip(5) // 跳过前5个
.skipWhile(i -> i < 10) // 满足条件时跳过
.distinct() // 去重
.distinctUntilChanged() // 相邻去重
.elementAt(5) // 取第5个元素
.single() // 确保只有一个元素
.subscribe();

组合操作符

1
2
3
4
5
6
7
8
9
10
11
// 合并多个 Flux(交错)
Flux.merge(flux1, flux2, flux3);

// 连接多个 Flux(顺序)
Flux.concat(flux1, flux2, flux3);

// 配对组合
Flux.zip(flux1, flux2, (a, b) -> a + b);

// 合并最新值
Flux.combineLatest(flux1, flux2, (a, b) -> a + b);

错误处理操作符

1
2
3
4
5
6
7
8
9
10
11
Flux.just(1, 2, 3)
.map(i -> {
if (i == 2) throw new RuntimeException("Error");
return i;
})
.onErrorReturn(-1) // 错误时返回默认值
.onErrorResume(e -> Flux.just(0)) // 错误时切换到备用流
.onErrorMap(e -> new CustomException()) // 转换异常
.retry(3) // 重试3次
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // 退避重试
.subscribe();

调度器(Scheduler)

调度器用于控制操作在哪个线程上执行。

常用调度器

调度器 说明 适用场景
Schedulers.immediate() 当前线程 默认
Schedulers.single() 单一线程 顺序执行
Schedulers.parallel() 并行线程池 CPU 密集型
Schedulers.boundedElastic() 弹性线程池 I/O 密集型

切换线程

1
2
3
4
5
Flux.range(1, 10)
.publishOn(Schedulers.parallel()) // 后续操作在 parallel 线程
.map(i -> i * 2)
.subscribeOn(Schedulers.boundedElastic()) // 订阅在 elastic 线程
.subscribe();
  • subscribeOn:影响订阅和数据源的执行线程
  • publishOn:影响后续操作符的执行线程

冷流与热流

冷流(Cold)

  • 每个订阅者都会从头开始接收数据
  • 数据是惰性生成的
1
2
3
4
Flux<Integer> cold = Flux.range(1, 5);
cold.subscribe(i -> System.out.println("Subscriber 1: " + i));
cold.subscribe(i -> System.out.println("Subscriber 2: " + i));
// 两个订阅者都会收到 1, 2, 3, 4, 5

热流(Hot)

  • 所有订阅者共享同一个数据流
  • 订阅后只能收到之后发出的数据
1
2
3
4
5
6
7
8
9
10
11
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> hot = sink.asFlux();

hot.subscribe(i -> System.out.println("Subscriber 1: " + i));
sink.tryEmitNext(1);
sink.tryEmitNext(2);

hot.subscribe(i -> System.out.println("Subscriber 2: " + i));
sink.tryEmitNext(3);
// Subscriber 1 收到: 1, 2, 3
// Subscriber 2 收到: 3

相关链接

  • 上一篇:响应式编程基础
  • 下一篇:WebFlux核心组件

Reactor核心API
https://zmmmmy.github.io/2026/01/10/Reactor核心API/
作者
ZhiMy
发布于
2026年1月10日
许可协议