Java8 CompletableFuture异步编程-入门篇

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

前言

1、Future vs CompletableFuture

1.1 准备工作

1.2 Future 的局限性

PS: 后续的演示文件,就不一一展示,我们可以自己定义一些内容,用于处理即可

1.3 CompletableFuture 的优势

2、创建异步任务

2.1 runAsync

2.2 supplyAsync

2.3 异步任务中的线程池

2.4 异步编程思想

3、异步任务回调

3.1 thenApply

3.2 thenAccept

3.3 thenRun

3.4 更进一步提升并行化

4、异步任务编排

4.1 编排2个依赖关系的异步任务 thenCompose()

4.2 编排2个非依赖关系的异步任务 thenCombine()

4.3 合并多个异步任务 allOf / anyOf

5、异步任务的异常处理

5.1 exceptionally()

5.2 handle()

总结


前言

JDK 5引入了Future模式。Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算。

Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。

Java 8新增的CompletableFuture类正是吸收了所有Google Guava中ListenableFuture和SettableFuture的特征,还提供了其它强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise 和 Callback(在Java8之前,只有无Callback 的Future)。

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

1、Future vs CompletableFuture

1.1 准备工作

为了便于后续更好地调试,我们需要定义一个工具类辅助我们对知识的理解。

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
​
public class CommonUtils {
​
    public static String readFile(String pathToFile) {
        try {
            return Files.readString(Paths.get(pathToFile));
        } catch(Exception e) {
            e.printStackTrace();
            return "";
        }
    }
​
    //当前线程休眠-毫秒
    public static void sleepMillis(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
​
    //当前线程休眠-秒
    public static void sleepSecond(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
​
   //打印日志
    public static void printThreadLog(String message) {
      //时间戳|线程id|线程名|日志信息
        String result = new StringJoiner(" | ")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.format("%2d",Thread.currentThread().getId()))
                .add(String.valueOf(Thread.currentThread().getName()))
                .add(message)
                .toString();
        System.out.println(result);
    }
}
​

1.2 Future 的局限性

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中 news.txt

oh my god!completablefuture真tmd好用呀
PS: 后续的演示文件,就不一一展示,我们可以自己定义一些内容,用于处理即可
public class FutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
​
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // step 1: 读取敏感词汇
        Future<String[]> filterWordFuture = executor.submit(() -> {
            String str = CommonUtils.readFile("filter_words.txt");
            String[] filterWords = str.split(",");
            return filterWords;
        });
​
        // step 2: 读取新闻稿文件内容
        Future<String> newsFuture = executor.submit(() -> {
            return CommonUtils.readFile("news.txt");
        });
​
        // step 3: 替换操作(当敏感词汇很多,文件很多,替换也会是个耗时的任务)
        Future<String> replaceFuture = executor.submit(() -> {
            String[] words = filterWordFuture.get();
            String news = newsFuture.get();
​
            // 替换敏感词汇
            for (String word : words) {
                if (news.indexOf(word) >= 0) {
                    news = news.replace(word, "**");
                }
            }
            return news;
        });
​
        String filteredNews = replaceFuture.get();
        System.out.println("过滤后的新闻稿:" + filteredNews);
​
        executor.shutdown();
    }
}
​

通过上面的代码,我们会发现,Future相比于所有任务都直接在主线程处理,有很多优势,但同时也存在不足,至少表现如下:

  • 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知你它什么时候完成,你如果想要得到结果,必须通过一个get()方法,该方法会阻塞直到结果可用为止。 它不具备将回调函数附加到Future后并在Future的结果可用时自动调用回调的能力。
  • 无法解决任务相互依赖的问题。filterWordFuture和newsFuture的结果不能自动发送给replaceFuture,需要在replaceFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。
  • 不能将多个Future合并在一起。假设你有多种不同的Future,你想在它们全部并行完成后然后再运行某个函数,Future很难独立完成这一需要。
  • 没有异常处理。Future提供的方法中没有专门的API应对异常处理,还是需要开发者自己手动异常处理。

1.3 CompletableFuture 的优势

CompletableFuture 实现了FutureCompletionStage接口

CompletableFuture 相对于 Future 具有以下优势:

  • 为快速创建、链接依赖和组合多个Future提供了大量的便利方法。
  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
  • 无缝衔接和亲和 lambda 表达式 和 Stream - API 。
  • 我见过的真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。

2、创建异步任务

2.1 runAsync

如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture.runAsync()方法。它接受一个Runnable接口的实现类对象,方法返回CompletableFuture<Void> 对象

static CompletableFuture<Void> runAsync(Runnable runnable);

演示案例:开启一个不从任务中返回任何内容的CompletableFuture异步任务

public class RunAsyncDemo {
    public static void main(String[] args) {
        // runAsync 创建异步任务
        CommonUtils.printThreadLog("main start");
        // 使用Runnable匿名内部类
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                CommonUtils.printThreadLog("读取文件开始");
                // 使用睡眠来模拟一个长时间的工作任务(例如读取文件,网络请求等)
                CommonUtils.sleepSecond(3);
                CommonUtils.printThreadLog("读取文件结束");
            }
        });
​
        CommonUtils.printThreadLog("here are not blocked,main continue");
        CommonUtils.sleepSecond(4); //  此处休眠为的是等待CompletableFuture背后的线程池执行完成。
        CommonUtils.printThreadLog("main end");
    }
}

我们也可以以Lambda表达式的形式传递Runnable接口实现类对象

public class RunAsyncDemo2 {
    public static void main(String[] args) {
        // runAsync 创建异步任务
        CommonUtils.printThreadLog("main start");
        // 使用Lambda表达式
        CompletableFuture.runAsync(() -> {
            CommonUtils.printThreadLog("读取文件开始");
            CommonUtils.sleepSecond(3);
            CommonUtils.printThreadLog("读取文件结束");
        });
​
        CommonUtils.printThreadLog("here are not blocked,main continue");
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}
​

需求:使用CompletableFuture开启异步任务读取 news.txt 文件中的新闻稿,并打印输出。

public class RunAsyncDemo3 {
    public static void main(String[] args) {
        // 需求:使用多线程异步读取 words.txt 中的敏感词汇,并打印输出。
        CommonUtils.printThreadLog("main start");
​
        CompletableFuture.runAsync(()->{
            String news = CommonUtils.readFile("news.txt");
            CommonUtils.printThreadLog(news);
        });
​
        CommonUtils.printThreadLog("here are not blocked,main continue");
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}

在后续的章节中,我们会经常使用Lambda表达式。

2.2 supplyAsync

CompletableFuture.runAsync() 开启不带返回结果异步任务。但是,如果您想从后台的异步任务中返回一个结果怎么办?此时,CompletableFuture.supplyAsync()是你最好的选择了。

static CompletableFuture<U> supplyAsync(Supplier<U> supplier)

它入参一个 Supplier 供给者,用于供给带返回值的异步任务 并返回CompletableFuture<U>,其中U是供给者给程序供给值的类型。

需求:开启异步任务读取 news.txt 文件中的新闻稿,返回文件中内容并在主线程打印输出

public class SupplyAsyncDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CommonUtils.printThreadLog("main start");
​
        CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                String news = CommonUtils.readFile("news.txt");
                return news;
            }
        });
​
        CommonUtils.printThreadLog("here are not blocked,main continue");
        // 阻塞并等待newsFuture完成
        String news = newsFuture.get();
        CommonUtils.printThreadLog("news = " + news);
        CommonUtils.printThreadLog("main end");
    }
}

如果想要获取newsFuture结果,可以调用completableFuture.get()方法,get()方法将阻塞,直到newsFuture完成。

我们依然可以使用Java 8的Lambda表达式使上面的代码更简洁。

CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
    String news = CommonUtils.readFile("news.txt");
    return news;
});

2.3 异步任务中的线程池

大家已经知道,runAsync()和supplyAsync()方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗? 不是吗!

CompletableFuture 会从全局的ForkJoinPool.commonPool() 线程池获取线程来执行这些任务

当然,你也可以创建一个线程池,并将其传递给runAsync()和supplyAsync()方法,以使它们在从您指定的线程池获得的线程中执行任务。

CompletableFuture API中的所有方法都有两种变体,一种是接受传入的Executor参数作为指定的线程池,而另一种则使用默认的线程池 (ForkJoinPool.commonPool()) 。

// runAsync() 的重载方法 
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
// supplyAsync() 的重载方法 
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

需求:指定线程池,开启异步任务读取 news.txt 中的新闻稿,返回文件中内容并在主线程打印输出

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
    CommonUtils.printThreadLog("异步读取文件开始");
    String news = CommonUtils.readFile("news.txt");
    CommonUtils.printThreadLog("异步读取文件完成");
    return news;
},executor);

最佳实践:创建属于自己的业务线程池

如果所有CompletableFuture共享一个线程池,那么一旦有异步任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

2.4 异步编程思想

综合上述,看到了吧,我们没有显式地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识吧,以上专业的说法是:线程的创建和线程负责的任务进行解耦,它给我们带来的好处线程的创建和启动全部交给线程池负责,具体任务的编写就交给程序员,专人专事

异步编程是可以让程序并行( 也可能是并发 )运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。

作为开发者,只需要有一个意识:

开发者只需要把耗时的操作交给CompletableFuture开启一个异步任务,然后继续关注主线程业务,当异步任务运行完成时会通知主线程它的运行结果。我们把具备了这种编程思想的开发称为异步编程思想

3、异步任务回调

CompletableFuture.get()方法是阻塞的。调用时它会阻塞等待 直到这个Future完成,并在完成后返回结果。 但是,很多时候这不是我们想要的。

对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。 这样,我们就不必等待结果了,然后在Future的回调函数内编写完成Future之后需要执行的逻辑。 您可以使用thenApply(),thenAccept()和thenRun()方法,它们可以把回调函数附加到CompletableFuture

3.1 thenApply

使用 thenApply() 方法可以处理和转换CompletableFuture的结果。 它以Function<T,R>作为参数。 Function<T,R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果

CompletableFuture<R> thenApply(Function<T,R> fn)

需求:异步读取 filter_words.txt 文件中的内容,读取完成后,把内容转换成数组( 敏感词数组 ),异步任务返回敏感词数组

public class ThenApplyDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        CommonUtils.printThreadLog("main start");

        CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printThreadLog("读取filter_words文件");
            String filterWordsContent = CommonUtils.readFile("filter_words.txt");
            return filterWordsContent;
        });

        CompletableFuture<String[]> filterWordsFuture = readFileFuture.thenApply((content) -> {
            CommonUtils.printThreadLog("文件内容转换成敏感词数组");
            String[] filterWords = content.split(",");
            return filterWords;
        });

        CommonUtils.printThreadLog("main continue");
        
        String[] filterWords = filterWordsFuture.get();
        CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
        CommonUtils.printThreadLog("main end");
    }
}

你还可以通过附加一系列thenApply()回调方法,在CompletableFuture上编写一系列转换序列。一个thenApply()方法的结果可以传递给序列中的下一个,如果你对链式操作很了解,你会发现结果可以在链式操作上传递。

CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {
    CommonUtils.printThreadLog("读取filter_words文件");
    String filterWordsContent = CommonUtils.readFile("filter_words.txt");
    return filterWordsContent;
}).thenApply((content) -> {
    CommonUtils.printThreadLog("转换成敏感词数组");
    String[] filterWords = content.split(",");
    return filterWords;
});

3.2 thenAccept

如果你不想从回调函数返回结果,而只想在Future完成后运行一些代码,则可以使用thenAccept()

这些方法是入参一个 Consumer,它可以对异步任务的执行结果进行消费使用,方法返回CompletableFuture。

CompletableFuture<Void>	thenAccept(Consumer<T> action)

通常用作回调链中的最后一个回调。

需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,然后打印敏感词数组

public class ThenAcceptDemo {
    public static void main(String[] args) {
        CommonUtils.printThreadLog("main start");

        CompletableFuture.supplyAsync(() -> {
            CommonUtils.printThreadLog("读取filter_words文件");
            String filterWordsContent = CommonUtils.readFile("filter_words.txt");
            return filterWordsContent;
        }).thenApply((content) -> {
            CommonUtils.printThreadLog("转换成敏感词数组");
            String[] filterWords = content.split(",");
            return filterWords;
        }).thenAccept((filterWords) -> {
            CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
        });

        CommonUtils.printThreadLog("main continue");
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}

3.3 thenRun

前面我们已经知道,通过thenApply( Function<T,R> ) 对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果;

通过thenAccept( Consumer ) 对链式操作中上一个异步任务的结果进行消费使用,不返回新结果;

如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式操作的结果,那么 CompletableFuture.thenRun() 会是你最佳的选择,它需要一个Runnable并返回CompletableFuture<Void>。

CompletableFuture<Void> thenRun(Runnable action);

演示案例:我们仅仅想知道 filter_words.txt 的文件是否读取完成

public class ThenRunDemo {
    public static void main(String[] args) {
        CommonUtils.printThreadLog("main start");

        CompletableFuture.supplyAsync(() -> {
            CommonUtils.printThreadLog("读取filter_words文件");
            String filterWordsContent = CommonUtils.readFile("filter_words.txt");
            return filterWordsContent;
        }).thenRun(() -> {
            CommonUtils.printThreadLog("读取filter_words文件读取完成");
        });

        CommonUtils.printThreadLog("main continue");
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}

3.4 更进一步提升并行化

CompletableFuture 提供的所有回调方法都有两个异步变体

CompletableFuture<U> thenApply(Function<T,U> fn)
// 回调方法的异步变体(异步回调)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn, Executor executor)

注意:这些带了Async的异步回调 通过在单独的线程中执行回调任务 来帮助您进一步促进并行化计算。

回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组

public class ThenApplyAsyncDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CommonUtils.printThreadLog("main start");

        CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {
            /*
            CommonUtils.printThreadLog("读取filter_words文件");
            String filterWordsContent = CommonUtils.readFile("filter_words.txt");
            return filterWordsContent;
            */

            // 此时,立即返回结果
            return "尼玛, NB, tmd";
        }).thenApply((content) -> {
            /**
             * 一般而言,thenApply任务的执行和supplyAsync()任务执行可以使用同一线程执行
             * 如果supplyAsync()任务立即返回结果,则thenApply的任务在主线程中执行
             */
            CommonUtils.printThreadLog("把内容转换成敏感词数组");
            String[] filterWords = content.split(",");
            return filterWords;
        });

        CommonUtils.printThreadLog("main continue");

        String[] filterWords = filterWordFuture.get();
        CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
        CommonUtils.printThreadLog("main end");
    }
}

要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()回调,那么它将在从ForkJoinPool.commonPool() 获得的另一个线程中执行

public class ThenApplyAsyncDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CommonUtils.printThreadLog("main start");

        CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printThreadLog("读取filter_words文件");
            String filterWordsContent = CommonUtils.readFile("filter_words.txt");
            return filterWordsContent;
        }).thenApplyAsync((content) -> {
            CommonUtils.printThreadLog("把内容转换成敏感词数组");
            String[] filterWords = content.split(",");
            return filterWords;
        });

        CommonUtils.printThreadLog("main continue");

        String[] filterWords = filterWordFuture.get();
        CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));
        CommonUtils.printThreadLog("main end");
    }
}

以上程序一种可能的运行结果(需要多运行几次):

1672885914481 |  1 | main | main start
1672885914511 | 16 | ForkJoinPool.commonPool-worker-1 | 读取filter_words.txt文件
1672885914511 |  1 | main | main continue
1672885914521 | 17 | ForkJoinPool.commonPool-worker-2 | 把内容转换成敏感词数组
1672885914521 |  1 | main | filterWords = [尼玛, NB, tmd]
1672885914521 |  1 | main | main end

此外,如果将Executor传递给thenApplyAsync()回调,则该回调的异步任务将在从Executor的线程池中获取的线程中执行;

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {
    CommonUtils.printThreadLog("读取filter_words文件");
    String filterWordsContent = CommonUtils.readFile("filter_words.txt");
    return filterWordsContent;
}).thenApplyAsync((content) -> {
    CommonUtils.printThreadLog("把内容转换成敏感词数组");
    String[] filterWords = content.split(",");
    return filterWords;
},executor);
executor.shutdown();

其他两个回调的变体版本如下:

// thenAccept和其异步回调
CompletableFuture<Void>	thenAccept(Consumer<T> action)
CompletableFuture<Void>	thenAcceptAsync(Consumer<T> action)
CompletableFuture<Void>	thenAcceptAsync(Consumer<T> action, Executor executor)

// thenRun和其异步回调
CompletableFuture<Void>	thenRun(Runnable action)
CompletableFuture<Void>	thenRunAsync(Runnable action)
CompletableFuture<Void>	thenRunAsync(Runnable action, Executor executor)

4、异步任务编排

4.1 编排2个依赖关系的异步任务 thenCompose()

回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组让主线程待用。

关于读取和解析内容,假设使用以下的 readFileFuture(String) 和 splitFuture(String) 方法完成。

public static CompletableFuture<String> readFileFuture(String fileName) {
    return CompletableFuture.supplyAsync(() -> {
        String filterWordsContent = CommonUtils.readFile(fileName);
        return filterWordsContent;
    });
}

public static CompletableFuture<String[]> splitFuture(String context) {
    return CompletableFuture.supplyAsync(() -> {
        String[] filterWords = context.split(",");
        return filterWords;
    });
}

现在,让我们先了解如果使用thenApply() 结果会发生什么

CompletableFuture<CompletableFuture<String[]>> future = readFileFuture("filter_words.txt")
    .thenApply((context) -> {
       return splitFuture(context);
    });

回顾在之前的案例中,thenApply(Function<T,R>) 中Function回调会对上一步任务结果转换后得到一个简单值 ,但现在这种情况下,最终结果是嵌套的CompletableFuture,所以这是不符合预期的,那怎么办呢?

我们想要的是:把上一步异步任务的结果,转成一个CompletableFuture对象,这个CompletableFuture对象中包含本次异步任务处理后的结果。也就是说,我们想组合上一步异步任务的结果到下一个新的异步任务中, 结果由这个新的异步任务返回

此时,你需要使用thenCompose()方法代替,我们可以把它理解为 异步任务的组合

CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> func)

所以,thenCompose()用来连接两个有依赖关系的异步任务,结果由第二个任务返回

CompletableFuture<String[]> future = readFileFuture("filter_words.txt")
    .thenCompose((context) -> { 
        return splitFuture(context);
    });

因此,这里积累了一个经验:

如果我们想连接( 编排 ) 两个依赖关系的异步任务( CompletableFuture 对象 ) ,请使用 thenCompose() 方法

当然,thenCompose 也存在异步回调变体版本:

CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> fn)
    
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn, Executor executor)

4.2 编排2个非依赖关系的异步任务 thenCombine()

我们已经知道,当其中一个Future依赖于另一个Future,使用thenCompose()用于组合两个Future。如果两个Future之间没有依赖关系,你希望两个Future独立运行并在两者都完成之后执行回调操作时,则使用thenCombine();

// T是第一个任务的结果 U是第二个任务的结果 V是经BiFunction应用转换后的结果
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中

public class ThenCombineDemo {
    public static void main(String[] args) throws Exception {
        // 读取敏感词汇的文件并解析到数组中
        CompletableFuture<String[]> future1 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printThreadLog("读取敏感词汇并解析");
            String context = CommonUtils.readFile("filter_words.txt");
            String[] words = context.split(",");
            return words;
        });

        // 读取news文件内容
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            CommonUtils.printThreadLog("读取news文件内容");
            String context = CommonUtils.readFile("news.txt");
            return context;
        });

        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (words, context) -> {
            // 替换操作
            CommonUtils.printThreadLog("替换操作");
            for (String word : words) {
                if(context.indexOf(word) > -1) {
                    context = context.replace(word, "**");
                }
            }
            return context;
        });

        String filteredContext = combinedFuture.get();
        System.out.println("filteredContext = " + filteredContext);
    }
}

注意:当两个Future都完成时,才将两个异步任务的结果传递给thenCombine()的回调函数做进一步处理。

和以往一样,thenCombine 也存在异步回调变体版本

CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func,Executor executor)

4.3 合并多个异步任务 allOf / anyOf

我们使用thenCompose()和thenCombine()将两个CompletableFuture组合和合并在一起。

如果要编排任意数量的CompletableFuture怎么办?可以使用以下方法来组合任意数量的CompletableFuture

public static CompletableFuture<Void>	allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture.allOf()用于以下情形中:有多个需要独立并行运行的Future,并在所有这些Future 都完成后执行一些操作。

需求:统计news1.txt、new2.txt、new3.txt 文件中包含CompletableFuture关键字的文件的个数

public class AllOfDemo {

    public static CompletableFuture<String> readFileFuture(String fileName) {
        return CompletableFuture.supplyAsync(() -> {
            String content = CommonUtils.readFile(fileName);
            return content;
        });
    }

    public static void main(String[] args) {
        // step 1: 创建List集合存储文件名
        List<String> fileList = Arrays.asList("news1.txt", "news2.txt", "news3.txt");

        // step 2: 根据文件名调用readFileFuture创建多个CompletableFuture,并存入List集合中
        List<CompletableFuture<String>> readFileFutureList = fileList.stream().map(fileName -> {
            return readFileFuture(fileName);
        }).collect(Collectors.toList());

        // step 3: 把List集合转换成数组待用,以便传入allOf方法中
        int len = readFileFutureList.size();
        CompletableFuture[] readFileFutureArr = readFileFutureList.toArray(new CompletableFuture[len]);

        // step 4: 使用allOf方法合并多个异步任务
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(readFileFutureArr);

        // step 5: 当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数
        CompletableFuture<Long> countFuture = allOfFuture.thenApply(v -> {
            return readFileFutureList.stream()
                    .map(future -> future.join())
                    .filter(content -> content.contains("CompletableFuture"))
                    .count();
        });

        // step 6: 主线程打印输出文件个数
        Long count = countFuture.join();
        System.out.println("count = " + count);
    }
}

顾名思义,当给定的多个异步任务中的有任意Future一个完成时,需要执行一些操作,可以使用 anyOf 方法

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

anyOf()返回一个新的CompletableFuture,新的CompletableFuture的结果和 cfs中已完成的那个异步任务结果相同。

演示案例:anyOf 执行过程

public class AnyOfDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            Tools.sleepMillis(2);
            return "Future1的结果";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            Tools.sleepMillis(1);
            return "Future2的结果";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            Tools.sleepMillis(3);
            return "Future3的结果";
        });

        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

        // 输出Future2的结果
        System.out.println(anyOfFuture.get());
    }
}

在上面的示例中,当三个CompletableFuture中的任意一个完成时,anyOfFuture就完成了。 由于future2的睡眠时间最少,因此它将首先完成,最终结果将是"Future2的结果"。

注意:

  • anyOf() 方法返回类型必须是 CompletableFuture <Object>。
  • anyOf()的问题在于,如果您拥有返回不同类型结果的CompletableFuture,那么您将不知道最终CompletableFuture的类型。

5、异步任务的异常处理

在前面的章节中,我们并没有更多地关心异常处理的问题,其实,CompletableFuture 提供了优化处理异常的方式。

首先,让我们了解异常如何在回调链中传播

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
            int r = 1 / 0;
            return "result1";
        }).thenApply(result -> {
            return result + " result2";
        }).thenApply(result -> {
            return result + " result3";
        }).thenAccept((result)->{
            System.out.println(result);
        });
    }

如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将转入异常处理

如果在第一个 thenApply 任务中出现异常,第二个 thenApply 和 最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推。

5.1 exceptionally()

exceptionally 用于处理回调链上的异常,回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常。

// Throwable表示具体的异常对象e
CompletableFuture<R> exceptionally(Function<Throwable, R> func)
public class ExceptionallyDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            int r = 1 / 0;
            return "result1";
        }).thenApply(result -> {
            String str = null;
            int len = str.length();
            return result + " result2";
        }).thenApply(result -> {
            return result + " result3";
        }).exceptionally(ex -> {
            System.out.println("出现异常:" + ex.getMessage());
            return "Unknown";
        });

        String ret = future.get();
        Tools.printThreadLog("最终结果:" + ret);
    }
}

 因为exceptionally只处理一次异常,所以常常用在回调链的末端。

5.2 handle()

CompletableFuture API 还提供了一种更通用的方法 handle() 表示从异常中恢复

handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可以进一步向下传递。

CompletableFuture<R> handle(BiFunction<T, Throwable, R> fn)
public class HandleDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            int r = 1 / 0;
            return "result";
        }).handle((ret, ex) -> {
            if(ex != null) {
                System.out.println("我们得到异常:" + ex.getMessage());
                return "Unknown!";
            }
            return ret;
        });

        String ret = future.get();
        CommonUtils.printThreadLog(ret);
    }
}

 如果发生异常,则res参数将为null,否则ex参数将为null。

需求:对回调链中的一次异常进行恢复处理

public class HandleExceptionDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            int r = 1 / 0;
            return "result1";
        }).handle((ret, ex) -> {
            if (ex != null) {
                System.out.println("我们得到异常:" + ex.getMessage());
                return "Unknown1";
            }
            return ret;
        }).thenApply(result -> {
            String str = null;
            int len = str.length();
            return result + " result2";
        }).handle((ret, ex) -> {
            if (ex != null) {
                System.out.println("我们得到异常:" + ex.getMessage());
                return "Unknown2";
            }
            return ret;
        }).thenApply(result -> {
            return result + " result3";
        });

        String ret = future.get();
        Tools.printThreadLog("最终结果:" + ret);
    }
}

和以往一样,为了提升并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本

CompletableFuture<R> exceptionally(Function<Throwable, R> fn)
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn)  // jdk17+
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn,Executor executor) // jdk17+

CompletableFuture<R> handle(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn, Executor executor)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/436264.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

01_Maven

文章目录 Maven安装MavenMaven的工作流程配置MavenMaven的使用module和project的关系如何用Maven导包 如何用Maven进行项目构建指令介绍clean指令compile指令package指令install指令 Maven的依赖管理如何导包scope作用域依赖传递依赖冲突 使用Maven开发项目Junit如何使用Junit …

xss.pwnfunction.com靶机 Warmups

通关要求弹出警告框alert(1337) 没有用户交互 不能使用外链接 在chrome中测试 Ma Spaghet! 通过分析代码我们可以看到它直接用innerHTML将接收的内容赋值 但是我们不能使用<script>标签因为&#xff1a;HTML 5 中指定不执行由 innerHTML 插入的 <script> 标签。 所…

字符串标记高亮脚本

源码 #!/bin/bash # usage: # echo hhh|mark str [font_color] [background_color] # font_color and background_color is optional, default is black&whiterp_str$1 f_color30 b_color47if [ "${f_color}a" "a" ]; thenf_color30 fiif [ "${…

985硕的4家大厂实习与校招经历专题分享(part1)

先简单介绍一下我的个人经历&#xff1a; 985硕士24届毕业生&#xff0c;实验室方向:CV深度学习 就业&#xff1a;工程-java后端 关注大模型相关技术发展 校招offer: 阿里巴巴 字节跳动 等10 研究生期间独立发了一篇二区SCI 实习经历:字节 阿里 京东 B站 &#xff08;只看大厂…

Stable Diffusion 解析:探寻 AI 绘画背后的科技神秘

AI 绘画发展史 在谈论 Stable Diffusion 之前&#xff0c;有必要先了解 AI 绘画的发展历程。 早在 2012 年&#xff0c;华人科学家吴恩达领导的团队训练出了当时世界上最大的深度学习网络。这个网络能够自主学习识别猫等物体&#xff0c;并在短短三天时间内绘制出了一张模糊但…

(黑马出品_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

&#xff08;黑马出品_04&#xff09;SpringCloudRabbitMQDockerRedis搜索分布式 微服务技术异步通信 今日目标1.初识MQ1.1.同步和异步通讯1.1.1.同步通讯1.1.2.异步通讯 1.2.技术对比 2.快速入门2.1.安装RabbitMQ2.1.1.单机部署(1).下载镜像方式…

Amazon Bedrock 上的新一代 Anthropic 模型 Claude 3

如您所知&#xff0c;Amazon Bedrock 是利用基础模型 (FM) 构建生成式 AI 解决方案的最简单的途径&#xff0c;其中包括使用 Anthropic 的先进模型 Claude。而如今&#xff0c;新一代 Claude 模型已经到来。到目前为止&#xff0c;我已经制作了 3 个单独视频来介绍 Claude 3 的…

图论练习5

Going Home Here 解题思路 模板 二分图最优匹配&#xff0c;前提是有完美匹配&#xff08;即存在一一配对&#xff09;左右集合分别有顶标&#xff0c;当时&#xff0c;为有效边&#xff0c;即选中初始对于左集合每个点&#xff0c;选择其连边中最优的&#xff0c;然后对于每…

Unity 给刚体一个力或速度

创建平面和小球&#xff0c;给力或给速度让其弹起 给小球挂载刚体&#xff08;Rigibdody&#xff09;和脚本 &#xff08;力是累计或者衰减的&#xff0c;直接给速度就是赋值&#xff0c;但如果速度就和力类似了&#xff09; using System.Collections; using System.Collect…

开发手札:unity2022+vscode1.87联合开发

不得不说&#xff0c;时间的力量是很强大的&#xff0c;同时熵增理论适用于任何地方。 在现在的公司干了五年多了&#xff0c;五年前配置的内网开发机&#xff0c;i7 870016g1t hddgtx1080已经卡爆了&#xff0c;特别是硬盘掉速严重&#xff0c;开机开软件没有一两分钟都…

代码随想录算法训练营第四十四天|309.最佳买卖股票时机含冷冻期,714.买卖股票的最佳时机含手续费,总结

系列文章目录 代码随想录算法训练营第一天|数组理论基础&#xff0c;704. 二分查找&#xff0c;27. 移除元素 代码随想录算法训练营第二天|977.有序数组的平方 &#xff0c;209.长度最小的子数组 &#xff0c;59.螺旋矩阵II 代码随想录算法训练营第三天|链表理论基础&#xff…

CVPR 2024 | Modular Blind Video Quality Assessment:模块化无参视频质量评估

无参视频质量评估 (Blind Video Quality Assessment&#xff0c;BVQA) 在评估和改善各种视频平台并服务用户的观看体验方面发挥着关键作用。当前基于深度学习的模型主要以下采样/局部块采样的形式分析视频内容&#xff0c;而忽视了实际空域分辨率和时域帧率对视频质量的影响&am…

前端处理接口直接返回的图片

有时候接口会直接返回图片而不是连接&#xff0c;前端需要处理后才能使用。 首先你可能需要设置responseType: blob’处理响应数据格式。 直接使用 将接口及参数动态拼接成img.src直接使用 <img src"http://test.com/api/img?size50x50" alt"">i…

【Java设计模式】十一、组合模式

文章目录 1、组合模式2、案例3、总结 1、组合模式 下面的文件系统&#xff0c;树形结构&#xff0c;有文件夹节点和文件节点&#xff08;有无儿子节点的区别&#xff09;&#xff0c;使用这两种节点也要做区分。 组合模式&#xff08;部分整体模式&#xff09;&#xff0c;就…

前端运算符比较与计算中的类型转换,运算规则

题目&#xff1a; 下面表达式的值分别都是什么&#xff08;类型转换&#xff09; 0 0 0 2 true 2 false false false false 0 false undefined false null null undefined\t\r\n 0JS中的原始类型有哪些 原始值类型就是 存储的都是值&#xff0c;没有函数可以调用的。…

解决ts报错:类型“entry”上不存在属性“$AppTools”

uniapp ts 项目&#xff0c;已经将AppTools挂在了vue的原型上&#xff0c;但是在vue页面使用时报错&#xff0c;如图&#xff1a; 解决&#xff1a; 在项目根目录下的tsconfig.json文件添加如下配置&#xff1a; "include": ["src/**/*"],这样报错就消失…

32单片机基础:输入捕获测频率

接线图如下图所示&#xff1a; 我们复制之前写过的代码6-3 PWM驱动LED呼吸灯 在PWM模块中&#xff0c;执行的逻辑是&#xff0c;初始化TIM2的通道1&#xff0c;产生一个PWM波形&#xff0c;输出引脚是PA0&#xff0c;通过SetCompare1的函数&#xff0c;可以调节CCR1寄存器的值…

算法相关计算

1 内存管理相关 1 .1 float 6.9 f 的内存计算方法 二进制小数的计算&#xff1a; &#xff08;1&#xff09;小数的二进制算法和整数的大致相反&#xff0c;就是不断的拿小数部分乘以2取积的整数部分&#xff0c;然后正序排列。比如求0.9的二进制&#xff1a; 0.9*21.8 取 1…

【网络】主机连接 TCP 三次握手

【网络】主机连接 TCP 三次握手 一、TCP连接3次握手二、TCP连接4次挥手三、为什么tcp要三次握手&#xff0c;两次行不四、为什么TCP挥手需要4次五、Netstat命令的连接状态包括:六、练习题 一、TCP连接3次握手 1、建立连接的时候是3次握手&#xff0c;客户端向服务器端发送SYN&…

微软亚太区AI智能应用创新业务负责人许豪,将出席“ISIG-AIGC技术与应用发展峰会”

3月16日&#xff0c;第四届「ISIG中国产业智能大会」将在上海中庚聚龙酒店拉开序幕。本届大会由苏州市金融科技协会指导&#xff0c;企智未来科技&#xff08;AIGC开放社区、RPA中国、LowCode低码时代&#xff09;主办。大会旨在聚合每一位产业成员的力量&#xff0c;深入探索A…
最新文章