ForkJoinPool
ℹ️ 信息: 衍生来源
本笔记衍生自 线程池类型-WorkStealingPool
概述
ForkJoinPool 是 Java 7 引入的特殊线程池,专为分治算法和并行计算设计。它是 ExecutorService 的实现,但采用了完全不同的任务调度策略。
1 2 3 4 5 6 7
| ┌────────────────────────────────────────────────────────────────┐ │ ForkJoinPool │ ├────────────────────────────────────────────────────────────────┤ │ 核心思想: Fork(拆分) + Join(合并) │ │ 调度策略: Work-Stealing(工作窃取) │ │ 适用场景: 递归任务、并行计算、分治算法 │ └────────────────────────────────────────────────────────────────┘
|
核心概念
Fork 与 Join
| 操作 |
说明 |
方法 |
| Fork |
将大任务拆分成小任务,异步执行 |
task.fork() |
| Join |
等待子任务完成,获取结果 |
task.join() |
分治思想图解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ┌─────────────┐ │ 大任务 │ └──────┬──────┘ │ fork ┌────────────┼────────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ 子任务1 │ │ 子任务2 │ │ 子任务3 │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ 结果1 │ │ 结果2 │ │ 结果3 │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ └────────────┼────────────┘ │ join ┌─────┴─────┐ │ 合并结果 │ └───────────┘
|
工作窃取算法(Work-Stealing)
原理
每个工作线程维护一个双端队列(Deque):
- 自己的任务:从队列头部取(LIFO)
- 窃取他人任务:从队列尾部取(FIFO)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ┌─────────────────────────────────────────────────────────────────┐ │ ForkJoinPool │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Worker-1 │ │ Worker-2 │ │ Worker-3 │ │ │ ├─────────────┤ ├─────────────┤ ├─────────────┤ │ │ │ 头 [T1] │ │ 头 [T4] │ │ 头 [空闲] │ │ │ │ [T2] │ │ [T5] │ │ │ │ │ │ [T3] │ │ │ │ │ │ │ │ 尾 │ │ 尾 │ │ 尾 ↑ │ │ │ └─────────────┘ └─────────────┘ └──────┼──────┘ │ │ │ │ │ │ │ │ └──────────────────┘ │ │ │ Worker-3 从 Worker-2 尾部窃取 T5 │ └─────────────────────────────────────────────────────────────────┘
|
为什么从尾部窃取?
| 原因 |
说明 |
| 减少竞争 |
工作线程从头部取,窃取从尾部,避免锁争用 |
| 任务特性 |
尾部通常是较大的任务(刚 fork 出来的),窃取后可继续拆分 |
| 局部性 |
头部任务是最近提交的,缓存友好 |
核心类结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ┌───────────────────────────────────────┐ │ ForkJoinTask<V> │ 抽象基类 │ ├── fork() 异步执行 │ │ ├── join() 等待结果 │ │ └── invoke() 同步执行 │ └───────────────┬───────────────────────┘ │ ┌───────┴───────┐ ▼ ▼ ┌───────────────┐ ┌─────────────────┐ │RecursiveTask │ │ RecursiveAction │ │ <V> │ │ │ ├───────────────┤ ├─────────────────┤ │ 有返回值 │ │ 无返回值 │ │ compute()→V │ │ compute()→void │ └───────────────┘ └─────────────────┘
|
使用示例
1. RecursiveTask:有返回值(数组求和)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class SumTask extends RecursiveTask<Long> { private static final int THRESHOLD = 1000; private final long[] array; private final int start; private final int end;
public SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } int mid = start + length / 2; SumTask leftTask = new SumTask(array, start, mid); SumTask rightTask = new SumTask(array, mid, end); leftTask.fork(); Long rightResult = rightTask.compute(); Long leftResult = leftTask.join(); return leftResult + rightResult; } }
public static void main(String[] args) { long[] array = new long[10000000]; Arrays.fill(array, 1); ForkJoinPool pool = ForkJoinPool.commonPool(); SumTask task = new SumTask(array, 0, array.length); long result = pool.invoke(task); System.out.println("Sum: " + result); }
|
2. RecursiveAction:无返回值(并行排序)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class ParallelMergeSort extends RecursiveAction { private static final int THRESHOLD = 1024; private final int[] array; private final int start; private final int end;
public ParallelMergeSort(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected void compute() { if (end - start <= THRESHOLD) { Arrays.sort(array, start, end); return; } int mid = (start + end) / 2; ParallelMergeSort left = new ParallelMergeSort(array, start, mid); ParallelMergeSort right = new ParallelMergeSort(array, mid, end); invokeAll(left, right); merge(array, start, mid, end); } private void merge(int[] array, int start, int mid, int end) { } }
|
ForkJoinPool 创建方式
1. 公共池(推荐)
1 2 3 4 5
| ForkJoinPool commonPool = ForkJoinPool.commonPool();
Long result = commonPool.invoke(task);
|
2. 自定义池
1 2 3 4 5 6 7 8 9 10
| ForkJoinPool customPool = new ForkJoinPool(4);
ForkJoinPool pool = new ForkJoinPool( 4, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true );
|
3. 通过 Executors 工厂
1 2
| ExecutorService executor = Executors.newWorkStealingPool();
|
提交任务的方式
| 方法 |
说明 |
返回值 |
invoke(task) |
同步执行,阻塞直到完成 |
任务结果 |
execute(task) |
异步执行,不等待结果 |
void |
submit(task) |
异步执行,返回 Future |
ForkJoinTask<V> |
1 2 3 4 5 6 7 8 9 10 11 12
| ForkJoinPool pool = ForkJoinPool.commonPool(); SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task);
ForkJoinTask<Long> future = pool.submit(task); Long result = future.get();
pool.execute(task);
|
与普通线程池对比
| 特性 |
ThreadPoolExecutor |
ForkJoinPool |
| 任务队列 |
共享队列(单一) |
每线程独立双端队列 |
| 调度策略 |
先进先出 |
工作窃取 |
| 适用任务 |
独立任务 |
可拆分的递归任务 |
| 负载均衡 |
依赖队列调度 |
工作窃取自动均衡 |
| CPU 利用 |
可能有线程空闲 |
更高效(减少空闲) |
| 任务粒度 |
任意 |
适合可拆分任务 |
实际应用场景
1. Java 8 并行流
1 2 3 4 5 6
| List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numbers.parallelStream() .mapToInt(Integer::intValue) .sum();
|
2. CompletableFuture 默认执行器
1 2 3 4
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Hello"; });
|
3. 大数据处理
1 2 3
| Arrays.parallelSort(largeArray); Arrays.parallelSetAll(array, i -> i * 2);
|
注意事项
1. 阈值设置
1 2 3 4
|
private static final int THRESHOLD = 1000;
|
2. 避免阻塞操作
1 2 3 4 5 6 7 8 9
| protected Long compute() { String data = httpClient.get(url); return process(data); }
|
3. 公共池的风险
1 2 3 4 5 6 7 8 9
|
ForkJoinPool customPool = new ForkJoinPool(8); customPool.submit(() -> { myParallelStream.forEach(...); }).get();
|
相关链接
- 上一篇:03-线程池类型
- 线程池原理:02-线程池原理
- 线程池简介:01-线程池简介