加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

使用ForkJoinPool来多线程的拆分任务,执行任务,合并结果。

发布时间:2020-12-15 05:28:54 所属栏目:Java 来源:网络整理
导读:ForkJoinPool 是jdk1.7 由Doug Lea 写的实现? ?递归调用任务拆分,合并,的线程池。 代码示例: package www.itbac.com; import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframew

ForkJoinPool 是jdk1.7 由Doug Lea 写的实现? ?递归调用任务拆分,合并,的线程池。

代码示例:

package www.itbac.com;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.concurrent.*;

/**
 * 并行调用http接口
 */
@Service
public class UserServiceForkJoin {
    // 本质是一个线程池,默认的线程数量:CPU的核数
    ForkJoinPool forkJoinPool = new ForkJoinPool(10,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true);
    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查询多个系统的数据,合并返回
     */
    public Object getUserInfo(String userId) throws ExecutionException,InterruptedException {
        // 其他例子,查数据库的多个表数据,分多次查询
        // fork/join
        // forkJoinPool.submit()
        ArrayList<String> urls = new ArrayList<>();
        urls.add("http://www.itbac.com/userinfo-api/get?userId=" + userId);
        urls.add("http://www.itbac.com/integral-api/get?userId=" + userId);

        HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate,urls,urls.size() - 1);
        ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);

        JSONObject result = forkJoinTask.get();
        return result;
    }
}

// 自定义任务类, 继承递归任务。
class HttpJsonRequest extends RecursiveTask<JSONObject> {

    RestTemplate restTemplate;
    ArrayList<String> urls;
    int start;
    int end;

    HttpJsonRequest(RestTemplate restTemplate,ArrayList<String> urls,int start,int end) {
        this.restTemplate = restTemplate;
        this.urls = urls;
        this.start = start;
        this.end = end;
    }

    // 就是实际去执行的一个方法入口(任务拆分)
    @Override
    protected JSONObject compute() {
        int count = end - start; // 代表当前这个task需要处理多少数据
        // 自行根据业务场景去判断是否是大任务,是否需要拆分
        if (count == 0) {
            String url = urls.get(start);
            // TODO 如果只有一个接口调用,立刻调用
            long userinfoTime = System.currentTimeMillis();
            String response = restTemplate.getForObject(url,String.class);
            JSONObject value = JSONObject.parSEObject(response);
            System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
            return value;
        } else { // 如果是多个接口调用,拆分成子任务  7,8,9,10
            System.out.println(Thread.currentThread() + "任务拆分一次");
            //求中间值。
            int x = (start + end) / 2;
            //任务从开始,到中间值。
            HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate,start,x);// 负责处理哪一部分?
            //fork拆分任务。
            httpJsonRequest.fork();
            //任务从中间值+1 ,到结束。
            HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate,x + 1,end);// 负责处理哪一部分?
            httpJsonRequest1.fork();

            // join获取处理结果
            JSONObject result = new JSONObject();
            
            //join合并结果。
            result.putAll(httpJsonRequest.join());
            result.putAll(httpJsonRequest1.join());
            
            return result;
        }
    }
}

就是把任务拆分,交给线程池执行,再合并。与Future的获取返回值有点相似。只是对任务拆分做了抽象封装。

?

特点:

线程池 ThreadPoolExecutor?中只维护了一个队列。多线程去队列中争抢任务来执行。

?

而ForkJoinPool 是每一个大任务是维护一个队列,fork拆分出的小任务也是在自己队列中。一个线程去处理自己队列中的任务,此时,没有线程争抢,效率比线程池要高。

该线程把当前自己的队列处理完了,就去和其他线程争抢其他队列的任务来处理,这个术语叫工作窃取。

?

ForkJoinPool 维护了多个队列,ThreadPoolExecutor只维护了一个队列,通过多个队列来减少线程争抢,从而提高了效率。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读