Reactor API 参考
Project Reactor 提供了丰富的操作符,本文按功能分类整理。
一、创建操作符
Mono 创建
| API |
说明 |
示例 |
Mono.just(T) |
创建包含单个值的 Mono |
Mono.just("hello") |
Mono.empty() |
创建空的 Mono |
Mono.empty() |
Mono.error(Throwable) |
创建错误 Mono |
Mono.error(new RuntimeException()) |
Mono.never() |
永不发出信号的 Mono |
Mono.never() |
Mono.defer(Supplier) |
延迟创建,每次订阅都执行 |
Mono.defer(() -> Mono.just(getValue())) |
Mono.fromCallable(Callable) |
从 Callable 创建 |
Mono.fromCallable(() -> compute()) |
Mono.fromSupplier(Supplier) |
从 Supplier 创建 |
Mono.fromSupplier(() -> getValue()) |
Mono.fromFuture(CompletableFuture) |
从 Future 创建 |
Mono.fromFuture(future) |
Mono.fromRunnable(Runnable) |
从 Runnable 创建(无返回值) |
Mono.fromRunnable(() -> doSomething()) |
Mono.delay(Duration) |
延迟后发出 0L |
Mono.delay(Duration.ofSeconds(1)) |
Mono.create(Consumer<MonoSink>) |
编程式创建 |
见下方示例 |
1 2 3 4 5 6 7 8 9
| Mono.create(sink -> { String result = doAsyncWork(); if (result != null) { sink.success(result); } else { sink.error(new Exception("失败")); } });
|
Flux 创建
| API |
说明 |
示例 |
Flux.just(T...) |
从多个值创建 |
Flux.just(1, 2, 3) |
Flux.fromIterable(Iterable) |
从集合创建 |
Flux.fromIterable(list) |
Flux.fromArray(T[]) |
从数组创建 |
Flux.fromArray(array) |
Flux.fromStream(Stream) |
从 Stream 创建 |
Flux.fromStream(stream) |
Flux.range(int, int) |
创建整数序列 |
Flux.range(1, 10) |
Flux.interval(Duration) |
定时发出递增数字 |
Flux.interval(Duration.ofSeconds(1)) |
Flux.empty() |
创建空 Flux |
Flux.empty() |
Flux.error(Throwable) |
创建错误 Flux |
Flux.error(new Exception()) |
Flux.never() |
永不发出信号 |
Flux.never() |
Flux.defer(Supplier) |
延迟创建 |
Flux.defer(() -> Flux.just(getData())) |
Flux.generate(Consumer) |
同步生成(一次一个) |
见下方示例 |
Flux.create(Consumer) |
异步生成(可多个) |
见下方示例 |
Flux.push(Consumer) |
单线程异步生成 |
类似 create |
Flux.using(Callable, Function, Consumer) |
资源管理 |
见下方示例 |
Flux.concat(Publisher...) |
顺序连接多个流 |
Flux.concat(flux1, flux2) |
Flux.merge(Publisher...) |
并行合并多个流 |
Flux.merge(flux1, flux2) |
Flux.zip(Publisher...) |
组合多个流 |
Flux.zip(flux1, flux2) |
Flux.combineLatest(Publisher...) |
组合最新值 |
见下方示例 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| Flux.generate( () -> 0, (state, sink) -> { sink.next("值: " + state); if (state == 10) sink.complete(); return state + 1; } );
Flux.create(sink -> { eventSource.onData(data -> sink.next(data)); eventSource.onError(e -> sink.error(e)); eventSource.onComplete(() -> sink.complete()); });
Flux.using( () -> new FileInputStream("file.txt"), inputStream -> Flux.just(readData(inputStream)), inputStream -> inputStream.close() );
|
二、转换操作符
基本转换
| API |
说明 |
示例 |
map(Function) |
同步转换每个元素 |
flux.map(i -> i * 2) |
flatMap(Function) |
异步转换,结果合并(无序) |
flux.flatMap(id -> fetchUser(id)) |
flatMapSequential(Function) |
异步转换,保持顺序 |
flux.flatMapSequential(id -> fetchUser(id)) |
concatMap(Function) |
异步转换,顺序执行 |
flux.concatMap(id -> fetchUser(id)) |
switchMap(Function) |
切换到新流,取消旧流 |
flux.switchMap(id -> fetchUser(id)) |
cast(Class) |
类型转换 |
flux.cast(String.class) |
ofType(Class) |
过滤并转换类型 |
flux.ofType(String.class) |
index() |
添加索引 |
flux.index() → Tuple2<Long, T> |
timestamp() |
添加时间戳 |
flux.timestamp() → Tuple2<Long, T> |
elapsed() |
添加时间间隔 |
flux.elapsed() |
flatMap 变体对比
1 2 3 4
| flatMap: 并行执行,结果无序 flatMapSequential: 并行执行,结果有序 concatMap: 顺序执行,结果有序 switchMap: 切换新流,取消旧流
|
1 2 3 4 5 6 7 8
| Flux.range(1, 3) .flatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100 - i * 30))) .subscribe(System.out::println);
Flux.range(1, 3) .concatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(100))) .subscribe(System.out::println);
|
三、过滤操作符
| API |
说明 |
示例 |
filter(Predicate) |
过滤元素 |
flux.filter(i -> i > 5) |
filterWhen(Function) |
异步过滤 |
flux.filterWhen(i -> checkAsync(i)) |
distinct() |
去重 |
flux.distinct() |
distinctUntilChanged() |
连续去重 |
flux.distinctUntilChanged() |
take(long) |
取前 n 个 |
flux.take(5) |
takeLast(int) |
取最后 n 个 |
flux.takeLast(3) |
takeUntil(Predicate) |
取直到条件满足 |
flux.takeUntil(i -> i > 10) |
takeWhile(Predicate) |
取当条件满足时 |
flux.takeWhile(i -> i < 10) |
skip(long) |
跳过前 n 个 |
flux.skip(5) |
skipLast(int) |
跳过最后 n 个 |
flux.skipLast(3) |
skipUntil(Predicate) |
跳过直到条件满足 |
flux.skipUntil(i -> i > 5) |
skipWhile(Predicate) |
跳过当条件满足时 |
flux.skipWhile(i -> i < 5) |
first() |
取第一个(已废弃) |
使用 next() |
next() |
取第一个,返回 Mono |
flux.next() |
last() |
取最后一个 |
flux.last() |
last(T) |
取最后一个,有默认值 |
flux.last(defaultValue) |
single() |
期望只有一个元素 |
flux.single() |
singleOrEmpty() |
期望 0 或 1 个元素 |
flux.singleOrEmpty() |
elementAt(int) |
取指定位置元素 |
flux.elementAt(5) |
ignoreElements() |
忽略所有元素 |
flux.ignoreElements() |
四、组合操作符
| API |
说明 |
示例 |
concat(Publisher...) |
顺序连接 |
Flux.concat(flux1, flux2) |
concatWith(Publisher) |
与另一个流连接 |
flux1.concatWith(flux2) |
merge(Publisher...) |
并行合并 |
Flux.merge(flux1, flux2) |
mergeWith(Publisher) |
与另一个流合并 |
flux1.mergeWith(flux2) |
mergeSequential(Publisher...) |
并行执行,顺序输出 |
Flux.mergeSequential(flux1, flux2) |
zip(Publisher...) |
一对一组合 |
Flux.zip(flux1, flux2) |
zipWith(Publisher) |
与另一个流组合 |
flux1.zipWith(flux2) |
zipWith(Publisher, BiFunction) |
组合并转换 |
flux1.zipWith(flux2, (a, b) -> a + b) |
combineLatest(Publisher...) |
组合最新值 |
Flux.combineLatest(flux1, flux2, combiner) |
startWith(T...) |
在开头添加元素 |
flux.startWith(0) |
and(Publisher) |
等待两个都完成 |
mono1.and(mono2) |
when(Publisher...) |
等待所有完成 |
Mono.when(mono1, mono2) |
then() |
忽略元素,只关心完成 |
flux.then() |
thenMany(Publisher) |
完成后切换到另一个流 |
mono.thenMany(flux) |
thenReturn(T) |
完成后返回固定值 |
mono.thenReturn("done") |
组合操作符对比
1 2 3
| concat: A完成 → B开始,顺序执行 merge: A和B同时执行,谁快谁先输出 zip: A和B一对一配对,(a1,b1), (a2,b2)...
|
1 2 3 4 5 6 7
| Flux<String> names = Flux.just("张三", "李四"); Flux<Integer> ages = Flux.just(25, 30);
Flux.zip(names, ages, (name, age) -> name + ": " + age) .subscribe(System.out::println);
|
五、错误处理操作符
| API |
说明 |
示例 |
onErrorReturn(T) |
错误时返回默认值 |
flux.onErrorReturn(defaultValue) |
onErrorReturn(Predicate, T) |
特定错误返回默认值 |
flux.onErrorReturn(e -> e instanceof IOException, default) |
onErrorResume(Function) |
错误时切换到备用流 |
flux.onErrorResume(e -> backupFlux) |
onErrorMap(Function) |
转换错误类型 |
flux.onErrorMap(e -> new CustomException(e)) |
onErrorContinue(BiConsumer) |
错误时跳过继续 |
flux.onErrorContinue((e, v) -> log.error(e)) |
onErrorStop() |
错误时停止 |
flux.onErrorStop() |
doOnError(Consumer) |
错误时执行副作用 |
flux.doOnError(e -> log.error(e)) |
retry() |
无限重试 |
flux.retry() |
retry(long) |
重试 n 次 |
flux.retry(3) |
retryWhen(Retry) |
自定义重试策略 |
见下方示例 |
1 2 3 4 5 6 7 8 9 10 11 12
| flux.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .maxBackoff(Duration.ofSeconds(10)) .filter(e -> e instanceof IOException) .onRetryExhaustedThrow((spec, signal) -> signal.failure()) );
flux.retryWhen(Retry.backoff(5, Duration.ofMillis(100)));
flux.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)));
|
六、副作用操作符(doOn 系列)
| API |
说明 |
触发时机 |
doOnNext(Consumer) |
每个元素 |
onNext 信号 |
doOnError(Consumer) |
发生错误 |
onError 信号 |
doOnComplete(Runnable) |
完成时 |
onComplete 信号 |
doOnSubscribe(Consumer) |
订阅时 |
onSubscribe 信号 |
doOnRequest(LongConsumer) |
请求数据时 |
request 信号 |
doOnCancel(Runnable) |
取消时 |
cancel 信号 |
doOnTerminate(Runnable) |
终止时(完成或错误) |
onComplete 或 onError |
doAfterTerminate(Runnable) |
终止后 |
终止信号发送后 |
doOnEach(Consumer<Signal>) |
每个信号 |
所有信号 |
doFirst(Runnable) |
订阅前 |
订阅链建立前 |
doFinally(Consumer<SignalType>) |
最终执行 |
任何终止情况 |
1 2 3 4 5 6 7 8 9 10 11 12 13
| flux .doFirst(() -> log.info("开始")) .doOnSubscribe(s -> log.info("订阅")) .doOnRequest(n -> log.info("请求: " + n)) .doOnNext(v -> log.info("收到: " + v)) .doOnError(e -> log.error("错误", e)) .doOnComplete(() -> log.info("完成")) .doOnCancel(() -> log.info("取消")) .doOnTerminate(() -> log.info("终止")) .doAfterTerminate(() -> log.info("终止后")) .doFinally(type -> log.info("最终: " + type)) .subscribe();
|
七、调度器操作符
| API |
说明 |
示例 |
subscribeOn(Scheduler) |
指定订阅线程 |
flux.subscribeOn(Schedulers.boundedElastic()) |
publishOn(Scheduler) |
指定发布线程 |
flux.publishOn(Schedulers.parallel()) |
调度器类型
| Scheduler |
说明 |
适用场景 |
Schedulers.immediate() |
当前线程 |
测试、简单场景 |
Schedulers.single() |
单线程 |
顺序执行 |
Schedulers.parallel() |
并行线程池(CPU核心数) |
CPU 密集型 |
Schedulers.boundedElastic() |
弹性线程池(有上限) |
I/O 密集型、阻塞操作 |
Schedulers.fromExecutor(Executor) |
自定义线程池 |
特殊需求 |
1 2 3 4 5 6 7
| Flux.range(1, 3) .doOnNext(i -> log.info("生成: " + i)) .subscribeOn(Schedulers.boundedElastic()) .publishOn(Schedulers.parallel()) .doOnNext(i -> log.info("处理: " + i)) .subscribe();
|
八、缓冲与窗口操作符
缓冲(收集为 List)
| API |
说明 |
示例 |
buffer() |
收集所有元素 |
flux.buffer() → Flux<List<T>> |
buffer(int) |
按数量缓冲 |
flux.buffer(10) |
buffer(Duration) |
按时间缓冲 |
flux.buffer(Duration.ofSeconds(1)) |
buffer(int, int) |
滑动窗口缓冲 |
flux.buffer(3, 1) |
bufferUntil(Predicate) |
缓冲直到条件满足 |
flux.bufferUntil(i -> i % 10 == 0) |
bufferWhile(Predicate) |
缓冲当条件满足时 |
flux.bufferWhile(i -> i < 10) |
bufferTimeout(int, Duration) |
数量或时间触发 |
flux.bufferTimeout(100, Duration.ofSeconds(1)) |
窗口(收集为 Flux)
| API |
说明 |
示例 |
window(int) |
按数量分窗口 |
flux.window(10) → Flux<Flux<T>> |
window(Duration) |
按时间分窗口 |
flux.window(Duration.ofSeconds(1)) |
windowUntil(Predicate) |
分窗口直到条件满足 |
flux.windowUntil(i -> i % 10 == 0) |
分组
| API |
说明 |
示例 |
groupBy(Function) |
按 key 分组 |
flux.groupBy(User::getType) |
1 2 3 4 5 6 7
| flux.groupBy(user -> user.getType()) .flatMap(group -> group .collectList() .map(list -> group.key() + ": " + list.size()) ) .subscribe(System.out::println);
|
九、聚合操作符
| API |
说明 |
返回类型 |
count() |
计数 |
Mono<Long> |
reduce(BiFunction) |
归约 |
Mono<T> |
reduce(T, BiFunction) |
带初始值归约 |
Mono<T> |
reduceWith(Supplier, BiFunction) |
延迟初始值归约 |
Mono<T> |
scan(BiFunction) |
累积(发出中间结果) |
Flux<T> |
scan(T, BiFunction) |
带初始值累积 |
Flux<T> |
collect(Collector) |
收集到集合 |
Mono<R> |
collectList() |
收集为 List |
Mono<List<T>> |
collectMap(Function) |
收集为 Map |
Mono<Map<K, T>> |
collectMap(Function, Function) |
收集为 Map(指定 value) |
Mono<Map<K, V>> |
collectMultimap(Function) |
收集为 Multimap |
Mono<Map<K, Collection<T>>> |
collectSortedList() |
收集为排序 List |
Mono<List<T>> |
all(Predicate) |
是否全部满足 |
Mono<Boolean> |
any(Predicate) |
是否有满足的 |
Mono<Boolean> |
hasElements() |
是否有元素 |
Mono<Boolean> |
hasElement(T) |
是否包含元素 |
Mono<Boolean> |
1 2 3 4 5 6 7 8 9
| Flux.range(1, 5) .reduce((a, b) -> a + b) .subscribe(System.out::println);
Flux.range(1, 5) .scan((a, b) -> a + b) .subscribe(System.out::println);
|
十、时间操作符
| API |
说明 |
示例 |
delayElements(Duration) |
延迟每个元素 |
flux.delayElements(Duration.ofMillis(100)) |
delaySequence(Duration) |
延迟整个序列 |
flux.delaySequence(Duration.ofSeconds(1)) |
delaySubscription(Duration) |
延迟订阅 |
flux.delaySubscription(Duration.ofSeconds(1)) |
timeout(Duration) |
超时 |
flux.timeout(Duration.ofSeconds(5)) |
timeout(Duration, Publisher) |
超时后切换 |
flux.timeout(Duration.ofSeconds(5), backupFlux) |
sample(Duration) |
采样 |
flux.sample(Duration.ofSeconds(1)) |
sampleFirst(Duration) |
采样第一个 |
flux.sampleFirst(Duration.ofSeconds(1)) |
throttleFirst(Duration) |
节流(取第一个) |
flux.throttleFirst(Duration.ofSeconds(1)) |
throttleLast(Duration) |
节流(取最后一个) |
flux.throttleLast(Duration.ofSeconds(1)) |
elapsed() |
添加时间间隔 |
flux.elapsed() |
timed() |
添加时间信息 |
flux.timed() |
十一、背压操作符
| API |
说明 |
示例 |
onBackpressureBuffer() |
无限缓冲 |
flux.onBackpressureBuffer() |
onBackpressureBuffer(int) |
有限缓冲 |
flux.onBackpressureBuffer(100) |
onBackpressureBuffer(int, Consumer) |
缓冲满时回调 |
flux.onBackpressureBuffer(100, v -> log.warn("溢出")) |
onBackpressureDrop() |
丢弃 |
flux.onBackpressureDrop() |
onBackpressureDrop(Consumer) |
丢弃并回调 |
flux.onBackpressureDrop(v -> log.warn("丢弃: " + v)) |
onBackpressureLatest() |
只保留最新 |
flux.onBackpressureLatest() |
onBackpressureError() |
抛出异常 |
flux.onBackpressureError() |
limitRate(int) |
限制请求速率 |
flux.limitRate(100) |
limitRequest(long) |
限制总请求数 |
flux.limitRequest(1000) |
十二、条件操作符
| API |
说明 |
示例 |
defaultIfEmpty(T) |
空时返回默认值 |
flux.defaultIfEmpty(defaultValue) |
switchIfEmpty(Publisher) |
空时切换到另一个流 |
flux.switchIfEmpty(backupFlux) |
repeatWhen(Function) |
条件重复 |
flux.repeatWhen(companion -> companion.take(3)) |
repeat() |
无限重复 |
flux.repeat() |
repeat(long) |
重复 n 次 |
flux.repeat(3) |
十三、调试操作符
| API |
说明 |
示例 |
log() |
打印所有信号 |
flux.log() |
log(String) |
带前缀打印 |
flux.log("MyFlux") |
log(String, Level) |
指定日志级别 |
flux.log("MyFlux", Level.FINE) |
checkpoint() |
添加检查点(调试用) |
flux.checkpoint("after filter") |
checkpoint(String) |
带描述的检查点 |
flux.checkpoint("处理后") |
1 2 3 4 5 6 7 8
| Flux.range(1, 5) .log("源数据") .filter(i -> i > 2) .log("过滤后") .map(i -> i * 2) .log("转换后") .subscribe();
|
十四、上下文操作符
| API |
说明 |
示例 |
contextWrite(Context) |
写入上下文 |
flux.contextWrite(Context.of("key", "value")) |
contextWrite(Function) |
修改上下文 |
flux.contextWrite(ctx -> ctx.put("key", "value")) |
deferContextual(Function) |
读取上下文创建流 |
见下方示例 |
transformDeferredContextual(BiFunction) |
转换时读取上下文 |
见下方示例 |
1 2 3 4 5 6 7
| Mono.deferContextual(ctx -> { String requestId = ctx.get("requestId"); return callService(requestId); }) .contextWrite(Context.of("requestId", UUID.randomUUID().toString())) .subscribe();
|
十五、Mono 特有操作符
| API |
说明 |
示例 |
block() |
阻塞获取结果 |
mono.block() |
block(Duration) |
带超时阻塞 |
mono.block(Duration.ofSeconds(5)) |
toFuture() |
转为 CompletableFuture |
mono.toFuture() |
cache() |
缓存结果 |
mono.cache() |
cache(Duration) |
带过期时间缓存 |
mono.cache(Duration.ofMinutes(5)) |
flux() |
转为 Flux |
mono.flux() |
zipWhen(Function) |
与依赖结果组合 |
mono.zipWhen(v -> fetchRelated(v)) |
delayUntil(Function) |
延迟直到另一个完成 |
mono.delayUntil(v -> saveAsync(v)) |
十六、Sinks(手动发送数据)
Sinks.Many(多值)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer();
Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<String> replay = Sinks.many().replay().all(); Sinks.Many<String> replayLimit = Sinks.many().replay().limit(10); Sinks.Many<String> replayLatest = Sinks.many().replay().latest();
sink.tryEmitNext("data"); sink.emitNext("data", Sinks.EmitFailureHandler.FAIL_FAST); sink.tryEmitComplete(); sink.tryEmitError(new Exception());
Flux<String> flux = sink.asFlux();
|
Sinks.One(单值)
1 2 3
| Sinks.One<String> one = Sinks.one(); one.tryEmitValue("result"); Mono<String> mono = one.asMono();
|
Sinks.Empty(无值)
1 2 3
| Sinks.Empty<Void> empty = Sinks.empty(); empty.tryEmitEmpty(); Mono<Void> mono = empty.asMono();
|
相关链接
- Project Reactor 实现
- 源码解析
- WebFlux - Reactor核心API