剑指JUC原理-17.CompletableFuture

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

    • Future和Callable接口
      • FutureTask实现类
      • Future到CompletableFuture
        • Future优点
        • Future缺点
          • get()阻塞
          • isDone() 轮循
    • CompletableFuture基本介绍
      • CompletableFuture
      • CompletionStage
      • 核心的四个静态方法(分为两组)
        • runAsync无返回值
          • runAsync
          • runAsync+线程池
        • supplyAsync有返回值
          • supplyAsync
          • supplyAsync+线程池
      • CompletableFuture使用演示(日常使用)
        • 基本功能
        • 减少阻塞和轮询whenComplete
      • CompletableFuture优点总结
    • CompletableFuture案例精讲
      • 编程必备技能准备
        • 函数式接口
        • 链式调用|链式编程|链式写法
        • join和get对比
      • 实战精讲-比价网站案例
        • 需求
        • 基本框架搭建
        • 从功能到性能:利用CompletableFuture
    • CompletableFuture常用API
      • 获得结果和触发计算
      • 对计算结果进行处理
      • 对计算结果进行消费
      • CompleteFuture和线程池说明(非常重要)
      • 对计算速度进行选用
      • 对计算结果进行合并

Future和Callable接口

  • Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)

  • 一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

在这里插入图片描述

  • 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。

  • 有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

FutureTask实现类

  • 在源码可以看到,他既继承了RunnableFuture接口,也在构造方法中实现了Callable接口(有返回值、可抛出异常)和Runnable接口

在这里插入图片描述

	public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

   
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

完成上面目的的代码 - 多线程/有返回/异步

一个主线程,一个mythread|步执行了|返回了"hello callable"

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        System.out.println(futureTask.get());//接收返回值
    }
}

class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() ----异步执行");
        return "hello Callable 返回值";
    }
}
//结果
//-----come in call() ----异步执行
//hello Callable 返回值

Future到CompletableFuture

Future优点
  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。

  • 方案一,3个任务1个main线程处理,大概1130ms

在这里插入图片描述

  • 方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概530毫秒

在这里插入图片描述

Future缺点
get()阻塞

一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
        System.out.println(futureTask.get());
        //----------------------------------------------------------注意顺序
    }
}
//main  -------主线程忙其他任务了
//t1   ------副线程come in

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
        //-----------------------------------------------------------注意顺序
        System.out.println(futureTask.get());
        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");

        //----------------------------------------------------------注意顺序
    }
}
//t1   ------副线程come in
//-------------------5秒后才出现下面的结果-------------说明一旦调用get()方法直接去找副线程了,阻塞了主线程
//task over
//main  -------主线程忙其他任务了
isDone() 轮循

利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
            try {
                TimeUnit.SECONDS.sleep(5);//暂停几秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();

        System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
      //1-------  System.out.println(futureTask.get(3,TimeUnit.SECONDS));//只愿意等3秒,过了3秒直接抛出异常

        //2-------更健壮的方式-------轮询方法---等副线程拿到才去get()
        //但是也会消耗cpu资源
        while(true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }else{
                //暂停毫秒
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("正在处理中------------正在处理中");
            }
        }
    }
}
//main  -------主线程忙其他任务了
//t1   ------副线程come in
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//task over

CompletableFuture基本介绍

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture

CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

在这里插入图片描述

在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Completable Future的方法。

它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。

它实现了FutureCompletion Stage接口

CompletionStage

Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段

核心的四个静态方法(分为两组)

  • 关键就是 |有没有返回值|是否用了线程池|

  • 参数说明:

    • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
    • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。
runAsync无返回值
runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable)
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(voidCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9 //默认的线程池
//null --- 没有返回值
runAsync+线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor)
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {

            System.out.println(Thread.currentThread().getName());
            //停顿几秒线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },executorService);
        System.out.println(voidCompletableFuture.get());
    }
}
//pool-1-thread-1   ----指定的线程池
//null ----没有返回值
supplyAsync有返回值
supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        });
        System.out.println(objectCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了
supplyAsync+线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor)
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

        CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "helllo supplyasync";
        },executorService);
        System.out.println(objectCompletableFuture.get());
    }
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了

CompletableFuture使用演示(日常使用)

基本功能

CompletableFuture可以完成Future的功能

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"----副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("1秒钟后出结果"+result);
            return result;
        });

        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        System.out.println(objectCompletableFuture.get());
    }
}
//main线程先去忙其他任务
//ForkJoinPool.commonPool-worker-9----副线程come in
//1秒钟后出结果6
//6
减少阻塞和轮询whenComplete

CompletableFuture通过whenComplete减少阻塞和轮询(自动回调)

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        }).whenComplete((v,e) -> {//没有异常,v是值,e是异常
            // 自动回调,不需要get
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        }).exceptionally(e->{//有异常的情况
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
//main线程先去忙其他任务
//------------------计算完成,更新系统updataValue3

假如换用自定义线程池

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        }).exceptionally(e->{//有异常的情况
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//pool-1-thread-1--------副线程come in
//main线程先去忙其他任务
//------------------计算完成,更新系统updataValue6

异常情况的展示,设置一个异常 int i = 10 / 0 ;

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"--------副线程come in");
            int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----结果---异常判断值---"+result);
            //---------------------异常情况的演示--------------------------------------
            if(result > 2){
                int i  = 10 / 0 ;//我们主动的给一个异常情况
            }
            //------------------------------------------------------------------
            return result;
        },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常
            if(e == null){
                System.out.println("------------------计算完成,更新系统updataValue"+v);
            }
        }).exceptionally(e->{//有异常的情况
            e.printStackTrace();
            System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//pool-1-thread-1--------副线程come in
//main线程先去忙其他任务
//-----结果---异常判断值---4                (这里 4 >2了,直接抛出异常)
//异常情况java.lang.ArithmeticException: / by zero  java.lang.ArithmeticException: / by zero
//java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
//  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
//  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
//  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
//  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
//  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
//  at java.lang.Thread.run(Thread.java:748)
//Caused by: java.lang.ArithmeticException: / by zero
//  at com.zhang.admin.controller.CompletableFutureUseDemo.lambda$main$0(CompletableFutureUseDemo.java:19)
//  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
//  ... 3 more

CompletableFuture优点总结

  • 异步任务结束时,会自动回调某个对象的方法;

  • 异步任务出错时,会自动回调某个对象的方法。

CompletableFuture案例精讲

编程必备技能准备

函数式接口

函数式接口的定义:

  • 任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象。
public interface Runnable{
  public abstract void run();
}

常见的函数式接口

  • Runnable
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
  • Function
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}
  • Consumer
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}
  • Supplier
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}
  • Biconsumer(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)
@FunctionalInterface
public interface BiConsumer<T, U> {
    void accept(T t, U u);

}
函数式接口名称方法名称参数返回值
Runnablerun无参数无返回值
Functionapply1个参数有返回值
Consumeaccept1个参数无返回值
Supplierget没有参数有返回值
Biconsumeraccept2个参数无返回值
链式调用|链式编程|链式写法
public class Chain {
    public static void main(String[] args) {
        //-------------------老式写法------------
//        Student student = new Student();
//        student.setId(1);
//        student.setMajor("cs");
//        student.setName("小卡");
        new Student().setId(1).setName("大卡").setMajor("cs");
    }
    
}

@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)//开启链式编程
class Student{
    private int id;
    private String name;
    private String major;
}
join和get对比
  • 功能几乎一样,区别在于编码时是否需要抛出异常
    • get()方法需要抛出异常
    • join()方法不需要抛出异常
public class Chain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.get());
    }

}

public class Chain {
    public static void main(String[] args)  {//抛出异常
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 12345";
        });
        System.out.println(completableFuture.join());
    }
}

实战精讲-比价网站案例

需求
1需求说明
1.1同一款产品,同时搜索出同款产品在各大电商平台的售价;
1.2同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少

2输出返回:
出来结果希望是同款产品的在不同地方的价格清单列表, 返回一个List<String>
《mysql》in jd price is 88.05
《mysql》in dang dang price is 86.11
《mysql》in tao bao price is 90.43

3解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表
1   stepbystep   , 按部就班, 查完京东查淘宝, 查完淘宝查天猫......
2   all in       ,万箭齐发,一口气多线程异步任务同时查询。。。
基本框架搭建
  • 相当于是一个一个按部就班
public class Case {
    static List<NetMall> list = Arrays.asList(
      new NetMall("jd"),
      new NetMall("dangdang"),
        new NetMall("taobao")
    );

    public static List<String> getPrice(List<NetMall> list,String productName){
        return list
                .stream() //----流式计算做了映射(利用map),希望出来的是有格式的字符串(利用String.format),%是占位符
                .map(netMall -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),//第一个%
                                netMall.calcPrice(productName))).collect(Collectors.toList());//第二个%
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for(String element:list1){
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");
    }
}

class NetMall{
    @Getter
    private String netMallName;

    public NetMall(String netMallName){
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//用这句话来模拟价格
    }
}

//mysql in jd price is 110.48
//mysql in dangdang price is 109.06
//mysql in taobao price is 110.96
//---当前操作花费时间----costTime:3098毫秒
从功能到性能:利用CompletableFuture
  • 这里是利用异步线程,万箭齐发

  • 此处用了两步流式编程

  • 性能差距巨大

public class Case {
    static List<NetMall> list = Arrays.asList(
      new NetMall("jd"),
      new NetMall("dangdang"),
        new NetMall("taobao")
    );

    public static List<String> getPrice(List<NetMall> list,String productName){
        return list
                .stream() //----流式计算做了映射(利用map),希望出来的是有格式的字符串(利用String.format),%是占位符
                .map(netMall -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),//第一个%
                                netMall.calcPrice(productName))).collect(Collectors.toList());//第二个%
    }

    //从功能到性能
    public static List<String> getPricesByCompletableFuture(List<NetMall> list,String productName){
        return list.stream().map(netMall ->
                        CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName))))//Stream<CompletableFuture<String>>
                                .collect(Collectors.toList())//List<CompletablFuture<String>>
                                .stream()//Stream<CompletableFuture<String>
                                .map(s->s.join())//Stream<String>
                                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for(String element:list1){
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("--普通版----当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");
        System.out.println("------------------------------分割----------------------");
        startTime = System.currentTimeMillis();
        List<String> list2 = getPricesByCompletableFuture(list, "mysql");
        for(String element:list2){
            System.out.println(element);
        }
        endTime = System.currentTimeMillis();
        System.out.println("--性能版-当前操作花费时间----costTime:"+(endTime-startTime)+"毫秒");
    }
}

class NetMall{
    @Getter
    private String netMallName;

    public NetMall(String netMallName){
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//用这句话来模拟价格
    }
}
//mysql in jd price is 109.49
//mysql in dangdang price is 110.85
//mysql in taobao price is 110.32
//--普通版----当前操作花费时间----costTime:3124毫秒
//------------------------------分割----------------------
//mysql in jd price is 109.34
//mysql in dangdang price is 109.02
//mysql in taobao price is 110.37
//--性能版-当前操作花费时间----costTime:1000毫秒

CompletableFuture常用API

获得结果和触发计算

获取结果

  • public T get() 不见不散,容易阻塞

  • public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常

  • public T join() 类似于get(),区别在于是否需要抛出异常

  • public T getNow(T valueIfAbsent)

没有计算完成的情况下,给一个替代结果

立即获取结果不阻塞

  • 计算完,返回计算完成后的结果
  • 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)

主动触发计算

public boolean complete(T value) 是否立即打断get()方法返回括号值

(执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);//执行需要2秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });

        try {
            TimeUnit.SECONDS.sleep(1);//等待需要1秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
       // System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx
        System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get());//执2-等1 返回true+备胎值completeValue
    }
}

对计算结果进行处理

thenApply 计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。

public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
    //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
    CompletableFuture.supplyAsync(() -> {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("111");
        return 1024;
    }).thenApply(f -> {
        System.out.println("222");
        return f + 1;
    }).thenApply(f -> {
        //int age = 10/0; // 异常情况:那步出错就停在那步。
        System.out.println("333");
        return f + 1;
    }).whenCompleteAsync((v,e) -> {
        System.out.println("*****v: "+v);
    }).exceptionally(e -> {
        e.printStackTrace();
        return null;
    });

    System.out.println("-----主线程结束,END");

    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
//-----正常情况
//111
//222
//333
//----计算结果: 6

//-----异常情况
//111
//异常.....

handle 类似于thenApply,但是有异常的话仍然可以往下走一步。

public class CompletableFutureDemo2
{

public static void main(String[] args) throws ExecutionException, InterruptedException
{
    //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
    // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
    CompletableFuture.supplyAsync(() -> {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("111");
        return 1024;
    }).handle((f,e) -> {
        int age = 10/0;//异常语句
        System.out.println("222");
        return f + 1;
    }).handle((f,e) -> {
        System.out.println("333");
        return f + 1;
    }).whenCompleteAsync((v,e) -> {
        System.out.println("*****v: "+v);
    }).exceptionally(e -> {
        e.printStackTrace();
        return null;
    });

    System.out.println("-----主线程结束,END");

    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
//-----异常情况
//111
//333
//异常,可以看到多走了一步333
  • 一般用thenApply

对计算结果进行消费

  • 接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function

thenAccept

public static void main(String[] args) throws ExecutionException, InterruptedException
{
    CompletableFuture.supplyAsync(() -> {
        return 1;
    }).thenApply(f -> {
        return f + 2;
    }).thenApply(f -> {
        return f + 3;
    }).thenApply(f -> {
        return f + 4;
    }).thenAccept(r -> System.out.println(r));
}
//6
//消费一下,直接得到6

补充:Code之任务之间的顺序执行

thenRun

  • thenRun(Runnable runnable)

  • 任务A执行完执行B,并且B不需要A的结果

thenAccept

  • thenAccept(Consumer action)

  • 任务A执行完执行B,B需要A的结果,但是任务B无返回值

thenApply

  • thenApply(Function fn)

  • 任务A执行完执行B,B需要A的结果,同时任务B有返回值

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
//null 

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
//resultA打印出来的 null因为没有返回值

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
//resultAresultB 返回值

CompleteFuture和线程池说明(非常重要)

  • 上面的几个方法都有普通版本和后面加Async的版本

  • thenRunthenRunAsync为例,有什么区别?

先看结论

  • 没有传入自定义线程池,都用默认线程池ForkJoinPool

  • 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池

调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

//2-1
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  pool-1-thread-1
//2号任务  pool-1-thread-1
//3号任务  pool-1-thread-1
//4号任务  pool-1-thread-1
//2-2
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRunAsync(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  pool-1-thread-1
//2号任务  ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
//3号任务  ForkJoinPool.commonPool-worker-9
//4号任务  ForkJoinPool.commonPool-worker-9

也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)

public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
//            try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
            return "abcd";
        },threadPool).thenRun(()->{
           // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
          //  try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
        }).thenRun(()->{
            //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
            System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
        });
    }
}
//1号任务  1号任务  pool-1-thread-1
//2号任务  main
//3号任务  main
//4号任务  main

源码

//CompletableFuture.java 2009行
public CompletableFuture<Void> thenRun(Runnable action) {//传入值是一样的
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);//但是这里有个异步的线程池asyncPool
    }
    //进入asyncPool
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);//一般大于1都是成立的

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//所以这里会调用forkJoin线程池

对计算速度进行选用

  • applyToEither方法,快的那个掌权
public class CompletableFutureDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return "play1 ";
        });

        CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return "play2";
        });

        CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用
            return f + " is winner";
        });

        System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
    }
}
//ForkJoinPool.commonPool-worker-9  ---come in 
//ForkJoinPool.commonPool-worker-2  ---come in 
//main  play2 is winner

对计算结果进行合并

thenCombine 合并

  • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
  • 先完成的先等着,等待其它分支任务
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return x + y;
        });
        
        System.out.println(thenCombineResult.get());
    }
}
//30

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/126852.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring boot 整合grpc 运用

文章目录 GRPC基础概念&#xff1a;Protocol Buffers&#xff1a;proto 基础语法&#xff1a;调用类型&#xff1a; Spring boot 整合 grpc项目结构&#xff1a;整合代码&#xff1a;父 pomproto 模块服务端&#xff1a;客户端&#xff1a;实际调用&#xff1a; 原生集成 GRPC基…

【uniapp/uview】Collapse 折叠面板更改右侧小箭头图标

最终效果是这样的&#xff1a; 官方没有给出相关配置项&#xff0c;后来发现小箭头不是 uview 的图标&#xff0c;而是 unicode 编码&#xff0c;具体代码&#xff1a; // 箭头图标 ::v-deep .uicon-arrow-down[data-v-6e20bb40]:before {content: \1f783; }附一个查询其他 u…

机器学习 - 决策树:技术全解与案例实战

目录 一、引言二、决策树基础决策树模型概述构建决策树的关键概念特征选择决策树的生成 决策树的剪枝 三、算法研究进阶提升树和随机森林提升树&#xff08;Boosted Trees&#xff09;随机森林&#xff08;Random Forests&#xff09; 进化算法与决策树决策树结构的进化 多目标…

MeterSphere 任意文件读取漏洞(CVE-2023-25814)

MeterSphere 任意文件读取漏洞&#xff08;CVE-2023-25814&#xff09; 免责声明漏洞描述漏洞影响漏洞危害网络测绘Fofa: title"MeterSphere" 漏洞复现1. 构造poc2. 发送数据包3. 查看文件 免责声明 仅用于技术交流,目的是向相关安全人员展示漏洞利用方式,以便更好地…

亚马逊鲲鹏系统能做什么

亚马逊鲲鹏系统是一款能绕过亚马逊智能检测&#xff0c;完全模拟人类真实行为&#xff0c;通过模拟真实的人流量来帮助你提升你的产品排名&#xff0c;让你的产品出现在搜索首页&#xff0c;从而快速提高你的销售业绩的营销工具&#xff01; 主要的功能有批量注册买家号、AI智能…

Java 并发编程面试题——Condition 接口

目录 1.Condition 接口有什么作用&#xff1f;2.如何使用 Condition&#xff1f;3.Condition 中有哪些常用的方法&#xff1f;4.✨Condition 的底层实现原理是什么&#xff1f;4.1.等待队列4.2.等待4.3.通知 &#xff08;1&#xff09;参考书籍&#xff1a; 《Java 并发编程的艺…

RabbitMQ集群配置以及负载均衡配置

RabbitMQ集群配置以及负载均衡配置 环境配置集群配置安装rabbitmq启动rabbitmq开启远程登录添加用户并且授权用户添加数据存放目录和日志存放目录查看端口拷⻉erlang.cookie将mq-2、mq-3作为内存节点加⼊mq-1节点集群中查看集群状态添加一个新的队列 RabbitMq负载均衡配置-HAPr…

VUE element组件生成的全选框如何获取值

//先声明 const Selection ref([]);//获取 const handleSelectCodeForTicket (val) > {console.log(val);// values.value val;Selection.value [];val.forEach((v) > {Selection.value.push(v);});console.log(Selection.value); }; <el-table selection-change…

《QT从基础到进阶·十七》QCursor鼠标的不同位置坐标获取

一些常用鼠标图形&#xff1a; 鼠标光标相对于整个电脑屏幕的位置&#xff1a;QCursor::pos() 当前光标相对于当前窗口的位置&#xff1a;this->mapFromGlobal(QCursor::pos()) void MainWindow::mouseReleaseEvent(QMouseEvent* event) {QPoint pos event->pos(); …

腾讯云真的是良心云!服务器带宽、CPU、硬盘IO性能大揭秘!

本文将通过对腾讯云服务器CVM S5 4核配置的云服务器进行测试&#xff0c;来评估其在带宽、CPU和硬盘IO性能方面的表现。 在云服务器的并发处理中&#xff0c;带宽是一个重要的因素。经过测试&#xff0c;腾讯云的带宽网络表现非常出色&#xff0c;能够跑满带宽&#xff0c;同时…

使用pymysql工具包连接Python与数据库

目录 一、引言 二、pymysql工具包的安装 三、连接数据库 四、执行SQL语句 1、执行查询语句&#xff1a;cursor.execute(sql) 2、插入数据&#xff1a;cursor.execute(sql, values) 3、更新数据&#xff1a;cursor.execute(sql, values) 4、删除数据&#xff1a;cursor.…

选择CRM系统主要看哪些指标?

很多企业都想选择一款好用的CRM客户管理系统&#xff0c;但是面对众多类型、品牌的CRM却犯了难。下面我们来说说&#xff0c;企业要想选到一款适合自己的、好用的CRM系统&#xff0c;主要看哪些指标&#xff1f;这里有6个步骤&#xff0c;可以帮您做到。 第1步&#xff1a;了解…

深度学习的集体智慧:最新发展综述

一、说明 我们调查了来自复杂系统的想法&#xff0c;如群体智能、自组织和紧急行为&#xff0c;这些想法在机器学习中越来越受欢迎。人工神经网络正在影响我们的日常生活&#xff0c;从执行预测性任务&#xff08;如推荐、面部识别和对象分类&#xff09;到生成任务&#xff08…

if...else绝佳替换方案

目录 方法一&#xff1a;提前 return方法二&#xff1a;枚举方案三&#xff1a;Optional 判空方案四&#xff1a;表驱动法方案五&#xff1a;策略模式 工厂方法方案六&#xff1a;责任链模式方案七&#xff1a;Function 方法一&#xff1a;提前 return 假如有如下代码&#x…

2023年【公路水运工程施工企业安全生产管理人员】复审考试及公路水运工程施工企业安全生产管理人员考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 公路水运工程施工企业安全生产管理人员复审考试根据新公路水运工程施工企业安全生产管理人员考试大纲要求&#xff0c;安全生产模拟考试一点通将公路水运工程施工企业安全生产管理人员模拟考试试题进行汇编&#xff0…

人工智能-卷积神经网络(LeNet)

为了能够应用softmax回归和多层感知机&#xff0c;我们首先将每个大小为\(28\times28\)的图像展平为一个784维的固定长度的一维向量&#xff0c;然后用全连接层对其进行处理。 而现在&#xff0c;我们已经掌握了卷积层的处理方法&#xff0c;我们可以在图像中保留空间结构。 同…

学者观察 | 数字经济中长期发展中的区块链影响力——清华大学柴跃廷

导语 区块链是一种全新的分布式基础架构与计算范式&#xff0c;既能利用非对称加密和冗余分布存储实现信息不可篡改&#xff0c;又可以利用链式数据结构实现数据信息可溯源。当前&#xff0c;区块链技术已成为全球数据交易、金融结算、国际贸易、政务民生等领域的信息基础设施…

vue Sts认证后直传图片到阿里云OSS

后端进行sts认证生成临时身份凭证&#xff0c;前端通过凭证直传图片等文件到OSS中 一 OSS配置 增加用户和角色&#xff0c;创建OSS bucket 1.1 添加用户 登录阿里云管理控制台&#xff0c;右侧头像&#xff0c;进入访问控制 点击左侧导航栏的身份管理的用户&#xff0c;点击…

网络溯源-PSEXEC-简单

题干&#xff1a; 我们的入侵检测系统&#xff08;IDS&#xff09;已发出警报&#xff0c;指示涉及使用PsExec的可疑横向移动活动。为了有效响应此事件&#xff0c;您作为 SOC 分析师的角色是分析存储在 PCAP 文件中的捕获网络流量。 攻击者首次攻击成功的计算机源网络IP地址…

23款奔驰GLC260L升级香氛负离子 感受清香四溢

香氛负离子系统是由香氛系统和负离子发生器组成的一套配置&#xff0c;也可以单独加装香氛系统或者是负离子发生器&#xff0c;香氛的主要作用就是通过香氛外壳吸收原厂的香水再通过空调管输送到内饰中&#xff0c;而负离子的作用就是安装在空气管中通过释放电离子来打击空气中…
最新文章