Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:
1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
在Java的Fork/Join框架中,使用两个类完成上述操作
1.ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:
a.RecursiveAction:用于没有返回结果的任务
b.RecursiveTask:用于有返回结果的任务
2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool,而ForkJoinWorkerThread负责执行这些任务。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
public class {
static class ComputeTask extends RecursiveTask<Long> { private final static int THREAD_HOLD = 100; private int[] data; private int start; private int end;
public ComputeTask(int[] data,int start,int end) { this.data = data; this.start = start; this.end = end; }
protected Long compute() { Long sum = 0L;
if (end - start <= THREAD_HOLD) { for (int i = start; i < end; i++) { sum += data[i]; }
System.out.println(String.format("Compute %d ~ %d = %d",start,end,sum));
} else { int middle = (end + start) / 2; System.out.println(String.format("Split %d~%d ==> %d~%d,%d~%d",middle,middle + 1,end)); ComputeTask left = new ComputeTask(data,middle); ComputeTask right = new ComputeTask(data,end); invokeAll(left,right);
Long leftSum = left.join(); Long rightSum = right.join(); sum = leftSum + rightSum; System.out.println(String.format("Result = %d + %d ==> %d",leftSum,rightSum,sum)); }
return sum; } }
public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool();
int[] data = new int[1000]; fillData(data); Long sum = pool.invoke(new ComputeTask(data,0,data.length)); System.out.println("Fork/join sum:" + sum); }
private static void fillData(int[] data) { for (int i = 0; i < data.length; i++) { data[i] = i; } } }
|
上面的代码使用Fork/Join框架来计算一个元素个数为1000的数组的和。
执行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
Split 0~1000 ==> 0~500,501~1000 Split 501~1000 ==> 501~750,751~1000 Split 0~500 ==> 0~250,251~500 Split 501~750 ==> 501~625,626~750 Split 0~250 ==> 0~125,126~250 Compute 0 ~ 125 = 7750 Compute 126 ~ 250 = 23250 Result = 7750 + 23250 ==> 31000 Split 251~500 ==> 251~375,376~500 Compute 251 ~ 375 = 38750 Compute 376 ~ 500 = 54250 Result = 38750 + 54250 ==> 93000 Result = 31000 + 93000 ==> 124000 Split 751~1000 ==> 751~875,876~1000 Compute 751 ~ 875 = 100750 Compute 876 ~ 1000 = 116250 Result = 100750 + 116250 ==> 217000 Compute 626 ~ 750 = 85250 Compute 501 ~ 625 = 69750 Result = 69750 + 85250 ==> 155000 Result = 155000 + 217000 ==> 372000 Result = 124000 + 372000 ==> 496000 Fork/join sum:496000
|
这里要注意的是:在拆分任务时不要调用fork()方法将任务推给别的线程执行,而使用invokeAll()。
这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程
原因参考:https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000
Fork/Join实现介绍:https://blog.csdn.net/yinwenjie/article/details/71524140
原文:大专栏 ?Java7 ForkJoin框架详解