【Java】记一次服务内实现排队消费模式

主要是记录一下实现过程和实现的过程中遇到的坑。

我的业务

系统中有一个接口,是从大数据那边拉数据,之前的做法是,开个线程池,让SQL去执行,可是如果大量的慢SQL同时,请求数据库的话会适得其反。并且还有一个问题,就是数据库连接池的连接数是有限的,当慢查询把连接都占用了的话,其他的快查询就会获取不到连接而等待超时。

解决方案

方案一

给慢查询一个单独的连接池,控制连接个数
● 缺点:这样虽然解决了,慢查询阻塞快查询,但是,对于用户体验不好,可能某一个用户就将慢查询队列占满了,后续的其它用户根本查询不了。

方案二

识别慢查询,针对每个用户的慢查询,加一个分布式锁
● 缺点:这样确实能避免一个用户占满慢队列,可是,每次一个用户只能执行一个慢查询,后续如果想增加用户同时执行慢查询的数量将非常困难。

方案三

识别慢查询,给每个用户创建一个队列,每个用户的慢查询单独排队。

最终解决方案和实现

最终选择了方案三,虽然可能也不是最优的解决方案,但是考虑到我们的系统数据TO B系统,用户量不会太大。

自定义任务类

这里有个坑,我后面再说

package com.t4f.bi.data.insights.server.manager.gamedata.query.queue;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * 队列查询任务
 *
 * @author limingyang1
 * @date 2023/7/3
 * @version 0.0.1
 */
@Slf4j
@Getter
@Setter
public class QueryQueueTask implements Callable<String> {

    private String taskId;
    private Supplier<String> task;

    public QueryQueueTask(String id, Supplier<String> task) {
        this.taskId = id;
        this.task = task;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        QueryQueueTask that = (QueryQueueTask) o;
        return Objects.equals(taskId, that.taskId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId);
    }

    @Override
    public String call() throws Exception {
        try{
            return task.get();
        } catch (Exception e) {
            log.info("QueryQueueTask call error", e);
            return null;
        }
    }
}
用户慢查询队列

这里之所以加锁,是防止多线程同事修改USER_TASK_QUEUE队列,我这里用的Java的程序锁是因为我们这个服务部署只有一台机器,如果线上是多台的话,需要改为分布式锁。

package com.t4f.bi.data.insights.server.manager.gamedata.query.queue;

import cn.hutool.core.util.ObjectUtil;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * 慢查询队列
 *
 * @author limingyang1
 * @date 2023/10/20
 * @version 0.0.1
 */
@Slf4j
@Getter
@Setter
public class UserSlowQueryQueue {

    public static final Map<String, BlockingQueue<QueryQueueTask>> USER_TASK_QUEUE = new ConcurrentHashMap<>();
    private static final Map<String, ReentrantLock> USER_LOCKS = new ConcurrentHashMap<>();

    public static boolean add(String user, QueryQueueTask task) {
        log.info(">> 用户:{} 添加慢查询任务:{}", user, task.getTaskId());
        ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
        userLock.lock();
        try {
            BlockingQueue<QueryQueueTask> userTaskQueue =
                    USER_TASK_QUEUE.computeIfAbsent(user, key -> new LinkedBlockingQueue<>());
            if (userTaskQueue.contains(task)) {
                log.info(">> 用户:{} 慢查询任务:{} 已经存在", user, task.getTaskId());
                return true;
            }
            userTaskQueue.add(task);
            USER_TASK_QUEUE.put(user, userTaskQueue);
        } finally {
            userLock.unlock();
        }
        return true;
    }


    public static boolean contains(String user, QueryQueueTask task) {
        ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
        userLock.lock();
        try {
            BlockingQueue<QueryQueueTask> userTaskQueue =
                USER_TASK_QUEUE.computeIfAbsent(user, key -> new LinkedBlockingQueue<>());
                return userTaskQueue.contains(task);
        } finally {
            userLock.unlock();
        }
    }

    public static void remove(String user, String taskId) {
        log.info(">> 用户:{} 移除慢查询任务:{}", user, taskId);
        ReentrantLock userLock = USER_LOCKS.computeIfAbsent(user, key -> new ReentrantLock());
        userLock.lock();
        try {
            BlockingQueue<QueryQueueTask> userQueue = USER_TASK_QUEUE.get(user);
            if (ObjectUtil.isNotNull(userQueue) && !userQueue.isEmpty()) {
                userQueue.removeIf(task -> task.getTaskId().equals(taskId));
            }
        } finally {
            userLock.unlock();
        }
    }
}
慢查询队列消费
package com.t4f.bi.data.insights.server.listener;

import static com.t4f.bi.data.insights.server.consts.Consts.REQUEST_ID_KEY;

import cn.hutool.core.util.IdUtil;
import com.t4f.bi.data.insights.server.config.MdcTaskDecorator;
import com.t4f.bi.data.insights.server.context.RequestContext;
import com.t4f.bi.data.insights.server.manager.gamedata.query.queue.QueryQueueTask;
import com.t4f.bi.data.insights.server.manager.gamedata.query.queue.UserSlowQueryQueue;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * 队列监听
 *
 * @author limingyang1
 * @date 2023/9/19
 * @version 0.0.1
 */
@Slf4j
@Component
public class QueueListener implements InitializingBean {

    private final Map<String, Executor> USER_EXECUTOR = new ConcurrentHashMap<>();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 启动队列消费线程
        startQueueListener();
    }

    private void startQueueListener() {
        log.info(">> 用户慢查询队列消费线程监听中");
        // 启动队列消费线程
        CompletableFuture.runAsync(
                () -> {
                    // 循环监听每个数据源的快慢队列
                    while (true) {
                        try {
                            // 休眠500ms
                            TimeUnit.MILLISECONDS.sleep(500);
                            Map<String, BlockingQueue<QueryQueueTask>> userTaskQueue =
                                    UserSlowQueryQueue.USER_TASK_QUEUE;
                            if (userTaskQueue.isEmpty()) {
                                continue;
                            }

                            userTaskQueue.forEach(
                                    (user, queue) -> {
                                        // 已经存在消费线程则不再创建
                                        Executor executor = getOrCreateExecutor(user);
                                        startConsuming(user, queue, executor);
                                    });

                        } catch (Exception e) {
                            log.error("Queue listener error", e);
                        }
                    }
                },
                startListenerExecutor("queue-listener"));
    }

    private Executor getOrCreateExecutor(String user) {
        return USER_EXECUTOR.computeIfAbsent(user, this::startExecutor);
    }

    private void startConsuming(
            String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
        QueryQueueTask task = queue.poll();
        if (task == null) {
            return;
        }
        CompletableFuture.runAsync(
                () -> {
                    String requestId = task.getRequestId();
                    MDC.put(REQUEST_ID_KEY, requestId);
                    RequestContext.initContext();
                    try {
                        RequestContext.putValue(REQUEST_ID_KEY, requestId);
                        log.info(">> 用户:{} 慢查询: {} 慢查询开始异步执行", user, queue.size());
                        String taskId = task.call();
                        log.info(">> 用户:{} 慢查询: {} 执行完毕", user, taskId);
                    } catch (Exception e) {
                        // handle exception
                        log.error("Queue execute error", e);
                    } finally{
                        RequestContext.clearContext();
                    }
                },
                executor);
    }

    public Executor startExecutor(String name) {
        log.info(">> 创建用户: {} 的消费处理线程池", name);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setTaskDecorator(new MdcTaskDecorator());
        // 这里控制同事执行任务的个数
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setKeepAliveSeconds(60);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("[" + name + "]query-queue-run-task");
        // 拒绝策略:丢弃队列中最老的任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }

    public Executor startListenerExecutor(String name) {
        log.info(">> 创建用户的消费监听线程池");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setTaskDecorator(new MdcTaskDecorator());
        executor.setCorePoolSize(1);
        executor.setDaemon(true);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("[" + name + "]-run-task");
        // 拒绝策略:丢弃队列中最老的任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }
}
如何使用
QueryQueueTask task =
        new QueryQueueTask(
                id,
                () -> {
                    // TODO: 要执行的业务
        
                    return taskId;
                });
UserSlowQueryQueue.add(taskId, task);

遇到的坑

  1. while循环监听的时候,一定要记得设置休眠,不然会让CPU飙升
    在这里插入图片描述

  2. 在自定义任务类时,一定要实现Callable接口,而不是Runnable,因为我们在后面取出队列中的任务执行时,是希望它执行完成再释放线程,实现Runnable的话,到时候就是task.run() 方法,并不会阻塞到任务执行完毕,起不到排队的作用。实现Callable接口,则调用task.call()等待执行完成,达到慢查询排队的效果
    在这里插入图片描述
    在这里插入图片描述

  3. 任务监听程序中,在往消费线程添加任务时,一定要确保任务的有效性,不然会使线程池队列占满,触发拒绝策略,因为你相当于循环的往线程池丢无效的任务,看下面代码的区别

    // 错误的方式
    private void startConsuming(
            String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
        CompletableFuture.runAsync(
                () -> {
                    QueryQueueTask task = queue.poll();
                    // 判断任务是否有效
                    if (task == null) {
                        return;
                    }
                    String requestId = task.getRequestId();
                    MDC.put(REQUEST_ID_KEY, requestId);
                    RequestContext.initContext();
                    try {
                        RequestContext.putValue(REQUEST_ID_KEY, requestId);
                        log.info(">> 用户:{} 慢查询: {} 慢查询开始异步执行", user, queue.size());
                        String taskId = task.call();
                        log.info(">> 用户:{} 慢查询: {} 执行完毕", user, taskId);
                    } catch (Exception e) {
                        // handle exception
                        log.error("Queue execute error", e);
                    } finally{
                        RequestContext.clearContext();
                    }
                },
                executor);
    }
    
// 正确的方式
    private void startConsuming(
            String user, BlockingQueue<QueryQueueTask> queue, Executor executor) {
        QueryQueueTask task = queue.poll();
        // 判断任务是否有效
        if (task == null) {
            return;
        }
        CompletableFuture.runAsync(
                () -> {
                    String requestId = task.getRequestId();
                    MDC.put(REQUEST_ID_KEY, requestId);
                    RequestContext.initContext();
                    try {
                        RequestContext.putValue(REQUEST_ID_KEY, requestId);
                        log.info(">> 用户:{} 慢查询: {} 慢查询开始异步执行", user, queue.size());
                        String taskId = task.call();
                        log.info(">> 用户:{} 慢查询: {} 执行完毕", user, taskId);
                    } catch (Exception e) {
                        // handle exception
                        log.error("Queue execute error", e);
                    } finally{
                        RequestContext.clearContext();
                    }
                },
                executor);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/128298.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

用Go实现网络流量解析和行为检测引擎

1.前言 最近有个在学校读书的迷弟问我:大德德, 有没有这么一款软件, 能够批量读取多个抓包文件,并把我想要的数据呈现出来, 比如:源IP、目的IP、源mac地址、目的mac地址等等。我说&#xff1a;“这样的软件你要认真找真能找出不少开源软件, 但毕竟没有你自己的灵魂在里面,要不…

通用工作站设计方案 :807-ORI-S3R500 -多路PCIe3.0的单CPU通用工作站

ORI-S3R500 -多路PCIe3.0的单CPU通用工作站 (研华工业计算机IPC-610&#xff0c;IPC940 升级款) 一、机箱功能和技术指标&#xff1a; 系统 系统型号 ORI-SR500 主板支持 EEB(12*13)/CEB(12*10.5)/ATX(12*9.6)/Mi cro ATX 前置硬盘 最大支持2个3.5寸1个2.5寸SATA …

交通信号标志识别系统 python 深度学习 YOLOv5

[毕业设计]2023-2024年最新最全计算机专业毕设选题推荐汇总 感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff0c;项目以及论文编写等相关问题都可以给我留言咨询&#xff0c;希望帮助更多的人 。 1、项目介绍 本系统基于YOLOv5&#xff0c;采用登录注册进行用…

C#操作注册表的方法

注册表是Microsoft Windows操作系统中的一个重要组成部分&#xff0c;用于存储和管理系统和应用程序的配置信息。它是一个层次结构的数据库&#xff0c;以树形结构组织&#xff0c;类似于文件系统中的文件夹和文件。 注册表存储了许多与操作系统和软件相关的信息&#…

【EI会议征稿】第三届新能源技术创新与低碳发展国际研讨会(NET-LC 2024)

第三届新能源技术创新与低碳发展国际研讨会&#xff08;NET-LC 2024&#xff09; 2024 3rd International Symposium on New Energy Technology Innovation and Low Carbon Development 先进的现代能源技术对世界各地的经济发展至关重要。持续的经济进步取决于安全、可靠和负担…

selenium三大等待

使用场景&#xff1a;有时候当我们操作页面元素时&#xff0c;需要等待这个过程才能操作成功。 做Ui自动化的时候&#xff0c;考虑到稳定性&#xff1a;多次运行同一脚本&#xff0c;都能够保证它是成功的。 一、强制等待&#xff1a;sleep(秒) 比如sleep(10)&#xff0c;就必…

一键批量转码:将MP4视频转为MP3音频的简单方法

随着数字媒体设备的普及&#xff0c;视频和音频格式转换的需求也越来越常见。其中&#xff0c;将MP4视频批量转换为MP3音频的需求尤为普遍。无论是为了提取视频中的背景音乐&#xff0c;还是为了在手机或电脑上方便地收听视频音频&#xff0c;这个过程都变得非常重要。接下来我…

opencv创建图片,绘制图片,画框,划线,改变像素点颜色

文章目录 创建空白图片创建一张渐变色彩色绘制多边形绘制多线改变像素点颜色 创建空白图片 bool tool_class::creatEmpty(int width, int height, std::string image_p) {// 创建一个空白图像cv::Mat blankImage(height, width, CV_8UC3, cv::Scalar(255, 255, 255));// 保存图…

蓝牙运动耳机哪个好?这几款蓝牙运动耳机不容错过!

音乐能有效地激发人体潜能&#xff0c;充分释放能量&#xff0c;达到更好的运动效果&#xff0c;因此对于运动爱好者来说&#xff0c;一款合适的运动蓝牙耳机至关重要&#xff0c;面对产品种类众多的运动耳机&#xff0c;很多人都会感到迷茫&#xff0c;经常有人问“有什么适合…

移位操作符 位操作符详解

hello hello&#xff0c;想我了吗? &#x1f604;&#x1f604;&#x1f604; 首先是移位操作符&#xff1a;<< 左移操作符 >> 右移操作符 注&#xff1a;移位操作符的操作数只能是整数。 << 左移操作符&#xff1a;移位规则&#xff1a; 左边抛弃、…

单片机程序无法下载?

原因一&#xff1a;电源问题 电源可能是导致STM32微控制器无法下载程序的一个常见原因。确保电源稳定对于正常运行和下载程序至关重要。以下是一些电源问题&#xff1a; 1. 电源电压不足&#xff1a;如果STM32微控制器没有足够的电压供应&#xff0c;它可能无法正常工作或下载程…

WPF ToggleButton 主题切换动画按钮

WPF ToggleButton 主题切换动画按钮 仿造最近看到的html中的一个效果&#xff0c;大致思路是文章这样&#xff0c;感觉还可以再雕琢一下。 代码如下 XAML: <UserControl x:Class"WPFSwitch.AnimationSwitch"xmlns"http://schemas.microsoft.com/winfx/200…

SpringData、SparkStreaming和Flink集成Elasticsearch

本文代码链接&#xff1a;https://download.csdn.net/download/shangjg03/88522188 1 Spring Data框架集成 1.1 Spring Data框架介绍 Spring Data是一个用于简化数据库、非关系型数据库、索引库访问&#xff0c;并支持云服务的开源框架。其主要目标是使得对数据的访问变得方便快…

ROS消息过滤器之 message_filters::Synchronizer 使用详解

在ROS中&#xff0c;当我们有多个传感器发布的数据需要同步时&#xff0c;message_filters::Synchronizer 是一个非常有用的工具。它可以确保多个消息在时间上是同步的&#xff0c;以便更有效地处理数据。 1.什么是ROS消息过滤器&#xff1f; ROS消息过滤器是一种用于处理ROS…

【uni-app + uView】CountryCodePicker 国家区号组件

1. 效果图 2. 组件完整代码 <template><u-popup class="country-code-picker-container" v-if="show" :show

CV计算机视觉每日开源代码Paper with code速览-2023.11.7

精华置顶 墙裂推荐&#xff01;小白如何1个月系统学习CV核心知识&#xff1a;链接 点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【基础网络架构】Understanding Deep Representation Lea…

护眼台灯横评|书客、明基、松下品牌大测评告诉你谁才是最亮的星!

护眼台灯哪个牌子好&#xff1f;随着护眼台灯普及率的日渐提高&#xff0c;护眼台灯市场也是十分火爆&#xff0c;但很多商家为了盈利&#xff0c;总是把重心放在宣传和营销手段上&#xff0c;从而导致护眼台灯的产品质量不过关&#xff0c;在使用过后不仅没有起到缓解眼睛疲劳…

k8s、数据存储

数据存储的概念 容器磁盘上的文件的生命周期是短暂的&#xff0c;这就使得在容器中运行重要应用时会出现一些问题。首先&#xff0c;当容器崩溃时&#xff0c;kubelet 会重启它&#xff0c;但是容器中的文件将丢失——容器以干净的状态&#xff08;镜像最初的状态&#xff09;…

易货:一种绿色、高效的商业模式

随着社会经济的发展和人民生活水平的提高&#xff0c;人们手中闲置的物品越来越多&#xff0c;如何将这些物品盘活&#xff0c;成为了一个亟待解决的问题。易货商业模式应运而生&#xff0c;它不仅可以将闲置物品变成财富&#xff0c;还可以为企业和个人带来更多的商机和资源。…

屏幕提词软件Presentation Prompter mac中文版使用方法

Presentation Prompter for mac是一款屏幕提词器软件&#xff0c;它可以将您的Mac电脑快速变成提词器&#xff0c;支持编写或导入&#xff0c;可以在一个或多个屏幕上平滑地滚动&#xff0c;Presentation Prompter 下载是为适用于现场表演者&#xff0c;新闻广播员&#xff0c;…
最新文章