Reactor API参考

Reactor API 参考

Project Reactor 提供了丰富的操作符,本文按功能分类整理。


一、创建操作符

Mono 创建

API 说明 示例
Mono.just(T) 创建包含单个值的 Mono Mono.just("hello")
Mono.empty() 创建空的 Mono Mono.empty()
Mono.error(Throwable) 创建错误 Mono Mono.error(new RuntimeException())
Mono.never() 永不发出信号的 Mono Mono.never()
Mono.defer(Supplier) 延迟创建,每次订阅都执行 Mono.defer(() -> Mono.just(getValue()))
Mono.fromCallable(Callable) 从 Callable 创建 Mono.fromCallable(() -> compute())
Mono.fromSupplier(Supplier) 从 Supplier 创建 Mono.fromSupplier(() -> getValue())
Mono.fromFuture(CompletableFuture) 从 Future 创建 Mono.fromFuture(future)
Mono.fromRunnable(Runnable) 从 Runnable 创建(无返回值) Mono.fromRunnable(() -> doSomething())
Mono.delay(Duration) 延迟后发出 0L Mono.delay(Duration.ofSeconds(1))
Mono.create(Consumer<MonoSink>) 编程式创建 见下方示例
1
2
3
4
5
6
7
8
9
// Mono.create 示例
Mono.create(sink -> {
String result = doAsyncWork();
if (result != null) {
sink.success(result);
} else {
sink.error(new Exception("失败"));
}
});

Flux 创建

API 说明 示例
Flux.just(T...) 从多个值创建 Flux.just(1, 2, 3)
Flux.fromIterable(Iterable) 从集合创建 Flux.fromIterable(list)
Flux.fromArray(T[]) 从数组创建 Flux.fromArray(array)
Flux.fromStream(Stream) 从 Stream 创建 Flux.fromStream(stream)
Flux.range(int, int) 创建整数序列 Flux.range(1, 10)
Flux.interval(Duration) 定时发出递增数字 Flux.interval(Duration.ofSeconds(1))
Flux.empty() 创建空 Flux Flux.empty()
Flux.error(Throwable) 创建错误 Flux Flux.error(new Exception())
Flux.never() 永不发出信号 Flux.never()
Flux.defer(Supplier) 延迟创建 Flux.defer(() -> Flux.just(getData()))
Flux.generate(Consumer) 同步生成(一次一个) 见下方示例
Flux.create(Consumer) 异步生成(可多个) 见下方示例
Flux.push(Consumer) 单线程异步生成 类似 create
Flux.using(Callable, Function, Consumer) 资源管理 见下方示例
Flux.concat(Publisher...) 顺序连接多个流 Flux.concat(flux1, flux2)
Flux.merge(Publisher...) 并行合并多个流 Flux.merge(flux1, flux2)
Flux.zip(Publisher...) 组合多个流 Flux.zip(flux1, flux2)
Flux.combineLatest(Publisher...) 组合最新值 见下方示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Flux.generate 示例 - 同步生成
Flux.generate(
() -> 0, // 初始状态
(state, sink) -> {
sink.next("值: " + state);
if (state == 10) sink.complete();
return state + 1;
}
);

// Flux.create 示例 - 异步生成
Flux.create(sink -> {
eventSource.onData(data -> sink.next(data));
eventSource.onError(e -> sink.error(e));
eventSource.onComplete(() -> sink.complete());
});

// Flux.using 示例 - 资源管理
Flux.using(
() -> new FileInputStream("file.txt"), // 创建资源
inputStream -> Flux.just(readData(inputStream)), // 使用资源
inputStream -> inputStream.close() // 关闭资源
);

二、转换操作符

基本转换

API 说明 示例
map(Function) 同步转换每个元素 flux.map(i -> i * 2)
flatMap(Function) 异步转换,结果合并(无序) flux.flatMap(id -> fetchUser(id))
flatMapSequential(Function) 异步转换,保持顺序 flux.flatMapSequential(id -> fetchUser(id))
concatMap(Function) 异步转换,顺序执行 flux.concatMap(id -> fetchUser(id))
switchMap(Function) 切换到新流,取消旧流 flux.switchMap(id -> fetchUser(id))
cast(Class) 类型转换 flux.cast(String.class)
ofType(Class) 过滤并转换类型 flux.ofType(String.class)
index() 添加索引 flux.index()Tuple2<Long, T>
timestamp() 添加时间戳 flux.timestamp()Tuple2<Long, T>
elapsed() 添加时间间隔 flux.elapsed()

flatMap 变体对比

1
2
3
4
flatMap:           并行执行,结果无序
flatMapSequential: 并行执行,结果有序
concatMap: 顺序执行,结果有序
switchMap: 切换新流,取消旧流
1
2
3
4
5
6
7
8
// flatMap vs concatMap
Flux.range(1, 3)
.flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100 - i * 30)))
.subscribe(System.out::println); // 可能输出: 3, 2, 1

Flux.range(1, 3)
.concatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100)))
.subscribe(System.out::println); // 输出: 1, 2, 3

三、过滤操作符

API 说明 示例
filter(Predicate) 过滤元素 flux.filter(i -> i > 5)
filterWhen(Function) 异步过滤 flux.filterWhen(i -> checkAsync(i))
distinct() 去重 flux.distinct()
distinctUntilChanged() 连续去重 flux.distinctUntilChanged()
take(long) 取前 n 个 flux.take(5)
takeLast(int) 取最后 n 个 flux.takeLast(3)
takeUntil(Predicate) 取直到条件满足 flux.takeUntil(i -> i > 10)
takeWhile(Predicate) 取当条件满足时 flux.takeWhile(i -> i < 10)
skip(long) 跳过前 n 个 flux.skip(5)
skipLast(int) 跳过最后 n 个 flux.skipLast(3)
skipUntil(Predicate) 跳过直到条件满足 flux.skipUntil(i -> i > 5)
skipWhile(Predicate) 跳过当条件满足时 flux.skipWhile(i -> i < 5)
first() 取第一个(已废弃) 使用 next()
next() 取第一个,返回 Mono flux.next()
last() 取最后一个 flux.last()
last(T) 取最后一个,有默认值 flux.last(defaultValue)
single() 期望只有一个元素 flux.single()
singleOrEmpty() 期望 0 或 1 个元素 flux.singleOrEmpty()
elementAt(int) 取指定位置元素 flux.elementAt(5)
ignoreElements() 忽略所有元素 flux.ignoreElements()

四、组合操作符

API 说明 示例
concat(Publisher...) 顺序连接 Flux.concat(flux1, flux2)
concatWith(Publisher) 与另一个流连接 flux1.concatWith(flux2)
merge(Publisher...) 并行合并 Flux.merge(flux1, flux2)
mergeWith(Publisher) 与另一个流合并 flux1.mergeWith(flux2)
mergeSequential(Publisher...) 并行执行,顺序输出 Flux.mergeSequential(flux1, flux2)
zip(Publisher...) 一对一组合 Flux.zip(flux1, flux2)
zipWith(Publisher) 与另一个流组合 flux1.zipWith(flux2)
zipWith(Publisher, BiFunction) 组合并转换 flux1.zipWith(flux2, (a, b) -> a + b)
combineLatest(Publisher...) 组合最新值 Flux.combineLatest(flux1, flux2, combiner)
startWith(T...) 在开头添加元素 flux.startWith(0)
and(Publisher) 等待两个都完成 mono1.and(mono2)
when(Publisher...) 等待所有完成 Mono.when(mono1, mono2)
then() 忽略元素,只关心完成 flux.then()
thenMany(Publisher) 完成后切换到另一个流 mono.thenMany(flux)
thenReturn(T) 完成后返回固定值 mono.thenReturn("done")

组合操作符对比

1
2
3
concat:  A完成 → B开始,顺序执行
merge: A和B同时执行,谁快谁先输出
zip: A和B一对一配对,(a1,b1), (a2,b2)...
1
2
3
4
5
6
7
// zip 示例
Flux<String> names = Flux.just("张三", "李四");
Flux<Integer> ages = Flux.just(25, 30);

Flux.zip(names, ages, (name, age) -> name + ": " + age)
.subscribe(System.out::println);
// 输出: 张三: 25, 李四: 30

五、错误处理操作符

API 说明 示例
onErrorReturn(T) 错误时返回默认值 flux.onErrorReturn(defaultValue)
onErrorReturn(Predicate, T) 特定错误返回默认值 flux.onErrorReturn(e -> e instanceof IOException, default)
onErrorResume(Function) 错误时切换到备用流 flux.onErrorResume(e -> backupFlux)
onErrorMap(Function) 转换错误类型 flux.onErrorMap(e -> new CustomException(e))
onErrorContinue(BiConsumer) 错误时跳过继续 flux.onErrorContinue((e, v) -> log.error(e))
onErrorStop() 错误时停止 flux.onErrorStop()
doOnError(Consumer) 错误时执行副作用 flux.doOnError(e -> log.error(e))
retry() 无限重试 flux.retry()
retry(long) 重试 n 次 flux.retry(3)
retryWhen(Retry) 自定义重试策略 见下方示例
1
2
3
4
5
6
7
8
9
10
11
12
// retryWhen 示例
flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(e -> e instanceof IOException)
.onRetryExhaustedThrow((spec, signal) -> signal.failure())
);

// 指数退避重试
flux.retryWhen(Retry.backoff(5, Duration.ofMillis(100)));

// 固定间隔重试
flux.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)));

六、副作用操作符(doOn 系列)

API 说明 触发时机
doOnNext(Consumer) 每个元素 onNext 信号
doOnError(Consumer) 发生错误 onError 信号
doOnComplete(Runnable) 完成时 onComplete 信号
doOnSubscribe(Consumer) 订阅时 onSubscribe 信号
doOnRequest(LongConsumer) 请求数据时 request 信号
doOnCancel(Runnable) 取消时 cancel 信号
doOnTerminate(Runnable) 终止时(完成或错误) onComplete 或 onError
doAfterTerminate(Runnable) 终止后 终止信号发送后
doOnEach(Consumer<Signal>) 每个信号 所有信号
doFirst(Runnable) 订阅前 订阅链建立前
doFinally(Consumer<SignalType>) 最终执行 任何终止情况
1
2
3
4
5
6
7
8
9
10
11
12
13
// 完整的生命周期监控
flux
.doFirst(() -> log.info("开始"))
.doOnSubscribe(s -> log.info("订阅"))
.doOnRequest(n -> log.info("请求: " + n))
.doOnNext(v -> log.info("收到: " + v))
.doOnError(e -> log.error("错误", e))
.doOnComplete(() -> log.info("完成"))
.doOnCancel(() -> log.info("取消"))
.doOnTerminate(() -> log.info("终止"))
.doAfterTerminate(() -> log.info("终止后"))
.doFinally(type -> log.info("最终: " + type))
.subscribe();

七、调度器操作符

API 说明 示例
subscribeOn(Scheduler) 指定订阅线程 flux.subscribeOn(Schedulers.boundedElastic())
publishOn(Scheduler) 指定发布线程 flux.publishOn(Schedulers.parallel())

调度器类型

Scheduler 说明 适用场景
Schedulers.immediate() 当前线程 测试、简单场景
Schedulers.single() 单线程 顺序执行
Schedulers.parallel() 并行线程池(CPU核心数) CPU 密集型
Schedulers.boundedElastic() 弹性线程池(有上限) I/O 密集型、阻塞操作
Schedulers.fromExecutor(Executor) 自定义线程池 特殊需求
1
2
3
4
5
6
7
// subscribeOn vs publishOn
Flux.range(1, 3)
.doOnNext(i -> log.info("生成: " + i)) // boundedElastic 线程
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.doOnNext(i -> log.info("处理: " + i)) // parallel 线程
.subscribe();

八、缓冲与窗口操作符

缓冲(收集为 List)

API 说明 示例
buffer() 收集所有元素 flux.buffer()Flux<List<T>>
buffer(int) 按数量缓冲 flux.buffer(10)
buffer(Duration) 按时间缓冲 flux.buffer(Duration.ofSeconds(1))
buffer(int, int) 滑动窗口缓冲 flux.buffer(3, 1)
bufferUntil(Predicate) 缓冲直到条件满足 flux.bufferUntil(i -> i % 10 == 0)
bufferWhile(Predicate) 缓冲当条件满足时 flux.bufferWhile(i -> i < 10)
bufferTimeout(int, Duration) 数量或时间触发 flux.bufferTimeout(100, Duration.ofSeconds(1))

窗口(收集为 Flux)

API 说明 示例
window(int) 按数量分窗口 flux.window(10)Flux<Flux<T>>
window(Duration) 按时间分窗口 flux.window(Duration.ofSeconds(1))
windowUntil(Predicate) 分窗口直到条件满足 flux.windowUntil(i -> i % 10 == 0)

分组

API 说明 示例
groupBy(Function) 按 key 分组 flux.groupBy(User::getType)
1
2
3
4
5
6
7
// groupBy 示例
flux.groupBy(user -> user.getType())
.flatMap(group -> group
.collectList()
.map(list -> group.key() + ": " + list.size())
)
.subscribe(System.out::println);

九、聚合操作符

API 说明 返回类型
count() 计数 Mono<Long>
reduce(BiFunction) 归约 Mono<T>
reduce(T, BiFunction) 带初始值归约 Mono<T>
reduceWith(Supplier, BiFunction) 延迟初始值归约 Mono<T>
scan(BiFunction) 累积(发出中间结果) Flux<T>
scan(T, BiFunction) 带初始值累积 Flux<T>
collect(Collector) 收集到集合 Mono<R>
collectList() 收集为 List Mono<List<T>>
collectMap(Function) 收集为 Map Mono<Map<K, T>>
collectMap(Function, Function) 收集为 Map(指定 value) Mono<Map<K, V>>
collectMultimap(Function) 收集为 Multimap Mono<Map<K, Collection<T>>>
collectSortedList() 收集为排序 List Mono<List<T>>
all(Predicate) 是否全部满足 Mono<Boolean>
any(Predicate) 是否有满足的 Mono<Boolean>
hasElements() 是否有元素 Mono<Boolean>
hasElement(T) 是否包含元素 Mono<Boolean>
1
2
3
4
5
6
7
8
9
// reduce 示例
Flux.range(1, 5)
.reduce((a, b) -> a + b)
.subscribe(System.out::println); // 15

// scan 示例(发出中间结果)
Flux.range(1, 5)
.scan((a, b) -> a + b)
.subscribe(System.out::println); // 1, 3, 6, 10, 15

十、时间操作符

API 说明 示例
delayElements(Duration) 延迟每个元素 flux.delayElements(Duration.ofMillis(100))
delaySequence(Duration) 延迟整个序列 flux.delaySequence(Duration.ofSeconds(1))
delaySubscription(Duration) 延迟订阅 flux.delaySubscription(Duration.ofSeconds(1))
timeout(Duration) 超时 flux.timeout(Duration.ofSeconds(5))
timeout(Duration, Publisher) 超时后切换 flux.timeout(Duration.ofSeconds(5), backupFlux)
sample(Duration) 采样 flux.sample(Duration.ofSeconds(1))
sampleFirst(Duration) 采样第一个 flux.sampleFirst(Duration.ofSeconds(1))
throttleFirst(Duration) 节流(取第一个) flux.throttleFirst(Duration.ofSeconds(1))
throttleLast(Duration) 节流(取最后一个) flux.throttleLast(Duration.ofSeconds(1))
elapsed() 添加时间间隔 flux.elapsed()
timed() 添加时间信息 flux.timed()

十一、背压操作符

API 说明 示例
onBackpressureBuffer() 无限缓冲 flux.onBackpressureBuffer()
onBackpressureBuffer(int) 有限缓冲 flux.onBackpressureBuffer(100)
onBackpressureBuffer(int, Consumer) 缓冲满时回调 flux.onBackpressureBuffer(100, v -> log.warn("溢出"))
onBackpressureDrop() 丢弃 flux.onBackpressureDrop()
onBackpressureDrop(Consumer) 丢弃并回调 flux.onBackpressureDrop(v -> log.warn("丢弃: " + v))
onBackpressureLatest() 只保留最新 flux.onBackpressureLatest()
onBackpressureError() 抛出异常 flux.onBackpressureError()
limitRate(int) 限制请求速率 flux.limitRate(100)
limitRequest(long) 限制总请求数 flux.limitRequest(1000)

十二、条件操作符

API 说明 示例
defaultIfEmpty(T) 空时返回默认值 flux.defaultIfEmpty(defaultValue)
switchIfEmpty(Publisher) 空时切换到另一个流 flux.switchIfEmpty(backupFlux)
repeatWhen(Function) 条件重复 flux.repeatWhen(companion -> companion.take(3))
repeat() 无限重复 flux.repeat()
repeat(long) 重复 n 次 flux.repeat(3)

十三、调试操作符

API 说明 示例
log() 打印所有信号 flux.log()
log(String) 带前缀打印 flux.log("MyFlux")
log(String, Level) 指定日志级别 flux.log("MyFlux", Level.FINE)
checkpoint() 添加检查点(调试用) flux.checkpoint("after filter")
checkpoint(String) 带描述的检查点 flux.checkpoint("处理后")
1
2
3
4
5
6
7
8
// 调试示例
Flux.range(1, 5)
.log("源数据")
.filter(i -> i > 2)
.log("过滤后")
.map(i -> i * 2)
.log("转换后")
.subscribe();

十四、上下文操作符

API 说明 示例
contextWrite(Context) 写入上下文 flux.contextWrite(Context.of("key", "value"))
contextWrite(Function) 修改上下文 flux.contextWrite(ctx -> ctx.put("key", "value"))
deferContextual(Function) 读取上下文创建流 见下方示例
transformDeferredContextual(BiFunction) 转换时读取上下文 见下方示例
1
2
3
4
5
6
7
// Context 示例 - 传递请求 ID
Mono.deferContextual(ctx -> {
String requestId = ctx.get("requestId");
return callService(requestId);
})
.contextWrite(Context.of("requestId", UUID.randomUUID().toString()))
.subscribe();

十五、Mono 特有操作符

API 说明 示例
block() 阻塞获取结果 mono.block()
block(Duration) 带超时阻塞 mono.block(Duration.ofSeconds(5))
toFuture() 转为 CompletableFuture mono.toFuture()
cache() 缓存结果 mono.cache()
cache(Duration) 带过期时间缓存 mono.cache(Duration.ofMinutes(5))
flux() 转为 Flux mono.flux()
zipWhen(Function) 与依赖结果组合 mono.zipWhen(v -> fetchRelated(v))
delayUntil(Function) 延迟直到另一个完成 mono.delayUntil(v -> saveAsync(v))

十六、Sinks(手动发送数据)

Sinks.Many(多值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 单播 - 只能有一个订阅者
Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer();

// 多播 - 多个订阅者
Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer();

// 重放 - 新订阅者可收到历史数据
Sinks.Many<String> replay = Sinks.many().replay().all();
Sinks.Many<String> replayLimit = Sinks.many().replay().limit(10);
Sinks.Many<String> replayLatest = Sinks.many().replay().latest();

// 发送数据
sink.tryEmitNext("data");
sink.emitNext("data", Sinks.EmitFailureHandler.FAIL_FAST);
sink.tryEmitComplete();
sink.tryEmitError(new Exception());

// 作为 Flux 使用
Flux<String> flux = sink.asFlux();

Sinks.One(单值)

1
2
3
Sinks.One<String> one = Sinks.one();
one.tryEmitValue("result");
Mono<String> mono = one.asMono();

Sinks.Empty(无值)

1
2
3
Sinks.Empty<Void> empty = Sinks.empty();
empty.tryEmitEmpty();
Mono<Void> mono = empty.asMono();

相关链接

  • Project Reactor 实现
  • 源码解析
  • WebFlux - Reactor核心API

Reactor API参考
https://zmmmmy.github.io/2026/01/10/Reactor API参考/
作者
ZhiMy
发布于
2026年1月10日
许可协议