【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的Redis延时队列的功能组件

手把手教你如何开发一个属于自己的延时队列的功能组件

  • 前提介绍
  • 解决痛点
  • 延时队列组件的架构
    • 延时队列组件的初始化流程
    • 延时队列组件的整体核心类架构
    • 延时队列组件的整体核心类功能
  • 延时队列的开发组件
    • 延迟队列的机制配置初始化类
      • 源码 - DelayedQueueConfiguration
      • Redission客户端的实现
        • 源码 - DelayedRedissionClientTool
        • 核心方法源码分析
          • Offer方法存储元素 - 添加阻塞队列-元素
          • poll方法获取元素 - 从阻塞队列中拉取数据
          • 定义和初始化执行线程池
          • 定义和初始化轮询线程池
            • 源码 - DelayedThreadPoolExecutor
          • 延迟队列机制支持Redis客户端
            • 源码 - DelayedRedisClientSupport
        • RedissonClientTool的工具的封装操作
          • 源码 - RedissonClientTool
        • 辅助类定义处理
          • DelayedThreadPoolSupport
          • 线程池的构建和初始化
      • 延时队列启动DelayedBootstrapInitializer
        • 源码 - DelayedBootstrapInitializer
      • 注入监听器+异常处理器
      • init初始化操作机制控制
      • 获取延时队列数据信息的模型
        • 执行线程组机制
    • 定义延时队列的消费接口
      • 定义延时队列的消费接口扩展接口
      • DelayedBootstrapRunnable
      • 延时队列的使用案例
        • 延时队列投递数据方
        • 延时队列消费数据方:
        • 延时轮询线程异常处理器:
  • 问题反馈

前提介绍

针对于目前,系统中的延时队列的开发复杂度以及统一化管理没有完成相关的标准,故此本人封装了一款,基于Redssion的框架为基础的也是基于我们现在framework为基础的延时队列框架开发机制组件,方便未来大家去开发属于自己的延时队列的开发规范以及开发成本!

解决痛点

  • 基于原始的redis失效的EntryExpiredListener的定时监听器,因为考虑周期性和性能和延迟问题过大,所以有了本次版本组件封装的优化

  • 简化开发,系统多出使用原生的redission客户端,因为这无形中给开发人员带来了很大的工作量,考虑未来的开发过程中会存在很多延时队列的场景

  • 无标准,使用的延时开发实现原理的种类非常的多,有内存机制的延时队列、消息队列的延时实现、redis的延时队列,为了达成标准化。

  • 统一化管理,防止问题重复出现或者多点问题出现机制。

延时队列组件的架构

  • 延时队列采用redis 大key或者业务组、业务类型进行划分出不同的分割领域,每个组都是属于相互隔离。

  • 自己消费自己的数据信息以及异常处理和轮询和执行机制

在这里插入图片描述

延时队列组件的初始化流程

  • 主要针对于轮询线程、执行线程的初始化

  • 主要针对于注册监听器、异常处理器机制

在这里插入图片描述

延时队列组件的整体核心类架构

在这里插入图片描述

延时队列组件的整体核心类功能

在这里插入图片描述


延时队列的开发组件

延迟队列的机制配置初始化类

DelayedQueueConfiguration:主要集中于延时队列的配置参数类,主要用于定义针对于初始化一些基础核心的基础类服务组件的集合。

源码 - DelayedQueueConfiguration

@Configuration
@ComponentScan(basePackages = "com.hyts.assemble.redisdelayer")
public class DelayedQueueConfiguration {
    /**
     * redission客户端的实现
     * @return
     */
    @Bean
    public DelayedRedissionClientTool delayedRedissionClientTool(){
        return new DelayedRedissionClientTool();
    }
    /**
     * 执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制
     * @return
     */
    @Bean("delayedExecuteThreadPoolExecutor")
    public Executor delayedExecuteThreadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolExecutor");
        // 因为可以定制化线程数量机制,是否考虑延迟机制,待议 TODO
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    /**
     * 执行操作处理机制(考虑是IO密集型或者混合密集型机制) 异步 执行线程机制
     * @return
     */
    @Bean("delayedExecuteThreadPoolCycle")
    public Executor delayedExecuteThreadPoolCycle() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolCycle");
        threadPoolTaskExecutor.setQueueCapacity(0);
        // 系统暂时仅仅支持核心书个组,直接执行,不会存放队列数据信息
        threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getMaxPoolSize());
        threadPoolTaskExecutor.setCorePoolSize(threadPoolTaskExecutor.getCorePoolSize());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    /**
     * 延迟线程池支持机制
     * @return
     */
    @Bean
    public DelayedThreadPoolSupport delayedThreadPoolSupport(@Autowired @Qualifier("delayedExecuteThreadPoolExecutor") Executor execute,
                                                             @Autowired @Qualifier("delayedExecuteThreadPoolCycle") Executor recycle){
        return new DelayedThreadPoolSupport(execute,recycle);
    }
    /**
     * 延迟队列机制支持Redis客户端
     * @return
     */
    @Bean
    public DelayedRedisClientSupport delayedRedisClientSupport(){
        return new DelayedRedisClientSupport(delayedRedissionClientTool());
    }
    /**
     * 线程池的构建和初始化
     * @return
     */
    @Bean(initMethod = "init")
    public DelayedBootstrapInitializer delayedThreadPoolExecutor(){
        return new DelayedBootstrapInitializer();
    }

    @Bean
    @ConditionalOnMissingBean(RedissonClientTool.class)
    public RedissonClientTool redissonClientTool(RedissonClient redissonClient) {
        return new RedissonClientTool(redissonClient);
    }
}

主要包含了一下几个部件:

  • DelayedRedissionClientTool:Redission客户端的实现
  • delayedExecuteThreadPoolExecutor:执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制
  • delayedExecuteThreadPoolCycle:执行操作处理机制(考虑是IO密集型或者混合密集型机制) 异步 执行线程机制
  • DelayedThreadPoolSupport:延迟线程池支持机制
  • DelayedRedisClientSupport:延迟线程池支持机制

Redission客户端的实现

DelayedRedissionClientTool主要属于Redisson延时队列客户端实现类,主要包含了相关的对应的处理维护延时队列的元素数据信息操作类。

源码 - DelayedRedissionClientTool

@AutoConfigureAfter(value = RedissonClientTool.class)
@Slf4j
public class DelayedRedissionClientTool  {
    /**
     * redissionCLientTool工具机制
     */
    @Autowired
    RedissonClientTool redissonClientTool;
    /**
     * 自动注册
     */
    public DelayedRedissionClientTool() {
    }
    /**
     * 手动注册
     * @param redissonClientTool
     */
    public DelayedRedissionClientTool(RedissonClientTool redissonClientTool) {
        this.redissonClientTool = redissonClientTool;
    }

    /**
     * 添加阻塞队列-元素
     * @param <T>
     */
    public <T> void offer(ExecuteInvokerEvent<T> executeInvokerEvent) {
        //预先进行构建初始化参数条件机制
        executeInvokerEvent.preCondition(executeInvokerEvent);
        redissonClientTool.addDelayQueueElement(Objects.requireNonNull(executeInvokerEvent).getBizGroup(),
                executeInvokerEvent,executeInvokerEvent.getDelayedTime(),executeInvokerEvent.getTimeUnit());
    }
    /**
     * 获取相关的
     * @param executeInvokerEvent
     * @param <T>
     * @return
     */
    public <T> RBlockingQueue<T> takeBlockingQueue(ExecuteInvokerEvent<T> executeInvokerEvent) {
        return redissonClientTool.getRedissonClient().getBlockingQueue(executeInvokerEvent.getBizGroup());

    }
    /**
     * 操作梳理
     * @param trBlockingQueue
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    public <T> ExecuteInvokerEvent<T> poll(RBlockingQueue<T> trBlockingQueue) throws InterruptedException {
        return (ExecuteInvokerEvent<T>) trBlockingQueue.take();
    }
}

核心方法源码分析

Offer方法存储元素 - 添加阻塞队列-元素
public <T> void offer(ExecuteInvokerEvent<T> executeInvokerEvent) {
      //预先进行构建初始化参数条件机制
      executeInvokerEvent.preCondition(executeInvokerEvent);
      redissonClientTool.addDelayQueueElement(Objects.requireNonNull(executeInvokerEvent).getBizGroup(),
                executeInvokerEvent,executeInvokerEvent.getDelayedTime(),executeInvokerEvent.getTimeUnit());
}
poll方法获取元素 - 从阻塞队列中拉取数据
public <T> ExecuteInvokerEvent<T> poll(RBlockingQueue<T> trBlockingQueue) throws InterruptedException {
      return (ExecuteInvokerEvent<T>) trBlockingQueue.take();
}
定义和初始化执行线程池

执行操作处理机制(考虑是IO密集型或者混合密集型机制) - 循环监控线程机制,执行线程池(公共默认)

@Bean("delayedExecuteThreadPoolExecutor")
    public Executor delayedExecuteThreadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolExecutor");
        // 因为可以定制化线程数量机制,是否考虑延迟机制,待议 TODO
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
定义和初始化轮询线程池

主要负责轮询获取对应的redis服务队列中的数据的线程所在的线程池。

 @Bean("delayedExecuteThreadPoolCycle")
    public Executor delayedExecuteThreadPoolCycle() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor =
                DelayedThreadPoolExecutor.initParameter("delayedExecuteThreadPoolCycle");
        threadPoolTaskExecutor.setQueueCapacity(0);
        // 系统暂时仅仅支持核心书个组,直接执行,不会存放队列数据信息
        threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getMaxPoolSize());
        threadPoolTaskExecutor.setCorePoolSize(threadPoolTaskExecutor.getCorePoolSize());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

其中内部可以看到调用了DelayedThreadPoolExecutor.initParameter的方法进行控制和初始化对应的线程池,接下来,我们来看看该线程池方法以及他 的作用是什么?

源码 - DelayedThreadPoolExecutor
public class DelayedThreadPoolExecutor {

    /**
     * 获取到服务器的cpu内核:逻辑内核核心数
     */
    private static int DEFAULT_THREAD_CORE_BASE_SIZE = Runtime.getRuntime().availableProcessors();

    /**
     * IO密集型机制控制*2
     */
    private static int DEFAULT_THREAD_CORE_SIZE_IO_TYPE = DEFAULT_THREAD_CORE_BASE_SIZE<<1;

    /**
     * 序号分配器
     */
    private static  AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 初始化参数信息
     * @return
     */
    public static ThreadPoolTaskExecutor initParameter(String threadGroup){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(DEFAULT_THREAD_CORE_SIZE_IO_TYPE);//核心池大小
        executor.setMaxPoolSize(DEFAULT_THREAD_CORE_SIZE_IO_TYPE<<4);//最大线程数 = 核心*核心池大小;
        executor.setQueueCapacity(1000);//队列程度
        executor.setKeepAliveSeconds(30);//线程空闲时间
        executor.setThreadGroupName(threadGroup);
        executor.setThreadFactory(r -> new Thread(r,String.format("%s-%s",threadGroup,atomicInteger.getAndDecrement())));
        executor.setThreadNamePrefix(threadGroup+"-");//线程前缀名称
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//配置拒绝策略
        return executor;
    }
}

上面可以看出来它主要属于一个公共方法进行控制我们的通用线程池的参数类。

主要是讲对应的Spring的ThreadPoolTaskExecutor的对象实现类进行模板化的一个功能。降低使用着的开发量。

延迟队列机制支持Redis客户端

延迟队列机制支持Redis客户端支持类,主要目的是为了作为一个讲对应的Redis客户端类的静态引用操作。

 
    @Bean
    public DelayedRedisClientSupport delayedRedisClientSupport(){
        return new DelayedRedisClientSupport(delayedRedissionClientTool());
    }
源码 - DelayedRedisClientSupport
public class DelayedRedisClientSupport {
    @Getter
    private static DelayedRedissionClientTool delayedRedissionClientTool;

    /**
     * 延迟队列控制redis服务机制
     * @param delayedRedissionClientTool
     */
    public DelayedRedisClientSupport(DelayedRedissionClientTool delayedRedissionClientTool) {
        DelayedRedisClientSupport.delayedRedissionClientTool = delayedRedissionClientTool;
    }
}

RedissonClientTool的工具的封装操作

  @Bean
    @ConditionalOnMissingBean(RedissonClientTool.class)
    public RedissonClientTool redissonClientTool(RedissonClient redissonClient) {
        return new RedissonClientTool(redissonClient);
    }
源码 - RedissonClientTool
@Slf4j
public class RedissonClientTool {

    private RedissonClient redissonClient;

    public RedissonClientTool(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    public <T> void addDelayQueueElement(String key, T t, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
    }

    public <T> T takeDelayQueueElement(String key) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        T t = null;
        try {
            t = blockingFairQueue.take();
        } catch (InterruptedException e) {
            log.error("takeDelayQueueElement error key: " + key, e);
        }
        return t;
    }


    /**
     * 阻塞队列添加元素
     * @param key
     * @param t
     * @param <T>
     */
    public <T> void addBlockingQueueElement(String key, T t) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        blockingFairQueue.offer(t);
    }

    /**
     *
     * 取出队列的元素且删除
     * @param key
     * @param t
     * @param <T>
     * @return
     */
    public <T> T pollBlockQueueElement(String key, T t) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        return blockingFairQueue.poll();
    }

    /**
     *
     * 取出队列的元素但是不删除
     * @param key
     * @param t
     * @param <T>
     * @return
     */
    public <T> T peekBlockQueueElement(String key, T t) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(key);
        return blockingFairQueue.peek();
    }


    /**
     * 队列添加元素
     * @param key
     * @param t
     * @param <T>
     */
    public <T> void addQueueElement(String key, T t) {
        RQueue<T> queue = redissonClient.getQueue(key);
        queue.offer(t);
    }

    /**
     *
     * 取出队列的元素且删除
     * @param key
     * @param t
     * @param <T>
     * @return
     */
    public <T> T pollQueueElement(String key, T t) {
        RQueue<T> queue = redissonClient.getQueue(key);
        return queue.poll();
    }


    public RedissonClient getRedissonClient () {
        return this.redissonClient;
    }

}

辅助类定义处理

DelayedThreadPoolSupport

主要通过的对应的延时队列线程池的工具支持类,主要包含了对应的这两种线程池的引用操作处理模型,如下图所示。

public class DelayedThreadPoolSupport {

    /**
     * 任务执行线程机制
     */
    @Getter
    private static Executor taskExecuteThread;

    /**
     * 任务轮询线程机制
     */
    @Getter
    private static Executor taskRecycleThread;


    /**
     * 操作处理机制
     * @param taskExecuteThread
     * @param taskRecycleThread
     */
    public DelayedThreadPoolSupport(Executor taskExecuteThread, Executor taskRecycleThread) {
        DelayedThreadPoolSupport.taskExecuteThread = taskExecuteThread;
        DelayedThreadPoolSupport.taskRecycleThread = taskRecycleThread;
    }
}
线程池的构建和初始化

主要通过线程池进行构建对应的启动初始化对象实现类,用于绑定和初始化所有需要进行延时队列监听的线程。

@Bean(initMethod = "init")
public DelayedBootstrapInitializer delayedThreadPoolExecutor(){
       return new DelayedBootstrapInitializer();
}

延时队列启动DelayedBootstrapInitializer

  • 开始初始化加载完成系统内部所有的相关的延迟队列监听上下文接口服务数据
  • 开始开展完成线程任务分配为每个分组的监听器以及任务队列分配资源
  • 开始生产相关的监听绑定关系机制
  • 开始初始化相关的异常信息处理机制
  • 初始化线程机制

源码 - DelayedBootstrapInitializer

@Slf4j
public class DelayedBootstrapInitializer {

    @Setter
    @Getter
    @DelayedQueueListener(value="delayedListenerContextMap")
    Map<String, EventExecutableInvokerListener> delayedListenerContextMap = Maps.newHashMap();

    @Setter
    @Getter
    @DelayedQueueExceptionHandler(value="delayedExceptionHandlerMap")
    Map<String, DelayedExceptionHandler> delayedExceptionHandlerMap = Maps.newHashMap();
    
    /**
     * 初始化操作机制控制
     */
    public void init(){
        log.info("启动初始化加载并完成所有相关延迟启动初始化加载并完成所有相关延迟" +
                "系统中用于侦听上下文接口服务数据的队列 : {}",delayedListenerContextMap);
        log.info("开始完成线程任务分配,并为每个组的侦听器和任务队列分配资源");
        if(MapUtils.isEmpty(delayedListenerContextMap)){
            log.info("未找到任务侦听信息。在springcontext管理的上下文中," +
                    "请检查是否有关于实现的接口" +
                    "EventExecutableInvokerListener,以及相关@DelayedQueueListener");
            return ;
        }
        log.info("启动与生产相关的侦听绑定机制");
        Map<String,List<EventExecutableInvokerListener>> getAnnotationMetadataGroup =
                delayedListenerContextMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupListener));

        log.info("开始初始化相关的异常信息处理机制");
        Map<String,List<DelayedExceptionHandler>> delayedExceptionHandlerMapGroup =
                delayedExceptionHandlerMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupExceptionHandler));
        if(MapUtils.isNotEmpty(getAnnotationMetadataGroup)){
            Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
            log.info("启动资源分配机制");
            //推荐同一个组里面采用一个线程池进行处理机制
            getAnnotationMetadataGroup.entrySet().forEach(param->{
                log.info("初始化线程机制 {}:",param.getValue());
                executor.execute(new DelayedBootstrapRunnable(param.getKey(),param.getValue(),
                        DelayedBootstrapInitializer.getExecutorByGroup(param.getValue()),
                        new ExecutableExceptionHandler(delayedExceptionHandlerMapGroup.get(param.getKey()))));
            });
        }else{
            log.warn("资源转换失败!无法执行资源执行机制");
        }
    }

    /**
     * 支持动态添加延时队列控制
     */
//    public void addExecuteDelayeQueue(ExecuteDelayedQueue executeDelayedQueue){
//        Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
//        executor.execute(new DelayedBootstrapRunnable(executeDelayedQueue.getQueueName(),param.getValue(),
//                DelayedBootstrapInitializer.getExecutorByGroup(executeDelayedQueue.getValue()),
//                new ExecutableExceptionHandler(Lists.newArrayList(new DefaultSampleDelayedExceptionHandler()))));
//    }

    /**
     * @param eventExecutableInvokerListener
     * @return
     */
    public static String getAnnotationMetadataGroupListener(EventExecutableInvokerListener eventExecutableInvokerListener){
        return getAnnotationMetadataGroup(eventExecutableInvokerListener,DelayedQueueListener.class);
    }


    public static String getAnnotationMetadataGroupExceptionHandler( DelayedExceptionHandler delayedExceptionHandler){
        return getAnnotationMetadataGroup(delayedExceptionHandler,DelayedQueueExceptionHandler.class);
    }

    /**
     * 获取相关的组信息
     * @param object
     * @return
     */
    public static String getAnnotationMetadataGroup(Object object,Class delayedQueueListenerClass){
        Object annotationInstance = object.getClass().getAnnotation(delayedQueueListenerClass);
        if(annotationInstance instanceof DelayedQueueListener) {
            DelayedQueueListener delayedQueueListener = (DelayedQueueListener)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedQueueListener.group();
            }
        }
        else if(annotationInstance instanceof DelayedQueueExceptionHandler) {
            DelayedQueueExceptionHandler delayedExceptionHandler = (DelayedQueueExceptionHandler)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedExceptionHandler.group();
            }
        }
        return Strings.EMPTY;
    }


    /**
     * 执行线程组机制
     * @return
     */
    public static Executor getExecutorByGroup(List<EventExecutableInvokerListener> eventExecutableInvokerListeners){
        return eventExecutableInvokerListeners.stream().map(EventExecutableInvokerListener::getExecutor).
                filter(Objects::nonNull).findAny().orElse(null);
    }
}

注入监听器+异常处理器

由于Spring框架可以帮我自动进行获取对象模型注入的数据集合,此部分我们采用的是Map


@Setter
@Getter
@DelayedQueueListener(value="delayedListenerContextMap")
Map<String, EventExecutableInvokerListener> delayedListenerContextMap = Maps.newHashMap();

@Setter
@Getter
@DelayedQueueExceptionHandler(value="delayedExceptionHandlerMap")
Map<String, DelayedExceptionHandler> delayedExceptionHandlerMap = Maps.newHashMap();

init初始化操作机制控制

主要是进行初始化操作init方法,之后进行获取对应的监听器以及对象,并将这些对象直接进行注入到对应的轮询线程和执行任务的线程中,方便我们整体的延时队列进行运行处理操作。

    public void init(){
        log.info("启动初始化加载并完成所有相关延迟启动初始化加载并完成所有相关延迟" +
                "系统中用于侦听上下文接口服务数据的队列 : {}",delayedListenerContextMap);
        log.info("开始完成线程任务分配,并为每个组的侦听器和任务队列分配资源");
        if(MapUtils.isEmpty(delayedListenerContextMap)){
            log.info("未找到任务侦听信息。在springcontext管理的上下文中," +
                    "请检查是否有关于实现的接口" +
                    "EventExecutableInvokerListener,以及相关@DelayedQueueListener");
            return ;
        }
        log.info("启动与生产相关的侦听绑定机制");
        Map<String,List<EventExecutableInvokerListener>> getAnnotationMetadataGroup =
                delayedListenerContextMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupListener));
        log.info("开始初始化相关的异常信息处理机制");
        Map<String,List<DelayedExceptionHandler>> delayedExceptionHandlerMapGroup =
                delayedExceptionHandlerMap.values().stream().collect(Collectors.groupingBy(DelayedBootstrapInitializer::getAnnotationMetadataGroupExceptionHandler));
        if(MapUtils.isNotEmpty(getAnnotationMetadataGroup)){
            Executor executor = DelayedThreadPoolSupport.getTaskRecycleThread();
            log.info("启动资源分配机制");
            //推荐同一个组里面采用一个线程池进行处理机制
            getAnnotationMetadataGroup.entrySet().forEach(param->{
                log.info("初始化线程机制 {}:",param.getValue());
                executor.execute(new DelayedBootstrapRunnable(param.getKey(),param.getValue(),
                        DelayedBootstrapInitializer.getExecutorByGroup(param.getValue()),
                        new ExecutableExceptionHandler(delayedExceptionHandlerMapGroup.get(param.getKey()))));
            });
        }else{
            log.warn("资源转换失败!无法执行资源执行机制");
        }
    }

获取延时队列数据信息的模型

主要通过延时队列处理类上面的注解的元数据信息,获取注解的分组Group操作属性。

  /**
     * @param eventExecutableInvokerListener
     * @return
     */
    public static String getAnnotationMetadataGroupListener(EventExecutableInvokerListener eventExecutableInvokerListener){
        return getAnnotationMetadataGroup(eventExecutableInvokerListener,DelayedQueueListener.class);
    }


    public static String getAnnotationMetadataGroupExceptionHandler( DelayedExceptionHandler delayedExceptionHandler){
        return getAnnotationMetadataGroup(delayedExceptionHandler,DelayedQueueExceptionHandler.class);
    }

    /**
     * 获取相关的组信息
     * @param object
     * @return
     */
    public static String getAnnotationMetadataGroup(Object object,Class delayedQueueListenerClass){
        Object annotationInstance = object.getClass().getAnnotation(delayedQueueListenerClass);
        if(annotationInstance instanceof DelayedQueueListener) {
            DelayedQueueListener delayedQueueListener = (DelayedQueueListener)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedQueueListener.group();
            }
        }
        else if(annotationInstance instanceof DelayedQueueExceptionHandler) {
            DelayedQueueExceptionHandler delayedExceptionHandler = (DelayedQueueExceptionHandler)annotationInstance;
            if(Objects.isNull(annotationInstance)){
                return Strings.EMPTY;
            }else{
                return delayedExceptionHandler.group();
            }
        }
        return Strings.EMPTY;
    }

执行线程组机制

public static Executor getExecutorByGroup(List<EventExecutableInvokerListener> eventExecutableInvokerListeners){
     return eventExecutableInvokerListeners.stream().map(EventExecutableInvokerListener::getExecutor).
            filter(Objects::nonNull).findAny().orElse(null);
}

定义延时队列的消费接口

调用延时队列的执行抽象接口处理模型

@FunctionalInterface
public interface ExecutableInvokerListener<P,R>  {

    /**
     * 执行方法
     * @param param 返回值为以后callable使用
     * @return
     */
    R handle(P param);
}

定义延时队列的消费接口扩展接口

public interface EventExecutableInvokerListener<P,R> extends ExecutableInvokerListener<ExecuteInvokerEvent <P>,R> {

    /**
     * 延时偏移量
     */
    long DEFAULT_DELAYED_OSFFET = 10;

    /**
     * 延时超时时间时间戳
     */
    TimeUnit DEFAULT_DELAYED_TIMEUNIT = TimeUnit.SECONDS;

    /**
     * 是否可以执行异步操作(暂不支持)
     */
    boolean DEFAULT_IS_ASYNC_FLAG = Boolean.TRUE;

    /**
     * 暂时不支持重试机制,会造成数据重复执行机制,主要面向与执行失败后的重试机制(暂不支持)
     */
    int DEFAULT_RETRY_NUM = 0;

    /**
     * 存放在同一个线程执行
     */
    String DEFAULT_BIZ_GROUP = "DEFAULT_GROUP";

    /**
     * 如果没有定义直接采用默认线程池进行执行
     */
     Executor getExecutor();
}

DelayedBootstrapRunnable

主要用于处理对应的DelayedBootstrapRunnable的控制对象模型机制,用于轮询查询获取redis队列种的数据信息,之后回调给业务端的Listener监听器操作。

@RequiredArgsConstructor
@Slf4j
public  class DelayedBootstrapRunnable implements Runnable{

    /**
     * 直接传递相关的执行客户端访问器
     */
    public DelayedRedissionClientTool delayedRedissionClientTool = DelayedRedisClientSupport.getDelayedRedissionClientTool();
    /**
     * 绑定的线程组,只会执行相关的线程组之间的关系机制
     */
    public final String bizGroup;
    /**
     * 注入参数进入
     */
    public final List<EventExecutableInvokerListener> eventExecutableInvokerListeners;
    /**
     * 执行线程池
     */
    public final Executor executorThreadPool;
    /**
     * 异常信息控制
     */
    public final ExecutableExceptionHandler exceptionHandlers;
    /**
     * 启动服务处理机制
     */
    @Override
    public void run() {
        try {
            RBlockingQueue<ExecuteInvokerEvent> blockingQueue = delayedRedissionClientTool.takeBlockingQueue(new ExecuteInvokerEvent(bizGroup));
            Executor executor = Objects.isNull(executorThreadPool) ? DelayedThreadPoolSupport.getTaskExecuteThread() : executorThreadPool;
            Thread.currentThread().setUncaughtExceptionHandler(exceptionHandlers);
            for(;;) {
               try{
                   ExecuteInvokerEvent data =  delayedRedissionClientTool.poll(blockingQueue);
                   log.info("侦听队列任务组:{},获得值:{}", bizGroup, data);
                   log.info(MessageFormat.format("【1】Execute parse complete call: the execution time should be:{0,date,yyyy-MM-dd HH:mm:ss}," +
                                   "Actual execution time:{1,date,yyyy-MM-dd HH:mm:ss},createTime:{2,date,yyyy-MM-dd HH:mm:ss}",
                           data.getFiredTime(), new Date(),new Date(data.getCreateTime())));
                   executor.execute(() -> {
                       for(EventExecutableInvokerListener eventExecutableInvokerListener : eventExecutableInvokerListeners){
                           eventExecutableInvokerListener.handle(data);
                       }
                   });
               }catch (Exception e){
                   log.error("无法执行处理",e);
               }
            }
        } catch (Exception e) {
            log.error("无法执行处理",e);
//            throw new RuntimeException(e);
        }
    }
}

延时队列的使用案例

延时队列投递数据方


@Autowired(required = false)
public DelayedRedissionClientTool delayedRedissionClientTool;
public void testProducerElement(){
        AtomicInteger atomicInteger = new AtomicInteger();
        IntStream.range(0,200).forEach(param->{
            log.info("开始投递数据信息");
             // 业务编号必须传入,为了去重;此外分组必须穿,如同mq的topic
            ExecuteInvokerEvent executeInvokerEvent = new ExecuteInvokerEvent(String.valueOf(atomicInteger.incrementAndGet()),"TEST_GROUP");
            executeInvokerEvent2.setDelayedTime(10L); // 延时时长度
            executeInvokerEvent2.setDataModel("asdasda"); //传输数据模型。泛型类型
            executeInvokerEvent2.setTimeUnit(TimeUnit.SECONDS); // 延时时间单位
            delayedRedissionClientTool.offer(executeInvokerEvent); //数据存储
        });
    }

延时队列消费数据方:

@Slf4j
@DelayedQueueListener(value="delayedQueueTest",group="TEST_GROUP")
public class DelayedQueueTest implements EventExecutableInvokerListener<ExecuteInvokerEvent<Object>,Object> {
    /**
     * 可以自定义线程池,但是一个组中,只会采用其中一个线程池去执行,防止过多使用资源
     * @return
     */
    @Override
    public Executor getExecutor() {
        return null;
    }
    /**
     * 任务执行机制控制服务
     * @param param 返回值为以后callable使用
     * @return
     */
    @Override
    public Object handle(ExecuteInvokerEvent<ExecuteInvokerEvent<Object>> param) {
        try {
            System.out.println(MessageFormat.format("【1】执行解析完成调用:应该执行时间:{0,date,yyyy-MM-dd HH:mm:ss}," +
                    "实际执行时间:{1,date,yyyy-MM-dd HH:mm:ss},创建时间:{2,date,yyyy-MM-dd HH:mm:ss}",param.getFiredTime(), new Date(),new Date(param.getCreateTime())));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

延时轮询线程异常处理器:

 */
@DelayedQueueExceptionHandler(value="delayedHandler",group="TEST_GROUP")
public class DelayedTestQueueExceptionHandler implements DelayedExceptionHandler {
    @Override
    public void catchException(Throwable e, Thread currentThread) {
        System.out.println("asdasdasda---------------------");
//        e.printStackTrace();
    }
}

问题反馈

  1. 大家是不是觉得非常便利开发相关的延迟队列?
  2. 异常处理机制待优化
  3. 性能提升带优化
  4. 循环线程属于非常痛点和薄弱的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/14771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络基础,InetAddress,Socket,TCP,UDP

概念&#xff1a;两台设备之间通过网络实现数据运输网络通信&#xff1a;将数据通过网络从一台设备传输到另一台设备java.net包下提供了一系列的类或接口&#xff0c;供程序员使用&#xff0c;完成网络通信网络&#xff1a;两台或多台设备通过一定物理设备连接起来构成了网络根…

文件和用户管理

Linux基础 提示&#xff1a;个人学习总结&#xff0c;仅供参考。 一、Linux系统部署 二、服务器初始化 三、文件和用户管理 四、用户的权限 提示&#xff1a;文档陆续更新整理 文件和用户管理 Linux基础一、Linux目录结构二、文件管理1.文件类型2.文件管理命令 三、用户管理…

为什么医疗保健需要MFT来帮助保护EHR文件传输

毫无疑问&#xff0c;医疗保健行业需要EHR技术来处理患者&#xff0c;设施&#xff0c;提供者等之间的敏感患者信息。但是&#xff0c;如果没有安全的MFT解决方案&#xff0c;您将无法安全地传输患者文件&#xff0c;从而使您的运营面临遭受数据泄露&#xff0c;尴尬&#xff0…

如何平衡倾斜摄影的三维模型轻量化数据文件大小和质量效果?

如何平衡倾斜摄影的三维模型轻量化数据文件大小和质量效果&#xff1f; 倾斜摄影超大场景的三维模型数据文件大小的具体范围取决于多种因素&#xff0c;如原始数据的复杂度、轻量化处理的方式和压缩算法等。一般而言&#xff0c;经过轻量化处理后&#xff0c;数据文件大小可以减…

c/c++:栈帧,传值,传址,实参传值给形参,传地址指针给形参

c/c&#xff1a;栈帧&#xff0c;传值&#xff0c;传址&#xff0c;实参传值给形参&#xff0c;传地址指针给形参 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;此时学会c的话&#xff0c; 我所知道的周边的会c的同学&…

WuxioLin 反锯齿算法(反走样算法,Xiaolin Wu Anti-aliasing algorithm) C# 代码实现

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、锯齿和反锯齿二、Xiaolin Wu 算法代码1.C#完整代码如下2.举例和测试 总结 前言 笔者前几日自己写了个佳明手表表盘的的一个入门级App&#xff0c;模拟指针…

甘肃vr全景数字化展厅提高企业品牌认知度和销售效果

相比传统式展厅给观众们呈现的是静态的视觉体会&#xff0c;缺乏实时交互水平。而720VR全景虚拟展厅能够提供高度真实的展览体验&#xff0c;融合视、听、触等各种感官享受&#xff0c;带来颠覆的沉浸式体验。 即便社恐的你也能在虚拟现实的世界游刃有余&#xff0c;想看哪里点…

AD9208子卡设计资料: 2 路 2.6GSPS/3GSPS AD 采集、2 路 12.6G DA 回放、高性能时钟发生器HMC7044 -FMC 子卡模块

板卡概述 FMC123 是一款基于 FMC 标准规范&#xff0c;实现 2 路 14-bit、3GSPSADC 采集功能、2 路 16-bit 12.6GSPS 回放子卡模块。该模块遵循 VITA57.1 标准&#xff0c;可直接与 FPGA 载卡配合使用&#xff0c;板卡 ADC 器件采用 ADI 公司的 AD9208 芯片&#xff0c;&…

《中学科技》期刊简介及投稿邮箱

《中学科技》期刊简介及投稿邮箱 《中学科技》以传播科技知识、启迪智慧、培养才能为宗旨&#xff0c;提供电子技术、计算机、陆海空模型、数学、物理、化学、生物、天文等方面的科技活动资料&#xff0c;特别注意通过科学观察&#xff0c;实验和制作实践的途径&#xff0c;培…

CCGNet用于发现共晶材料中的coformer

共晶工程&#xff08;cocrystal engineering&#xff09;在制药&#xff0c;化学和材料领域有广泛应用。然而&#xff0c;如何有效选择coformer一直是一个挑战性课题。因此&#xff0c;作者开发了一个基于GNN的深度学习框架用于快速预测共晶的形成。为了从现有报告的6819个正样…

Java项目上线之云服务器环境篇(二)——Tomcat的安装与配置

Java项目上线之云服务器环境篇&#xff08;二&#xff09;——Tomcat的安装与配置 Tomcat的选择&#xff1a; 云服务器tomcat的选择最好与本机项目运行的tomcat版本号一致&#xff0c;避免一些不必要的问题。 配置步骤&#xff1a; 1、首先进入云服务器创建好放置tomcat的文件…

重大剧透:你不用ChatGPT,它砸你饭碗

早晨看到路透社报道&#xff0c;盖茨说&#xff0c;与其争论技术的未来&#xff0c;不如专注于如何更好地利用人工智能。 这可能是他对马斯克他们呼吁暂停AI研发6个月的一种回应吧。 有种古语说&#xff1a;天下大势&#xff0c;浩浩汤汤&#xff0c;顺之者昌&#xff0c;逆之者…

2023年Q1天猫空调品牌销量排行榜

如今&#xff0c;空调的普及水平较高&#xff0c;空调行业进入存量换新为主的发展阶段。 根据鲸参谋数据分析平台的相关数据显示&#xff0c;2023年Q1在天猫平台上&#xff0c;空调的销量将近100万件&#xff0c;销售额将近30亿&#xff0c;同时&#xff0c;空调产品的产品均价…

免费gpt-4-国内使用gpt-4

如何用上gpt-4 GPT-4尚未正式发布和公开&#xff0c;因此我们无法提供对GPT-4的具体使用方法。但是&#xff0c;可以从GPT-4的前一代——GPT-3的使用经验和GPT-4的预期功能来看&#xff0c;建议如下&#xff1a; 了解GPT-4的语言处理能力和适用场景&#xff1a;GPT-4预计将进一…

vue---组件逻辑复用方法:Mixin/HOC/Renderless组件

目录 1、Mixin 2、HOC 3、Renderless组件 下文通过表单校验来分别讲解Mixin/HOC/Renderless组件这三种方式。 1、Mixin 通过mixin将一个公用的validate函数同步到每一个组件中去 mixin使用详细介绍见&#xff1a;vue---mixin混入_maidu_xbd的博客-CSDN博客一个混入对象可…

SpringBoot实战(十六) 集成Hystrix

目录 一、简介1.Hystrix 的定义&#xff1f;2.Hystrix 的用处&#xff1f;3.Hystrix 的三种状态&#xff1f;4.Hystrix 解决什么问题&#xff1f;5.Hystrix 的设计原理&#xff1f;6.Hystrix 的实现原理&#xff1f; 二、集成 Hystrix1.Maven 依赖2.application.yml简易版&…

AutoGPT 安装指南,使用避坑要点

最近&#xff0c; AIGC 中最火的可能就当属于 AutoGPT 了吧&#xff0c;首先简单介绍一下AutoGPT 背景 AutoGPT 是基于 ChatGPT API 接口开发&#xff0c;项目首推 GPT-4 模型&#xff0c;但 OpenAI 账号 API 只有 gpt-3.5-turo 权限同样也可以使用。 项目在 github 上获取的…

【服务器数据恢复】重装系统导致分区无法访问的数据恢复案例

服务器数据恢复环境&#xff1a; 磁盘柜raid卡15块磁盘组建一组raid5磁盘阵列&#xff0c;划分2个lun&#xff1b; 上层操作系统划分若干分区&#xff0c;通过LVM扩容方式将其中一个分区加入到了root_lv中&#xff0c;其他分区格式化为XFS文件系统。 服务器故障&#xff1a; 为…

DFMEA 在车用燃料电池空压机设计中的应用

摘要&#xff1a; DFMEA在空压机研发中的应用 氢气具有资源丰富、热值高和无污染等特点&#xff0c;因而是燃料电池汽车最理想的二次能源。空压机作为燃料电池汽车的关键总成&#xff0c;掌握其核心部件的设计和制造技术非常必要。应用传统的设计方法进行相关零部件如空气轴承…

Reid之损失函数理论学习讲解

基于深度学习的Reid主要流程为输入图像-->CNN(提取特征)-->Global average pooling-->特征向量&#xff0c;将用这些特征来衡量图像的相似情况。并用这些特征进行检索&#xff0c;返回分类情况。 在训练网络的时候需要涉及损失函数&#xff0c;因此就引出了表征学习和…