延时队列的三种实现方案

延时队列的三种实现方案

    • 什么是延时队列
    • 延时队列的应用场景
    • 基于Java DelayQueue的实现
      • 源码剖析
    • 基于Redis的zset实现
      • 实现步骤
      • Redis延时队列优势
      • Redis延时队列劣势
    • 基于RabbitMQ的延时队列实现
      • TTL + DXL(死信队列)
      • 插件实现
    • 总结
    • 参考文章

什么是延时队列

在分布式系统中,延时队列(Delay Queue)是一个常见的工具,它 允许程序能够按照预定时间处理任务(类似于定时任务)。延时队列允许我们将任务延时到指定的时间执行,这样就可以将任务按照优先级和执行时间来处理,从而提高系统的可靠性和性能。
延时队列是一种特殊的队列相比于普通队列(先进先出)最大的区别就体现在其延时属性上。在这种队列中,每个元素都有一个预设的延时时间,只有当这个时间到期后,元素才可以被消费。这种机制使得延时队列可以用于实现定时任务、消息重试等功能。
在这里插入图片描述

延时队列的应用场景

延时队列在实际应用中有很多应用场景,例如:

  1. 定时任务:使用延时队列可以实现定时任务,例如每隔一段时间执行某个操作,或者在特定的时间点执行某个操作。

注:延时队列和定时任务的区别

  1. 定时任务一般是有固定时间周期的,有明确的触发时间。而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内执行,没有执行周期。
  2. 定时任务一般批处理操作多个任务,延时任务一般是单个任务。
  1. 消息重试:在分布式系统中,消息可能因为网络原因或其他原因无法成功送达,此时可以使用延时队列实现消息的重试机制。消息发送失败后,将消息存入延时队列设置一个合适的延时时间,当时间到期后,重新发送消息
  2. 缓解并发压力:在高并发场景下,将大量请求先存入延时队列,然后由消费者逐一处理,从而避免瞬间请求对系统造成压力。
  3. 订单超时自动取消:电商系统中,用户下单后需要在一定时间内付款,否则订单会被自动取消。这种场景下,可以使用延时队列实现订单的超时监控。(类似于上面说的定时任务)
  4. 消息通知:在很多业务场景中,需要给用户发送消息通知,但是由于某些原因,这些消息不能及时发送。例如:当用户购买一件商品时,需要在3天内发货,如果超时未发货,需要给用户发送一条消息通知。这时候就可以通过延时队列来实现。(服务端主动向客户端发送消息)

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

在这里插入图片描述


接下来,我们将学习几种常见的延时队列实现方式,包括Java中的DelayQueue,Redis的zset和RabbitMQ的延时队列。

基于Java DelayQueue的实现

DelayQueue是Java并发包java.util.concurrent中提供的一个支持延时获取元素的无界阻塞队列。它的内部实现是基于优先级队列(PriorityQueue)按照元素的过期时间进行排序
DelayQueue中,元素必须实现Delayed接口,该接口提供了getDelay()方法,该方法返回元素的剩余延时时间。
DelayQueue是一个无界队列,它不允许插入null元素,并且元素必须是可比较的。它的元素会按照剩余延时时间的升序排列。在DelayQueue中,当调用take()方法时,如果队列中没有元素,则线程会阻塞等待,直到有元素被添加到队列中。

在这里插入图片描述

下面是一个使用Java DelayQueue实现延时队列的示例代码:

import java.util.concurrent.*;

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Message> queue = new DelayQueue<>();

        queue.put(new Message("message 1", 2000));
        queue.put(new Message("message 2", 1000));
        queue.put(new Message("message 3", 3000));

        while (!queue.isEmpty()) {
            System.out.println(queue.take());
        }
    }
}

class Message implements Delayed {
    private String message;
    private long delayTime;

    public Message(String message, long delayTime) {
        this.message = message;
        this.delayTime = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = delayTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.delayTime < ((Message) o).delayTime) {
            return -1;
        } else if (this.delayTime > ((Message) o).delayTime) {
            return 1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "Message{" +
                "message='" + message + '\'' +
                ", delayTime=" + delayTime +
                '}';
    }
}

在上面的示例中,我们创建了一个DelayQueue,并向队列中添加了三个元素。每个元素都是Message对象,该对象实现了Delayed接口,并实现了getDelay()compareTo()方法。在主函数中,我们使用while循环不断从队列中取出元素,直到队列为空。

源码剖析

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    // 一个重入锁,用于保证线程安全
    private final transient ReentrantLock lock = new ReentrantLock();
    // 一个优先级队列,用于存储元素,并维护堆的结构,内部是一个基于数组实现的小顶堆
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 一个线程变量,用于记录当前等待堆顶元素到期的线程
    private Thread leader = null;

     /**
     * 一个条件变量,用于实现阻塞等待和唤醒机制
     * 在队列中存在元素的时候,第一个调用take()方法的线程将成为leader线程,
	 * 它将会在available上等待队列头结点剩余的延迟时间
     * 其他的线程将会成为follower线程,它们会一直在available上一直等待
     * leader线程苏醒之后会将leader变量置空,在获取到元素之后最后会唤醒一个在available上等待的follower线程
     * 被唤醒的follower线程将可能成为新的leader线程
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

   
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        // 获取锁,保证线程同步
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 调用优先级队列的offer方法将元素插入到数组中,并调整堆的结构(根据compareTo方法比较并构建小顶堆)
            q.offer(e);
            // 判断插入的元素是否是堆顶元素(新加入的元素e是否是延迟时间最短的元素)
            if (q.peek() == e) {
                /** 如果是,则将leader线程置空,让后来的线程可以当选为leader
				* 唤醒一个在available上等待的消费线程,让它和新消费线程重新争夺leader。
            	**/
                leader = null;
                available.signal();
            }
            // 这里不会检查队列是否满了,所以一定会返回true
            return true;
        } finally {
            lock.unlock();
        }
    }

    
    public void put(E e) {
        offer(e);
    }

    
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 调用优先级队列的peek方法获取堆顶元素
            E first = q.peek();
            // 判断该元素是否为空或者未到期
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                // 如果不是,则调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     * 出队,获取并移除此延迟队列已过期的队头,如果此时没有已过期的队头,那么一直等待。
     */
    public E take() throws InterruptedException {
         // 获取锁,如果被中断则抛出异
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //开启一个死循环
            for (;;) {
                // 调用优先级队列的peek方法获取堆顶元素
                E first = q.peek();
                // 如果为空,则阻塞等待在available条件上,直到被唤醒或者中断
                if (first == null)
                    available.await();
                else {
                    // 如果不为空,则获取该元素的剩余延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 如果小于等于0,则说明该元素已经到期,
                    // 调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构
                    if (delay <= 0)
                        return q.poll();
                    // 如果大于0即未到期,则将first置空方便gc回收
                    first = null; // don't retain ref while waiting
                    // 判断leader线程是否为空
                    if (leader != null)
                        // 如果不为空,则说明有其他线程正在等待堆顶元素到期,
                        // 当前线程也阻塞等待在available条件上,直到被唤醒或者中断
                        available.await();
                    else {
                        /** 如果为空,则将当前线程设为leader线程,并阻塞等待堆顶元素的剩余延迟时间,
                        * 在finally块中判断当前线程是否是leader线程,如果是,则将leader线程置空,
                    	* 并唤醒下一个等待在available条件上的线程
                        **/ 
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 阻塞等待堆顶元素的剩余延迟时间,直到被唤醒或者中断
                            available.awaitNanos(delay);
                        } finally {
                            // 如果leader还是当前线程就把它置为空,让其他线程有机会获取元素
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
	 * 在指定时间内等待获得到期的队列头;如果在队头过期之前超过了指定的等待时间,则返回 null。
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }



    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }



    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}

leader线程是一个线程变量,它记录了当前正在等待堆顶元素到期的线程。它的作用是为了避免多个线程同时等待同一个元素到期,从而造成资源浪费和竞争。如果有多个线程同时调用take方法,只有第一个线程会被设为leader线程,并阻塞等待堆顶元素的剩余延迟时间,其他线程则会阻塞等待在available条件上,直到被唤醒或者中断。当leader线程被唤醒或者中断后,它会尝试获取堆顶元素,如果成功,则将leader线程置空,并唤醒下一个等待在available条件上的线程,让它成为新的leader线程;如果失败,则重新进入循环判断堆顶元素是否到期。这样就保证了每次只有一个线程在等待堆顶元素到期,提高了效率和公平性。
举个例子:
假设有三个线程A、B、C同时调用take方法,队列中有一个元素D,它的延迟时间是10秒,当前时间是0秒。

  • 线程A先获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程为空,将自己设为leader线程,并阻塞等待10秒。
  • 线程B后获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程不为空,阻塞等待在available条件上。
  • 线程C再获取到锁,然后获取堆顶元素D,发现它还没有到期,判断leader线程不为空,阻塞等待在available条件上。
  • 10秒后,线程A被唤醒,然后获取堆顶元素D,发现它已经到期,调用优先级队列的poll方法将堆顶元素弹出,并调整堆的结构。然后判断自己是否是leader线程,如果是,则将leader线程置空,并唤醒下一个等待在available条件上的线程(假设是线程B)。
  • 线程B被唤醒,然后获取堆顶元素D,发现它为空(因为已经被线程A弹出了),阻塞等待在available条件上(直到有新的元素入队或者被中断)。
  • 线程C仍然阻塞等待在available条件上(直到有新的元素入队或者被中断)。

基于Redis的zset实现

Redis的zset(有序集合)也可以用来实现延时队列。我们知道,zset是一种特殊的集合,其内部成员都是有序排列的,每个元素都关联一个分数值,根据这个分数值对元素进行排序。我们可以将元素的过期时间作为分数值,从而实现延时队列

在这里插入图片描述

有序集合(zset)同样使用了两种不同的存储结构,分别是 zipList(压缩列表)(在Redis7中是listpack)和 skipList(跳跃列表),具体可以看下面这篇博客:Redis底层数据结构分析(二) —— Hash结构_redis哈希表长度_小熊不吃香菜的博客-CSDN博客

实现步骤

  1. 将任务的到期时间作为分值(zadd key timestamp task),将任务的内容作为成员,添加到zset中。
  2. 使用zrangebyscore命令,根据当前时间戳,获取分值小于等于当前时间戳的成员,即到期的任务。
  3. 使用ZREM命令,删除获取到的成员,防止重复执行。
  4. 对获取到的任务进行后续处理
    在这里插入图片描述

下面是一个使用Redis的zset实现延时队列的示例代码:

import redis.clients.jedis.Jedis;

import java.util.Set;

public class RedisDelayQueueDemo {
    private static final String QUEUE_NAME = "delay-queue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis("localhost", 6379);

        // 向队列中添加元素
        jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 2000, "message 1");
        jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 1000, "message 2");
        jedis.zadd(QUEUE_NAME, System.currentTimeMillis() + 3000, "message 3");

        while (true) {
            Set<String> messages = jedis.zrangeByScore(QUEUE_NAME, 0, System.currentTimeMillis(), 0, 1);
            if (!messages.isEmpty()) {
                String message = messages.iterator().next();
                jedis.zrem(QUEUE_NAME, message);
                System.out.println(message);
            } else {
                Thread.sleep(1000);
            }
        }
    }
}

在上面的示例中,我们使用Jedis连接Redis,并向队列中添加了三个元素,每个元素都是一个字符串,它们的score值分别为元素过期时间。然后,我们使用一个while循环不断从zset中取出过期的元素,并将其从zset中移除。

Redis延时队列优势

  1. Redis zset支持高性能的 score 排序。
  2. Redis是在内存上进行操作的,速度非常快。
  3. Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
  4. Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性。

Redis延时队列劣势

  1. 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
  2. 没有重试机制 。处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
  3. 没有 ACK 机制。 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了。

如果对消息可靠性要求较高, 推荐使用 MQ 来实现

基于RabbitMQ的延时队列实现

RabbitMQ是一个广泛使用的消息队列中间件,它也支持延时队列的功能。我们可以通过为队列设置消息的TTL(Time-to-Live)属性 ,或者通过RabbitMQ的插件实现延时队列。

消息的流向:

在这里插入图片描述


两种实现方式:

TTL + DXL(死信队列)

在这里插入图片描述

  1. TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间 , 单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL:

(1) 消息设置TTL ,针对每条消息设置TTL
在这里插入图片描述

(2) 队列设置TTL ,在创建队列的时候设置队列的x-message-ttl属性
在这里插入图片描述

两者的区别:如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

  1. 死信队列

    死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
    (1) 应用场景
    为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

    (2) 死信的来源

    • 消息 TTL 过期
    • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

(3) 结构图
在这里插入图片描述

对于消息TTL过期,即对应我们上面介绍的延时队列。此时生产者和消费者代码:

在这里插入图片描述


消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
        	{String message = new String(delivery.getBody(), "UTF-8");
        	System.out.println("Consumer01 接收到消息"+message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
	}
}

在这里插入图片描述


消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
        {String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}    

在这里插入图片描述

下面是一个使用TTL+死信队列实现延时队列的示例代码,通过创建一个延时队列和一个死信队列,将消息发送到延时队列中,在延时时间到达后,消息将被转发到死信队列中。

// 导入RabbitMQ相关的依赖
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class DelayQueueDemo {

    // 定义交换机、队列、路由键等常量
    public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    public static final String DELAY_QUEUE_NAME = "delay.queue";
    public static final String DELAY_ROUTING_KEY = "delay.routing.key";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
    public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂和连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();

        // 创建通道
        Channel channel = connection.createChannel();

        // 声明延时交换机,类型为direct
        channel.exchangeDeclare(DELAY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明死信交换机,类型为direct
        channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明延时队列,并设置TTL和死信交换机
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 10000); // 设置消息的过期时间为10秒
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 设置死信交换机
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); // 设置死信路由键
        channel.queueDeclare(DELAY_QUEUE_NAME, true, false, false, args);

        // 声明死信队列
        channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);

        // 绑定延时队列和延时交换机
        channel.queueBind(DELAY_QUEUE_NAME, DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY);

        // 绑定死信队列和死信交换机
        channel.queueBind(DEAD_LETTER_QUEUE_NAME, DEAD_LETTER_EXCHANGE, DEAD_LETTER_ROUTING_KEY);

        // 创建消费者,监听死信队列
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取消息内容并打印
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };

        // 开始消费消息,自动确认
        channel.basicConsume(DEAD_LETTER_QUEUE_NAME, true, consumer);

        // 发送一条延时消息到延时队列
        String message = "Hello delay queue!";
        channel.basicPublish(DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY,
                null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);

    }
}

插件实现

在RabbitMQ中,延时队列也可以通过x-delayed-message插件实现的,创建一个类型为x-delayed-message的交换机,发送消息时设置x-delay头部,表示延迟时间。消息到期后会被投递到绑定的队列,消费者监听该队列即可。

在这里插入图片描述


在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

配置类代码:

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    @Bean
    public Queue delayedQueue() {
   	 	return new Queue(DELAYED_QUEUE_NAME);
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() { 
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, 
                                  "x-delayed-message", true, false, args);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
        		@Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange)
            	.with(DELAYED_ROUTING_KEY).noargs();
    }
}

消息生产者代码 :

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
	rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, 
                                  message,correlationData ->{
    	correlationData.getMessageProperties().setDelay(delayTime);
    	return correlationData;
	});
	log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 
             delayed.queue:{}", new Date(),delayTime, message);
}                          

消息消费者代码 :

public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
    String msg = new String(message.getBody());
	log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}

在这里插入图片描述

第二个消息被先消费掉了,符合预期

总结

延时队列是一种非常有用的消息队列,它可以在一定的延时时间后将消息投递到消费者。延时队列可以应用于多种场景,例如定时任务、订单超时处理等。实现延时队列的方法有多种,可以使用Java中的DelayQueue、Redis的zset,也可以使用RabbitMQ等消息队列。不同的实现方式有各自的优缺点,需要根据实际情况选择合适的实现方式。

  • Java的DelayQueue实现延时队列,是一种基于JDK自带的类的方案,它是一个存储延时任务的环形队列,可以高效地循环遍历。它的优点是简单易用,不需要依赖其他组件,缺点是不支持持久化和分布式,如果程序崩溃或者重启,延时任务会丢失。
  • 使用Redis的zset实现延时队列,是一种基于Redis有序集合的方案,它可以利用score来表示延时时间,通过zaddzrangebyscore等命令来入队和出队。它的优点是高性能,支持持久化和高并发,可以通过Redis集群来提高可用性和扩展性,缺点是需要额外的进程来轮询Redis,并且可能存在消息重复消费或者丢失的风险。
  • 使用rabbitmq实现延时队列,是一种基于rabbitmq的TTL和死信队列功能的方案,它可以通过设置消息或者队列的过期时间,以及将过期消息转移到死信队列中来实现延时效果。它的优点是利用了rabbitmq本身的消息可靠发送、投递和消费机制,保证了消息至少被消费一次,并且可以通过rabbitmq集群来解决单点故障问题,缺点是需要额外配置TTL和死信队列,并且可能存在消息堆积或者延迟不准确的问题(也可以使用插件方式)。

参考文章

  1. 延时队列
  2. 你真的知道怎么实现一个延迟队列吗 ?

文章中所有图片来源于上述文章!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/17296.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java之多线程初阶2

目录 一.上节内容复习 1.进程和线程的区别 2.创建线程的四种方式 二.多线程的优点的代码展示 1.多线程的优点 2.代码实现 三.Thread类常用的方法 1.Thread类中的构造方法 2.Thread类中的属性 1.为线程命名并获取线程的名字 2.演示isDaemon() 3.演示isAlive() 4.演示…

ChatGPT写文章效果-ChatGPT写文章原创

ChatGPT写作程序&#xff1a;让文案创作更轻松 在当前数字化的时代&#xff0c;营销推广离不开文案创作。然而&#xff0c;写作对许多人来说可能是一项耗时而枯燥的任务。如果您曾经为写出较高质量的文案而苦恼过&#xff0c;那么ChatGPT写作程序正是为您而设计的。 ChatGPT是…

Python 模块

目录 1.模块导入语言 1.1 import 语句 1.2 from…import 语句​编辑 2. 搜索路径 3.命名空间和作用域 4.globals() 和 locals() 函数 5.reload() 函数 6.Python中的包 7.自定义模块及其调用 7.1 创建模块及__init__.py初始化文件 7.2 __init__.py的参数__all__ …

【vite+vue3.2 项目性能优化实战】打包体积分析插件rollup-plugin-visualizer视图分析

rollup-plugin-visualizer是一个用于Rollup构建工具的插件&#xff0c;它可以生成可视化的构建报告&#xff0c;帮助开发者更好地了解构建过程中的文件大小、依赖关系等信息。 使用rollup-plugin-visualizer插件&#xff0c;可以在构建完成后生成一个交互式的HTML报告&#xf…

从血缘进化论的角度,破解婆媳关系的世纪难题

从血缘进化论的角度&#xff0c;破解婆媳关系的世纪难题 有个粉丝的留言&#xff0c;很长很复杂&#xff0c;是关于他们家的婆媳关系问题。 青木老师&#xff0c;您好&#xff0c;我也有一些问题想咨询您&#xff0c;是关于婆媳关系的&#xff0c;字数有些多&#xff0c;分开…

【ElasticSearch】EQL操作相关

文章目录 EQL操作基础语法数据准备数据窗口搜索统计符合条件的事件事件序列 安全检测数据准备查看数据导入情况获取 regsvr32 事件的计数检查命令行参数检查恶意脚本加载检查攻击成功可能性 EQL操作 EQL 的全名是 Event Query Language (EQL)。事件查询语言&#xff08;EQL&…

【问题记录】flask开发blog

文章目录 小知识点问题1. 文章标签显示错误2. 文章状态无法回显&#xff08;open)3. 用户管理页面&#xff0c;图标无法显示4. BuildError5. 用户管理添加用户&#xff0c;使用重复的用户名会报错(open)6. 添加用户&#xff0c;不上传头像会报错(open)7. 部分标签删除时报错&am…

JAVA springboot创业实践学分管理系统idea开发mysql数据库web结构计算机java编程MVC

一、源码特点 idea springboot创业实践学分管理系统是一套完善的web设计系统mysql数据库MVC模式开发&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式 开发。 JAVA springboot创业实践学分管理系统ide…

Ubuntu搜狗输入法安装指南

Ubuntu搜狗输入法安装指南 Ubuntu搜狗输入法安装指南搜狗输入法已支持Ubuntu1604、1804、1910、2004、2010Ubuntu20.04及以上安装搜狗输入法步骤 Ubuntu搜狗输入法安装指南 下载地址&#xff1a;https://shurufa.sogou.com/ 计算为amd64的选择x86_64&#xff0c;以下教程来源…

2023Java商城毕业设计(附源码和数据库文件下载链接)Spring Boot + mysql + maven + mybatis-plus

2023Java商城毕业设计Spring Boot mysql maven mybatis-plus 用户注册用户登录修改密码商品列表&#xff08;分类模糊查询&#xff09;个人信息用户信息修改订单信息添加至购物车商品列表商铺详情商品详情商铺列表 资源目录如下&#xff1a;&#xff08;源码sql文件&#xf…

Linux入门2(常用命令)

Linux入门2 Linux常用命令快捷键基础命令文件查看命令文件编辑命令进程管理命令用户管理命令 Linux常用命令 快捷键 Ctrl Alt T打开终端 Ctrl shift 加号 终端字体放大 ctrl 减号 终端字体缩小 基础命令 sudo su 进入管理员目录 exit 返回到用户目录 ls 当前目录下的文…

Illustrator如何使用基础功能?

文章目录 0.引言1.菜单栏2.工具箱 0.引言 因科研等多场景需要进行绘图处理&#xff0c;笔者对Illustrator进行了学习&#xff0c;本文通过《Illustrator CC2018基础与实战》及其配套素材结合网上相关资料进行学习笔记总结&#xff0c;本文对软件界面基本功能进行阐述。    1…

K8s 安全是云安全的未来

导语 到 2025 年&#xff0c;保护 Kubernetes (K8s) 将被认为是云安全最重要的方面。 在最成功的组织中&#xff0c;CTO 和 CISO 已经意识到 Kubernetes 安全的重要性。 但是&#xff0c;虽然 Kubernetes 已经占 CTO 云支出的很大一部分&#xff0c;但 CISO 仍然有所落后。 大…

Android Studio开发图书管理系统APP

Android Studio开发项目图书管理系统项目视频展示&#xff1a; 点击进入图书管理系统项目视频 引 言 现在是一个信息高度发达的时代&#xff0c;伴随着科技的进步&#xff0c;文化的汲取&#xff0c;人们对于图书信息的了解与掌握也达到了一定的高度。尤其是学生对于知识的渴…

asp.net基于web的学生选课成绩管理系统86程序

系统使用Visual studio.net2010作为系统开发环境&#xff0c;并采用ASP.NET技术&#xff0c;使用C#语言&#xff0c;以SQL Server为后台数据库。 本系统主要包含了“登录模块”、“系统用户管理模块”、“课程信息管理模块”、“教师信息管理模块”、“班级信息管理模块”、“…

Lattics ——一款简单易用、好看强大的知识管理工具

如何选择一款适合自己的知识管理工具&#xff1f; 对于很多用户而言&#xff0c;在追求效率的路上&#xff0c;经常需要一款适合自己的知识管理工具。然而&#xff0c;随着工具市场的发展&#xff0c;各种新兴工具层出不穷。在传统领域&#xff0c;有印象笔记、Onenote 为代表…

【笔试强训选择题】Day7.习题(错题)解析

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;笔试强训选择题 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01;&#xff01; 文章目录…

如何充分利用实时聊天系统?

随着商业和电子商务领域经历快速的数字革命&#xff0c;必须迅速适应的一个因素是我们与客户的互动方式。几年前&#xff0c;电子邮件和电话还是主要的客户联系方式。如今&#xff0c;客户期望更好的服务和更即时的沟通。实时聊天支持系统可以解决此问题&#xff0c;如SaleSmar…

IntelliNode:Node.js大模型访问统一接口库【Gen AI】

使用最新的 AI 模型更新你的应用程序可能具有挑战性&#xff0c;因为它涉及了解不同 AI 模型的复杂性并管理许多依赖项。 IntelliNode 是一个开源库&#xff0c;旨在通过提供统一且易于使用的界面来解决集成 AI 模型的挑战。 这使开发人员能够快速构建 AI 原型并使用高级 AI 功…

CompletableFuture

线程基础知识复习 大神&#xff1a;Doug Lea java.util.concurrent java.util.concurrent.aomic Java.util.concurrent.locks 硬件 摩尔定律&#xff1a; 它是由英特尔创始人之一 Gordon Moore(戈登摩尔)提出来的。其内容&#xff1a; 当价格不变是&#xff0c;集成电路…