ForkJoinPool

ForkJoinPool

ℹ️ 信息: 衍生来源
本笔记衍生自 线程池类型-WorkStealingPool

概述

ForkJoinPool 是 Java 7 引入的特殊线程池,专为分治算法并行计算设计。它是 ExecutorService 的实现,但采用了完全不同的任务调度策略。

1
2
3
4
5
6
7
┌────────────────────────────────────────────────────────────────┐
│ ForkJoinPool │
├────────────────────────────────────────────────────────────────┤
│ 核心思想: Fork(拆分) + Join(合并) │
│ 调度策略: Work-Stealing(工作窃取) │
│ 适用场景: 递归任务、并行计算、分治算法 │
└────────────────────────────────────────────────────────────────┘

核心概念

Fork 与 Join

操作 说明 方法
Fork 将大任务拆分成小任务,异步执行 task.fork()
Join 等待子任务完成,获取结果 task.join()

分治思想图解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
            ┌─────────────┐
│ 大任务 │
└──────┬──────┘
fork
┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 子任务1 │ │ 子任务2 │ │ 子任务3
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 结果1 │ │ 结果2 │ │ 结果3
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────┼────────────┘
join
┌─────┴─────┐
│ 合并结果 │
└───────────┘

工作窃取算法(Work-Stealing)

原理

每个工作线程维护一个双端队列(Deque)

  • 自己的任务:从队列头部取(LIFO)
  • 窃取他人任务:从队列尾部取(FIFO)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────────────────┐
│ ForkJoinPool │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Worker-1 │ │ Worker-2 │ │ Worker-3 │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ 头 [T1] │ │ 头 [T4] │ │ 头 [空闲] │ │
│ │ [T2] │ │ [T5] │ │ │ │
│ │ [T3] │ │ │ │ │ │
│ │ 尾 │ │ 尾 │ │ 尾 ↑ │ │
│ └─────────────┘ └─────────────┘ └──────┼──────┘ │
│ │ │ │ │
│ │ └──────────────────┘ │
│ │ Worker-3 从 Worker-2 尾部窃取 T5 │
└─────────────────────────────────────────────────────────────────┘

为什么从尾部窃取?

原因 说明
减少竞争 工作线程从头部取,窃取从尾部,避免锁争用
任务特性 尾部通常是较大的任务(刚 fork 出来的),窃取后可继续拆分
局部性 头部任务是最近提交的,缓存友好

核心类结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌───────────────────────────────────────┐
│ ForkJoinTask<V> │ 抽象基类
│ ├── fork() 异步执行 │
│ ├── join() 等待结果 │
│ └── invoke() 同步执行 │
└───────────────┬───────────────────────┘

┌───────┴───────┐
▼ ▼
┌───────────────┐ ┌─────────────────┐
│RecursiveTask │ │ RecursiveAction │
│ <V> │ │ │
├───────────────┤ ├─────────────────┤
│ 有返回值 │ │ 无返回值 │
compute()→V │ │ compute()→void │
└───────────────┘ └─────────────────┘

使用示例

1. RecursiveTask:有返回值(数组求和)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 阈值
private final long[] array;
private final int start;
private final int end;

public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;

// 任务足够小,直接计算
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}

// 拆分任务
int mid = start + length / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);

// 异步执行左半部分
leftTask.fork();

// 同步执行右半部分(当前线程)
Long rightResult = rightTask.compute();

// 等待左半部分完成并合并结果
Long leftResult = leftTask.join();

return leftResult + rightResult;
}
}

// 使用
public static void main(String[] args) {
long[] array = new long[10000000];
Arrays.fill(array, 1);

ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(array, 0, array.length);

long result = pool.invoke(task); // 同步执行
System.out.println("Sum: " + result); // 输出: Sum: 10000000
}

2. RecursiveAction:无返回值(并行排序)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ParallelMergeSort extends RecursiveAction {
private static final int THRESHOLD = 1024;
private final int[] array;
private final int start;
private final int end;

public ParallelMergeSort(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// 小数组直接用 Arrays.sort
Arrays.sort(array, start, end);
return;
}

int mid = (start + end) / 2;

// 并行排序两半
ParallelMergeSort left = new ParallelMergeSort(array, start, mid);
ParallelMergeSort right = new ParallelMergeSort(array, mid, end);

invokeAll(left, right); // 并行执行两个任务

// 合并已排序的两半
merge(array, start, mid, end);
}

private void merge(int[] array, int start, int mid, int end) {
// 合并逻辑...
}
}

ForkJoinPool 创建方式

1. 公共池(推荐)

1
2
3
4
5
// 获取公共池(JVM 共享,线程数 = CPU核心数 - 1)
ForkJoinPool commonPool = ForkJoinPool.commonPool();

// 直接使用
Long result = commonPool.invoke(task);

2. 自定义池

1
2
3
4
5
6
7
8
9
10
// 自定义线程数
ForkJoinPool customPool = new ForkJoinPool(4);

// 完整构造函数
ForkJoinPool pool = new ForkJoinPool(
4, // 并行度
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // 线程工厂
null, // 异常处理器
true // asyncMode(true=FIFO,false=LIFO)
);

3. 通过 Executors 工厂

1
2
// Java 8+ WorkStealingPool 底层就是 ForkJoinPool
ExecutorService executor = Executors.newWorkStealingPool();

提交任务的方式

方法 说明 返回值
invoke(task) 同步执行,阻塞直到完成 任务结果
execute(task) 异步执行,不等待结果 void
submit(task) 异步执行,返回 Future ForkJoinTask<V>
1
2
3
4
5
6
7
8
9
10
11
12
ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(array, 0, array.length);

// 方式1:同步
Long result = pool.invoke(task);

// 方式2:异步提交
ForkJoinTask<Long> future = pool.submit(task);
Long result = future.get(); // 阻塞等待

// 方式3:异步执行(不关心结果)
pool.execute(task);

与普通线程池对比

特性 ThreadPoolExecutor ForkJoinPool
任务队列 共享队列(单一) 每线程独立双端队列
调度策略 先进先出 工作窃取
适用任务 独立任务 可拆分的递归任务
负载均衡 依赖队列调度 工作窃取自动均衡
CPU 利用 可能有线程空闲 更高效(减少空闲)
任务粒度 任意 适合可拆分任务

实际应用场景

1. Java 8 并行流

1
2
3
4
5
6
// parallelStream() 底层使用 ForkJoinPool.commonPool()
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

int sum = numbers.parallelStream()
.mapToInt(Integer::intValue)
.sum();

2. CompletableFuture 默认执行器

1
2
3
4
// 默认使用 ForkJoinPool.commonPool()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});

3. 大数据处理

1
2
3
// 并行处理大量数据
Arrays.parallelSort(largeArray);
Arrays.parallelSetAll(array, i -> i * 2);

注意事项

1. 阈值设置

1
2
3
4
// 阈值太小:拆分开销 > 并行收益
// 阈值太大:无法充分利用多核
// 经验值:根据任务复杂度和数据量测试确定
private static final int THRESHOLD = 1000; // 需要根据实际情况调整

2. 避免阻塞操作

1
2
3
4
5
6
7
8
9
// ❌ 错误:在 ForkJoinPool 中执行阻塞 IO
protected Long compute() {
// 不要在这里做 IO 操作!
String data = httpClient.get(url); // 阻塞,浪费线程
return process(data);
}

// ✅ 正确:ForkJoinPool 只用于 CPU 计算
// IO 操作使用其他线程池

3. 公共池的风险

1
2
3
4
5
6
7
8
9
// 所有并行流共享 commonPool
// 如果某个任务阻塞,会影响其他任务

// 解决方案:为关键任务创建独立的 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8);
customPool.submit(() -> {
// 独立执行,不影响 commonPool
myParallelStream.forEach(...);
}).get();

相关链接

  • 上一篇:03-线程池类型
  • 线程池原理:02-线程池原理
  • 线程池简介:01-线程池简介

ForkJoinPool
https://zmmmmy.github.io/2026/01/10/ForkJoinPool/
作者
ZhiMy
发布于
2026年1月10日
许可协议