商城-学习整理-高级-商城业务-异步线程池(十三)

目录

  • 一、线程
    • 1、初始化线程的 4 种方式
    • 2、线程池的七大参数
    • 3、线程池的运行流程:
    • 4、例子
    • 5、常见的 4 种线程池
    • 6、开发中为什么使用线程池
  • 二、CompletableFuture 异步编排
    • 0、业务场景:
    • 1、创建异步对象
    • 2、计算完成时回调方法
    • 3、handle 方法
    • 4、线程串行化方法
    • 5、两任务组合 - 都要完成
    • 6、两任务组合 - 一个完成
    • 7、多任务组合
  • 三、测试代码

一、线程

1、初始化线程的 4 种方式

1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池

方式 1 和方式 2:主进程无法获取线程的运算结果。不适合当前场景
方式 3:主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
方式 4:通过如下两种方式初始化线程池

Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, workQueue, threadFactory, handler);

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。

2、线程池的七大参数

* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
池中一直保持的线程的数量,即使线程空闲。除非设置了 allowCoreThreadTimeOut
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
池中允许的最大的线程数
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating. 当线程数大于核心线程数的时候,线程在最大多长时间没有接到新任务就会终止释放,
最终线程池维持在 corePoolSize 大小
* @param unit the time unit for the {@code keepAliveTime} argument
时间单位
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method. 阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了 corePoolSize
大小,就会放在这里等待空闲线程执行。
* @param threadFactory the factory to use when the executor
* creates a new thread
创建线程的工厂,比如指定线程名等
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
拒绝策略,如果线程满了,线程池就会使用拒绝策略。

在这里插入图片描述

3、线程池的运行流程:

1、线程池创建,准备好 core 数量的核心线程,准备接受任务
2、新的任务进来,用 core 准备好的空闲线程执行。
(1) 、core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行
(2) 、阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量
(3) 、max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终保持到 core 大小
(4) 、如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策略进行处理
3、所有的线程创建都是由指定的 factory 创建的。

4、例子

一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的;
先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在 70 个被安排上了。剩下 30 个默认拒绝策略。

5、常见的 4 种线程池

 newCachedThreadPool
 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
 newFixedThreadPool
 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
 newScheduledThreadPool
 创建一个定长线程池,支持定时及周期性任务执行。
 newSingleThreadExecutor
 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

6、开发中为什么使用线程池

1、降低资源消耗
通过重复利用已经创建好的线程降低线程的创建与销毁带来的损耗
2、提高响应速度
因为线程池中的线程数没有超过线程池最大上线时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
3、提高线程的可管理性
线程池会根据当前系统的特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配。

二、CompletableFuture 异步编排

0、业务场景:

查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
在这里插入图片描述
假如商品详情页的每个查询,需要如下标注的时间才能完成

那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。

如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

Future是java5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,知道计算完成返回结果,你也可以使用canel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自己扩展了 Java 的 Future接口,提供了addListener等多个扩展方法;Google guava 也提供了通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。

作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?

在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。

CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。
在这里插入图片描述

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。
在这里插入图片描述
1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;

2、计算完成时回调方法

在这里插入图片描述
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程,继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
@Override
public Object get() {
System.out.println(Thread.currentThread().getName() + "\t
completableFuture");
int i = 10 / 0;
return 1024;
}
}).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
System.out.println("-------o=" + o.toString());
System.out.println("-------throwable=" + throwable);
}
}).exceptionally(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
});
System.out.println(future.get());
}
}

3、handle 方法

在这里插入图片描述
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

4、线程串行化方法

在这里插入图片描述thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作

带有 Async 默认是异步执行的。同之前。

以上都要前置任务成功完成。

Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

5、两任务组合 - 都要完成

在这里插入图片描述
在这里插入图片描述
两个任务必须都完成,触发该任务。
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。

6、两任务组合 - 一个完成

在这里插入图片描述
在这里插入图片描述
当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

7、多任务组合

在这里插入图片描述
在这里插入图片描述

三、测试代码

import java.util.concurrent.*;

public class ThreadTest {

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // System.out.println("main......start.....");
        // Thread thread = new Thread01();
        // thread.start();
        // System.out.println("main......end.....");

        // Runable01 runable01 = new Runable01();
        // new Thread(runable01).start();

        // FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
        // new Thread(futureTask).start();
        // System.out.println(futureTask.get());

        // service.execute(new Runable01());
        // Future<Integer> submit = service.submit(new Callable01());
        // submit.get();

        System.out.println("main......start.....");
        // CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        //     System.out.println("当前线程:" + Thread.currentThread().getId());
        //     int i = 10 / 2;
        //     System.out.println("运行结果:" + i);
        // }, executor);

        /**
         * 方法完成后的处理
         */
        // CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("当前线程:" + Thread.currentThread().getId());
        //     int i = 10 / 0;
        //     System.out.println("运行结果:" + i);
        //     return i;
        // }, executor).whenComplete((res,exception) -> {
        //     //虽然能得到异常信息,但是没法修改返回数据
        //     System.out.println("异步任务成功完成了...结果是:" + res + "异常是:" + exception);
        // }).exceptionally(throwable -> {
        //     //可以感知异常,同时返回默认值
        //     return 10;
        // });

        /**
         * 方法执行完后端处理
         */
        // CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        //     System.out.println("当前线程:" + Thread.currentThread().getId());
        //     int i = 10 / 2;
        //     System.out.println("运行结果:" + i);
        //     return i;
        // }, executor).handle((result,thr) -> {
        //     if (result != null) {
        //         return result * 2;
        //     }
        //     if (thr != null) {
        //         System.out.println("异步任务成功完成了...结果是:" + result + "异常是:" + thr);
        //         return 0;
        //     }
        //     return 0;
        // });


        /**
         * 线程串行化
         * 1、thenRunL:不能获取上一步的执行结果
         * 2、thenAcceptAsync:能接受上一步结果,但是无返回值
         * 3、thenApplyAsync:能接受上一步结果,有返回值
         *
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务2启动了..." + res);
            return "Hello" + res;
        }, executor);
        System.out.println("main......end....." + future.get());

    }

    private static void threadPool() {

        ExecutorService threadPool = new ThreadPoolExecutor(
                200,
                10,
                10L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(10000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        //定时任务的线程池
        ExecutorService service = Executors.newScheduledThreadPool(2);
    }


    public static class Thread01 extends Thread {
        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }


    public static class Runable01 implements Runnable {
        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }


    public static class Callable01 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/87793.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

自动化测试平台seldom-platform部署及使用

介绍 seldom-platform是一个基于seldom测试框架的测试平台 项目地址&#xff1a;https://github.com/SeldomQA 文档&#xff1a;seldom 语雀 首先&#xff0c;专门为seldom测试框架提供平台化支持。其次&#xff0c;只负责自动化测试项目的解析、执行用例&#xff0c;当然…

【Django】Task4 序列化及其高级使用、ModelViewSet

【Django】Task4 序列化及其高级使用、ModelViewSet Task4主要了解序列化及掌握其高级使用&#xff0c;了解ModelViewSet的作用&#xff0c;ModelViewSet 是 Django REST framework&#xff08;DRF&#xff09;中的一个视图集类&#xff0c;用于快速创建处理模型数据的 API 视…

十八、深度学习模型30年演化史

1、模型分类 深度学习是解决问题的一系列模型与方法,但深度学习模型不是深度学习领域中唯一的研究方向,且不一定是最重要的研究方向。除了模型之外,比较重要的还有优化算法、损失函数、采样方法等。 1.1 DNN 深度神经网络(Deep Neural Networks, 以下简称DNN)是…

解决Oracle中XML插入数据时的空格问题

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

windows上ffmpeg如何录制双屏幕中的一个屏幕上的视频

首先&#xff0c;如何在window上安装ffmpeg自己查找scoop安装ffmpeg. 如题&#xff1a; 如果你有两个屏幕&#xff0c;如何让ffmpeg来录制其中的一个屏幕的视频呢。 很简单&#xff0c;首先你要查看另外一个屏幕的分辨率&#xff1a; 第一步&#xff1a;进入系统中 第二步&am…

基于OpenCV实战(基础知识一)

目录 简介 1.计算机眼中的图像 2.图片的读取、显示与保存 3.视频的读取与显示 简介 OpenCV是一个流行的开源计算机视觉库&#xff0c;由英特尔公司发起发展。它提供了超过2500个优化算法和许多工具包&#xff0c;可用于灰度、彩色、深度、基于特征和运动跟踪等的图像处理和…

Spring Clould 负载均衡 - Ribbon

视频地址&#xff1a;微服务&#xff08;SpringCloudRabbitMQDockerRedis搜索分布式&#xff09; Ribbon-负载均衡原理&#xff08;P14&#xff09; 具体实现时通过LoaBalanced注解实现&#xff0c;表示RestTemplate要被Ribbon拦截处理 orderservice调用user时候&#xff0c…

用队列实现栈

目录 题目题目要求示例 解答方法一、实现思路时间复杂度和空间复杂度代码 方法二、实现思路时间复杂度和空间复杂度代码 方法三、实现思路时间复杂度和空间复杂度代码 总结 题目 用队列实现栈 题目要求 题目链接 示例 解答 方法一、 使用两个队列来实现栈。 实现思路 题…

科技成果鉴定测试有什么意义?专业CMA、CNAS软件测评公司

科技成果鉴定测试是指通过一系列科学的实验和检测手段&#xff0c;对科技成果进行客观评价和鉴定的过程。通过测试&#xff0c;可以对科技成果的技术优劣进行评估&#xff0c;从而为科技创新提供参考和指导。 一、科技成果鉴定测试的意义 1、帮助客户了解科技产品的性能特点和…

【Spring】一次性打包学透 Spring | 阿Q送书第五期

文章目录 如何竭尽可能确保大家学透Spring1. 内容全面且细致2. 主题实用且本土化3. 案例系统且完善4. 知识有趣且深刻 关于作者丁雪丰业内专家推图书热卖留言提前获赠书 不知从何时开始&#xff0c;Spring 这个词开始频繁地出现在 Java 服务端开发者的日常工作中&#xff0c;很…

nvm安装使用教程

文章目录 下载配置安装最新稳定版 node安装指定版本查看版本切换版本删除版本 常见问题安装node后 显示拒绝访问的问题使用cnpm会报错的问题降低cnpm版本npm镜像 下载 NVM for Windows 下载地址&#xff1a;https://link.juejin.cn/?targethttps%3A%2F%2Fgithub.com%2Fcoreyb…

java学习网站_Java程序员必看的学习网站

哔哩哔哩 B 站动力节点官方视频 https://space.bilibili.com/76542346?spm_id_from333.337.0.0 超多视频免费白嫖 专注 java 培训的动力节点 毕业学员业内高薪就业&#xff0c;逐渐得到了业界广大的好评&#xff0c;被业界誉为 “口口相传的 Java 黄埔军校 “。 网址&#x…

【计算机网络】【常考问题总结】

1. ping 127.0.0.1 后会发生什么&#xff1f; ping 127.0.0.1 &#xff1b;ping 0.0.0.0 &#xff1b; ping localhost 面试官问&#xff1a;断网了&#xff0c;还能ping通 127.0.0.1 吗&#xff1f;为什么&#xff1f;_kevin_tech的博客-CSDN博客 2. MTU&#xff0c;MMU是…

某多多商品平台数据采集

某多多商品平台数据采集 声明逆向目标寻找加密位置代码分析补环境补充内容声明 本文章中所有内容仅供学习交流,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者 无关,若有侵权,请私信我立即删除! 逆向目标 Anti-Content参数 寻找加密位置 先在控制台全局搜…

【从零学习python 】69. 网络通信及IP地址分类解析

文章目录 网络通信的概念IP地址IP地址的分类A类地址B类地址C类地址D类地址E类地址私有地址 进阶案例 网络通信的概念 简单来说&#xff0c;网络是用物理链路将各个孤立的工作站或主机相连在一起&#xff0c;组成数据链路&#xff0c;从而达到资源共享和通信的目的。 使用网络…

k8s-ingress-context deadline exceeded

报错&#xff1a; rancher-rke-01:~/rke # helm install rancher rancher-latest/rancher --namespace cattle-system --set hostnamewww.rancher.local Error: INSTALLATION FAILED: Internal error occurred: failed calling webhook "validate.nginx.ingress.kube…

leetcode 118.杨辉三角

⭐️ 题目描述 &#x1f31f; leetcode链接&#xff1a;https://leetcode.cn/problems/pascals-triangle/description/ 代码&#xff1a; class Solution { public:vector<vector<int>> generate(int numRows) {// 先开空间vector<vector<int>> v;v.…

这样管理销售线索,销售成功率增加30%!

当你的内容、产品或服务吸引到他人的注意时&#xff0c;你就拥有了一个潜在客户。这条线索能否转化为客户&#xff0c;很大程度上取决于你的线索管理流程的质量。 但是&#xff0c;这个流程到底是什么样的&#xff0c;如何才能让它为你的业务服务呢&#xff1f; 什么是线索管理…

不在同一局域网也想轻松使用RDP?向日葵远程控制帮你实现

不在同一局域网也想轻松使用RDP&#xff1f;向日葵远程控制帮你实现 很多极客朋友家中或者办公室有不止一台PC电脑需要同时操作&#xff0c;但人挪来挪去始终不太方便&#xff0c;于是就有很多朋友采用微软自带的RDP桌面功能&#xff0c;实现在一台电脑上同时操作多台设备&…

Pyqt5打开电脑摄像头进行拍照

目录 1、设计UI界面 2、设计逻辑代码&#xff0c;建立连接显示窗口 3、结果 1、设计UI界面 将ui界面转为py文件后获得的逻辑代码为&#xff1a;&#xff08;文件名为 Camera.py&#xff09; # -*- coding: utf-8 -*-# Form implementation generated from reading ui file …