手写超级好用的rabbitmq-spring-boot-start启动器

手写超级好用的rabbitmq-spring-boot-start启动器

文章目录

  • 1.前言
  • 2.工程目录结构
  • 3.主要实现原理
    • 3.1spring.factories配置
    • 3.2EnableZlfRabbitMq配置
    • 3.3RabbitAutoConfiguration配置
    • 3.4ZlfRabbitMqRegistrar配置
  • 4.总结

1.前言

  由于springBoot官方提供的默认的rabbitMq自动装配不是那么好用,一个项目中只能配置使用一个rabbitMq的服务器,队列也需要编码的方式定义,这种繁杂且不易使用,用一次需要写一次硬编码,之前有一个想法是,能不能使用springBoot官方提供的自动装配实现一个多rabbitMq多队列配置并且支持多种延迟队列的这种多对多关系的实现,但是左思右想,springBoot官方提供的这个rabbitMq自动装配不能满足我的需求,所以我在酝酿了很久,也把官方那个自动装配的源码看了一遍又一遍,随着我之前手写实现了好几个starter启动器,然后就想实现一个rabbitMq的starter,只要简单的配置即可轻松的实现上面的功能,然后提供了一套好用的api,使用的时候只需要在项目中引入这个启动器,节省很大的硬编码和配置灵活,配置改变只需要重启项目即可,对业务使用友好的一个starter启动器,再也不用为如何使用rabbitMq的集成而烦恼了,只需要简单的配置就可以实现好用的功能,让我们把精力放在业务上,而不是代码和代码集成上,大大的提升开发效率和节省我们宝贵的时间,让我们用宝贵的时间来享受时光,生命和生活,效率至上,远离加班,简约也简单,优雅也优美,简单配置就可以实现交换机、队列、绑定关系等根据配置自动装配,然后实现发送普通消息和3种延迟队列发送延迟消息,3中延迟队列实现如下:

  一:延迟插件实现延迟队列

    交换机类型必须CustomExchange

  二:TTL + 死信队列/延迟交换机实现延迟队列

  三: 延迟交换机 + 消息设置setHeader(“x-delay”, xxx)

  还可以配置相同的rabbitMq服务器不同的虚拟机,单独配置,遵循下标递增不重复即可

实现思路如下。

2.工程目录结构

image-20240312212331654

3.主要实现原理

3.1spring.factories配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.zlf.config.RabbitConfig,\
  com.zlf.config.ExchangeQueueConfig,\
  com.zlf.starter.RabbitAutoConfiguration

3.2EnableZlfRabbitMq配置

package com.zlf.starter;

import org.springframework.context.annotation.Import;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 使用需要在启动类上加入@EnableZlfRabbit注解
 * 启动类上排除默认的自动装配RabbitAutoConfiguration
 *
 * @author zlf
 * 启动类上加入如下配置
 * @SpringBootApplication(exclude = {
 * RabbitAutoConfiguration.class})
 * @Import(value = {RabbitService.class, ZlfMqSpringUtils.class})
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ZlfRabbitMqRegistrar.class)
public @interface EnableZlfRabbitMq {

}

3.3RabbitAutoConfiguration配置

package com.zlf.starter;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public class RabbitAutoConfiguration {

}

3.4ZlfRabbitMqRegistrar配置

package com.zlf.starter;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.zlf.config.ExchangeQueueConfig;
import com.zlf.config.ExchangeQueueProperties;
import com.zlf.config.RabbitConfig;
import com.zlf.config.RabbitProperties;
import com.zlf.config.RabbitProperties.AmqpContainer;
import com.zlf.config.RabbitProperties.Cache;
import com.zlf.config.RabbitProperties.Cache.Connection;
import com.zlf.config.RabbitProperties.ContainerType;
import com.zlf.config.RabbitProperties.DirectContainer;
import com.zlf.config.RabbitProperties.ListenerRetry;
import com.zlf.config.RabbitProperties.Retry;
import com.zlf.config.RabbitProperties.SimpleContainer;
import com.zlf.config.RabbitProperties.Template;
import com.zlf.constants.ErrorExchangeQueueInfo;
import com.zlf.dto.ExchangeQueueDto;
import com.zlf.enums.DelayTypeEnum;
import com.zlf.enums.ExchangeTypeEnum;
import com.zlf.enums.FunctionTypeEnum;
import com.zlf.service.RabbitService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.CollectionUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * spring:
 * rabbitmq:
 * listener:
 * simple:
 * acknowledge-mode: auto #由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
 * manual:手动ack,需要在业务代码结束后,调用api发送ack。
 * auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
 * none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(此时,消息投递是不可靠的,可能丢失)
 * <p>
 * 原文链接:https://blog.csdn.net/m0_53142956/article/details/127792054
 *
 * @author zlf
 */
@Slf4j
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties(RabbitConfig.class)
public class ZlfRabbitMqRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware, BeanFactoryAware {

    private BeanFactory beanFactory;
    private RabbitConfig rabbitConfig;
    private ExchangeQueueConfig exchangeQueueConfig;

    @SneakyThrows
    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        List<RabbitProperties> rps = rabbitConfig.getRps();
        if (CollectionUtils.isEmpty(rps)) {
            throw new RuntimeException("rabbitMq的rps配置不为空,请检查配置!");
        }
        log.info("zlf.registerBeanDefinitions:rps.size:{},rps:{}", rps.size(), JSON.toJSONString(rps));
        List<ExchangeQueueProperties> eqps = exchangeQueueConfig.getEqps();
        if (CollectionUtils.isEmpty(eqps)) {
            throw new RuntimeException("rabbitMq的eqps配置不为空,请检查配置!");
        }
        log.info("zlf.registerBeanDefinitions:eqps.size:{},rps:{}", eqps.size(), JSON.toJSONString(eqps));
        for (int i = 0; i < rps.size(); i++) {
            this.checkRabbitProperties(rps.get(i));
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactoryBean(rps.get(i)).getObject());
            cachingConnectionFactory.setAddresses(rps.get(i).determineAddresses());
            cachingConnectionFactory.setPublisherReturns(rps.get(i).getPublisherReturns());
            cachingConnectionFactory.setPublisherConfirmType(rps.get(i).getPublisherConfirmType());
            Cache.Channel channel = rps.get(i).getCache().getChannel();
            if (Objects.nonNull(channel.getSize())) {
                cachingConnectionFactory.setChannelCacheSize(channel.getSize());
            }
            if (Objects.nonNull(channel.getCheckoutTimeout())) {
                Duration checkoutTimeout = channel.getCheckoutTimeout();
                cachingConnectionFactory.setChannelCheckoutTimeout(checkoutTimeout.toMillis());
            }
            Connection connection = rps.get(i).getCache().getConnection();
            if (Objects.nonNull(connection.getMode())) {
                cachingConnectionFactory.setCacheMode(connection.getMode());
            }
            if (Objects.nonNull(connection.getSize())) {
                cachingConnectionFactory.setConnectionCacheSize(connection.getSize());
            }
            // 注册cachingConnectionFactory的bean
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(CachingConnectionFactory.class.getName() + i, cachingConnectionFactory);
            log.info("zlf.ConfigurableBeanFactory注册完成,beanName:{}", CachingConnectionFactory.class.getName() + i);
            // 注册RabbitAdmin的bean
            RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(RabbitAdmin.class.getName() + i, rabbitAdmin);
            log.info("zlf.RabbitAdmin注册完成,beanName:{}", RabbitAdmin.class.getName() + i);
            //构建发送的RabbitTemplate实例关联连接工厂
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
            Template template = rps.get(i).getTemplate();
            ConfirmType publisherConfirmType = rps.get(i).getPublisherConfirmType();
            log.info("第{}个配置的publisherConfirmType:{}", i, JSON.toJSONString(publisherConfirmType));
            //生产者confirm
            /**
             * publish-confirm-type:开启publisher-confirm,这里支持两种类型:
             * simple:【同步】等待confirm结果,直到超时(可能引起代码阻塞)
             * correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
             * publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
             * template.mandatory:
             * 定义当消息从交换机路由到队列失败时的策略。
             * 【true,则调用ReturnCallback;false:则直接丢弃消息】
             */
            if (ConfirmType.CORRELATED.equals(publisherConfirmType)) {
                rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                    if (Objects.nonNull(correlationData)) {
                        if (Objects.nonNull(ack) && ack) {
                            log.info("消息发送成功->correlationData:{}", JSON.toJSONString(correlationData));
                        } else if (StringUtils.isNotBlank(cause)) {
                            log.error("消息->correlationData:{}->发送失败原因->{}", JSON.toJSONString(correlationData), cause);
                        }
                    }
                    if (Objects.nonNull(ack) && ack) {
                        log.info("消息发送成功ack:{}", ack);
                    }
                    if (StringUtils.isNotBlank(cause)) {
                        log.error("消息发送失败原因->cause:{}", cause);
                    }
                    if (Objects.isNull(correlationData) && Objects.isNull(ack) && StringUtils.isEmpty(cause)) {
                        log.info("消息发送成功,收到correlationData,ack,cause都是null");
                    }
                });
            }
            Boolean publisherReturns = rps.get(i).getPublisherReturns();
            Boolean mandatory = template.getMandatory();
            log.info("第{}个配置的publisherReturns:{},mandatory:{}", i, publisherReturns, mandatory);
            //消息回调
            //开启强制回调
            if (mandatory && publisherReturns) {
                rabbitTemplate.setMandatory(template.getMandatory());
                rabbitTemplate.setMandatoryExpression(new ValueExpression<>(template.getMandatory()));
                //设置消息回调
                rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                    log.error("消息->{}路由失败", message);
                    // 如果有需要的话,重发消息
                });
            }
            Retry retry = rps.get(i).getTemplate().getRetry();
            if (retry.isEnabled()) {
                RetryTemplate retryTemplate = new RetryTemplate();
                SimpleRetryPolicy policy = new SimpleRetryPolicy();
                retryTemplate.setRetryPolicy(policy);
                policy.setMaxAttempts(retry.getMaxAttempts());
                ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
                backOffPolicy.setMultiplier(retry.getMultiplier());
                if (Objects.nonNull(retry.getMaxInterval())) {
                    backOffPolicy.setMaxInterval(retry.getMaxInterval().toMillis());
                }
                rabbitTemplate.setRetryTemplate(retryTemplate);
            }
            Duration receiveTimeout = template.getReceiveTimeout();
            if (Objects.nonNull(receiveTimeout)) {
                rabbitTemplate.setReceiveTimeout(receiveTimeout.toMillis());
            }
            Duration replyTimeout = template.getReplyTimeout();
            if (Objects.nonNull(replyTimeout)) {
                rabbitTemplate.setReplyTimeout(replyTimeout.toMillis());
            }
            String exchange = template.getExchange();
            if (StringUtils.isNotBlank(exchange)) {
                rabbitTemplate.setExchange(exchange);
            }
            String routingKey = template.getRoutingKey();
            if (StringUtils.isNotBlank(routingKey)) {
                rabbitTemplate.setRoutingKey(routingKey);
            }
            String defaultReceiveQueue = template.getDefaultReceiveQueue();
            if (StringUtils.isNotBlank(defaultReceiveQueue)) {
                rabbitTemplate.setDefaultReceiveQueue(defaultReceiveQueue);
            }
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(RabbitTemplate.class.getName() + i, rabbitTemplate);
            log.info("zlf.RabbitTemplate注册完成,beanName:{}", RabbitTemplate.class.getName() + i);
            // 不注册RabbitService
            RabbitService rabbitService = new RabbitService();
            //构建监听工厂实例并注入
            ContainerType type = rps.get(i).getListener().getType();
            if (ContainerType.SIMPLE.equals(type)) {
                Map<String, String> errorExchangeQueueRelationship = this.createErrorExchangeQueueRelationship(String.valueOf(i), rabbitService, rabbitAdmin);
                SimpleContainer simple = rps.get(i).getListener().getSimple();
                ConstructorArgumentValues cas3 = new ConstructorArgumentValues();
                MutablePropertyValues values3 = new MutablePropertyValues();
                this.getAmqpContainer(simple, values3, cachingConnectionFactory, jackson2JsonMessageConverter, rabbitTemplate, errorExchangeQueueRelationship);
                if (Objects.nonNull(simple.getConcurrency())) {
                    values3.add("concurrentConsumers", simple.getConcurrency());
                }
                if (Objects.nonNull(simple.getMaxConcurrency())) {
                    values3.add("maxConcurrentConsumers", simple.getMaxConcurrency());
                }
                if (Objects.nonNull(simple.getBatchSize())) {
                    values3.add("batchSize", simple.getBatchSize());
                }
                RootBeanDefinition rootBeanDefinition3 = new RootBeanDefinition(SimpleRabbitListenerContainerFactory.class, cas3, values3);
                beanDefinitionRegistry.registerBeanDefinition(SimpleRabbitListenerContainerFactory.class.getName() + i, rootBeanDefinition3);
                log.info("zlf.SimpleRabbitListenerContainerFactory注册完成,beanName:{}", SimpleRabbitListenerContainerFactory.class.getName() + i);
            } else if (ContainerType.DIRECT.equals(type)) {
                Map<String, String> errorExchangeQueueRelationship = this.createErrorExchangeQueueRelationship(String.valueOf(i), rabbitService, rabbitAdmin);
                DirectContainer direct = rps.get(i).getListener().getDirect();
                ConstructorArgumentValues cas4 = new ConstructorArgumentValues();
                MutablePropertyValues values4 = new MutablePropertyValues();
                this.getAmqpContainer(direct, values4, cachingConnectionFactory, jackson2JsonMessageConverter, rabbitTemplate, errorExchangeQueueRelationship);
                if (Objects.nonNull(direct.getConsumersPerQueue())) {
                    values4.add("consumersPerQueue", direct.getConsumersPerQueue());
                }
                RootBeanDefinition rootBeanDefinition4 = new RootBeanDefinition(DirectRabbitListenerContainerFactory.class, cas4, values4);
                beanDefinitionRegistry.registerBeanDefinition(DirectRabbitListenerContainerFactory.class.getName() + i, rootBeanDefinition4);
                log.info("zlf.DirectRabbitListenerContainerFactory注册完成,beanName:{}", DirectRabbitListenerContainerFactory.class.getName() + i);
            }
            //解析注册交换机、队列和绑定关系
            ExchangeQueueProperties exchangeQueueProperties = eqps.get(i);
            log.info("zlf.exchangeQueueProperties:{}", JSON.toJSONString(exchangeQueueProperties));
            Integer index = exchangeQueueProperties.getIndex();
            log.info("zlf.exchangeQueueProperties.index:{}", index);
            if (Objects.isNull(index)) {
                throw new RuntimeException("exchangeQueueProperties.index不为空");
            }
            if (Objects.nonNull(exchangeQueueProperties)) {
                log.info("zlf.exchangeQueueProperties:{}", JSON.toJSONString(exchangeQueueProperties));
                List<ExchangeQueueDto> eqs = exchangeQueueProperties.getEqs();
                if (CollectionUtil.isNotEmpty(eqs)) {
                    for (int k = 0; k < eqs.size(); k++) {
                        String bindingIndex = index.toString() + k;
                        log.info("zlf.bindingIndex:{}", bindingIndex);
                        ExchangeQueueDto exchangeQueueDto = eqs.get(k);
                        log.info("zlf.exchangeQueueDto:{}", JSON.toJSONString(exchangeQueueDto));
                        String functionType = exchangeQueueDto.getFunctionType();
                        log.info("zlf.functionType:{}", functionType);
                        if (FunctionTypeEnum.NORMAL.getFunctionType().equals(functionType)) {
                            this.createRelationship(FunctionTypeEnum.NORMAL, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);
                        } else if (FunctionTypeEnum.DELAY.getFunctionType().equals(functionType)) {
                            Integer delayType = exchangeQueueDto.getDelayType();
                            log.info("zlf.delayType:{}", delayType);
                            if (DelayTypeEnum.ONE.getDelayType().equals(delayType)) {
                                //延迟插件实现延迟队列
                                String exchangeType = exchangeQueueDto.getExchangeType();
                                if (!ExchangeTypeEnum.CUSTOM.getExchangeType().equals(exchangeType)) {
                                    throw new RuntimeException("延迟插件实现延迟队列交换机类型exchangeType必须定义为:custom");
                                }
                                this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);
                            } else if (DelayTypeEnum.TWO.getDelayType().equals(delayType)) {
                                //TTL + 死信队列实现延迟队列
                                this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, false);
                            } else if (DelayTypeEnum.THREE.getDelayType().equals(delayType)) {
                                //延迟交换机 + 消息设置setHeader("x-delay", xxx)
                                this.createRelationship(FunctionTypeEnum.DELAY, exchangeQueueDto, rabbitService, rabbitAdmin, bindingIndex, true);
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * 检查rabbitProperties配置的主要参数
     *
     * @param rabbitProperties
     */
    private void checkRabbitProperties(RabbitProperties rabbitProperties) {
        String virtualHost = rabbitProperties.getVirtualHost();
        if (StringUtils.isEmpty(virtualHost)) {
            throw new RuntimeException("RabbitProperties.virtualHost不为空");
        }
        String addresses = rabbitProperties.getAddresses();
        if (StringUtils.isEmpty(addresses)) {
            throw new RuntimeException("RabbitProperties.addresses不为空");
        }
        Integer port = rabbitProperties.getPort();
        if (Objects.isNull(port)) {
            throw new RuntimeException("RabbitProperties.port不为空");
        }
        String username = rabbitProperties.getUsername();
        if (StringUtils.isEmpty(username)) {
            throw new RuntimeException("RabbitProperties.username不为空");
        }
        String password = rabbitProperties.getPassword();
        if (StringUtils.isEmpty(password)) {
            throw new RuntimeException("RabbitProperties.password不为空");
        }
    }

    /**
     * 创建关系
     *
     * @param functionTypeEnum
     * @param exchangeQueueDto
     * @param rabbitService
     * @param rabbitAdmin
     * @param bindingIndex
     */
    private void createRelationship(FunctionTypeEnum functionTypeEnum, ExchangeQueueDto exchangeQueueDto, RabbitService rabbitService, RabbitAdmin rabbitAdmin, String bindingIndex, Boolean isDelayed) {
        String exchangeName = exchangeQueueDto.getExchangeName();
        String exchangeType = exchangeQueueDto.getExchangeType();
        HashMap<String, Object> exchangeArgs = exchangeQueueDto.getExchangeArgs();
        log.info("zlf" + functionTypeEnum.getFunctionType() + ".exchangeName:{},exchangeType:{},exchangeArgs:{}", exchangeName, exchangeType, JSON.toJSONString(exchangeArgs));
        AbstractExchange exchange1 = rabbitService.createExchange(rabbitAdmin, exchangeName, exchangeType, exchangeArgs, isDelayed);
        exchangeName = exchangeName + bindingIndex;
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(exchangeName, exchange1);
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".Exchange注册完成,beanName:{}", exchangeName);
        String queueName = exchangeQueueDto.getQueueName();
        HashMap<String, Object> queueArgs = exchangeQueueDto.getQueueArgs();
        String routingKey1 = exchangeQueueDto.getRoutingKey();
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".queueName:{},queueArgs:{},routingKey1:{}", queueName, JSON.toJSONString(queueArgs), routingKey1);
        Queue queue = rabbitService.createQueue(rabbitAdmin, queueName, queueArgs);
        queueName = queueName + bindingIndex;
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName, queue);
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".Queue注册完成,beanName:{}", queueName);
        Binding binding = rabbitService.binding(rabbitAdmin, exchange1, queue, routingKey1);
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(Binding.class.getName() + bindingIndex, binding);
        log.info("zlf." + functionTypeEnum.getFunctionType() + ".Binding注册完成bindingIndex:{},beanName:{}", bindingIndex, Binding.class.getName() + bindingIndex);
        Integer delayType = exchangeQueueDto.getDelayType();
        if (DelayTypeEnum.TWO.getDelayType().equals(delayType)) {
            String dlxExchangeName = exchangeQueueDto.getDlxExchangeName();
            if (StringUtils.isEmpty(dlxExchangeName)) {
                throw new RuntimeException("TTL + 死信队列实现延迟队列配置参数dlxExchangeName不为空!");
            }
            String dlxExchangeType = exchangeQueueDto.getDlxExchangeType();
            if (StringUtils.isEmpty(dlxExchangeType)) {
                throw new RuntimeException("TTL + 死信队列实现延迟队列配置参数dlxExchangeType不为空!");
            }
            AbstractExchange exchange2 = rabbitService.createExchange(rabbitAdmin, dlxExchangeName, dlxExchangeType, exchangeArgs, false);
            dlxExchangeName = dlxExchangeName + bindingIndex;
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxExchangeName, exchange2);
            log.info("zlf.TTL + 死信队列实现延迟队列,死信交换机注册完成,beanName:{}", dlxExchangeName);
            String dlxQueueName = exchangeQueueDto.getDlxQueueName();
            Queue queue2 = rabbitService.createQueue(rabbitAdmin, dlxQueueName, null);
            dlxQueueName = dlxQueueName + bindingIndex;
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxQueueName, queue2);
            log.info("zlf.TTL + 死信队列实现延迟队列,死信队列注册完成,beanName:{}", dlxQueueName);
            String dlxKey = exchangeQueueDto.getDlxKey();
            Binding binding2 = rabbitService.binding(rabbitAdmin, exchange2, queue2, dlxKey);
            String dlxBeanName = "dlx" + Binding.class.getName() + bindingIndex + 1;
            ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(dlxBeanName, binding2);
            log.info("zlf.TTL + 死信队列实现延迟队列,死信交换机绑定队列的绑定关系注册完成,beanName:{}", dlxBeanName);
        }
    }

    private Map<String, String> createErrorExchangeQueueRelationship(String index, RabbitService rabbitService, RabbitAdmin rabbitAdmin) {
        Map<String, String> resultMap = new HashMap<>();
        String exchangeName = ErrorExchangeQueueInfo.ERROR_EXCHANGE_PREFIX + index;
        AbstractExchange exchange = rabbitService.createExchange(rabbitAdmin, exchangeName, ErrorExchangeQueueInfo.ERROR_EXCHANGE_TYPE, null, false);
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(exchangeName, exchange);
        log.info("zlf.ErrorExchange注册完成,beanName:{}", exchangeName);
        String queueName = ErrorExchangeQueueInfo.ERROR_QUEUE_PREFIX + index;
        Queue queue = rabbitService.createQueue(rabbitAdmin, queueName, null);
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName, queue);
        log.info("zlf.ErrorQueue注册完成,beanName:{}", queueName);
        String errorRoutingKey = ErrorExchangeQueueInfo.ERROR_KEY_PREFIX + index;
        log.info("zlf.errorRoutingKey:{}", errorRoutingKey);
        Binding errorBinding = rabbitService.binding(rabbitAdmin, exchange, queue, errorRoutingKey);
        String errorBingBeanName = ErrorExchangeQueueInfo.ERROR_BINDING_BANE_NAME_PREFIX + Binding.class.getSimpleName() + index;
        ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(errorBingBeanName, errorBinding);
        log.info("zlf.ErrorBing注册完成,beanName:{}", errorBingBeanName);
        resultMap.put("errorExchange", exchangeName);
        resultMap.put("errorRoutingKey", errorRoutingKey);
        return resultMap;
    }

    private void getAmqpContainer(AmqpContainer amqpContainer, MutablePropertyValues values, CachingConnectionFactory cachingConnectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter, RabbitTemplate rabbitTemplate, Map<String, String> errorExchangeQueueRelationship) {
        values.add("connectionFactory", cachingConnectionFactory);
        values.add("autoStartup", amqpContainer.isAutoStartup());
        values.add("messageConverter", jackson2JsonMessageConverter);
        if (Objects.nonNull(amqpContainer.getAcknowledgeMode())) {
            values.add("acknowledgeMode", amqpContainer.getAcknowledgeMode());
        }
        if (Objects.nonNull(amqpContainer.getPrefetch())) {
            values.add("prefetch", amqpContainer.getPrefetch());
        }
        if (Objects.nonNull(amqpContainer.getDefaultRequeueRejected())) {
            values.add("defaultRequeueRejected", amqpContainer.getDefaultRequeueRejected());
        }
        if (Objects.nonNull(amqpContainer.getIdleEventInterval())) {
            values.add("idleEventInterval", amqpContainer.getIdleEventInterval());
        }
        values.add("missingQueuesFatal", amqpContainer.isMissingQueuesFatal());
        ListenerRetry retry2 = amqpContainer.getRetry();
        if (retry2.isEnabled()) {
            RetryInterceptorBuilder<?, ?> builder = (retry2.isStateless()) ? RetryInterceptorBuilder.stateless()
                    : RetryInterceptorBuilder.stateful();
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy policy = new SimpleRetryPolicy();
            retryTemplate.setRetryPolicy(policy);
            policy.setMaxAttempts(retry2.getMaxAttempts());
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setMultiplier(retry2.getMultiplier());
            if (Objects.nonNull(retry2.getMaxInterval())) {
                backOffPolicy.setMaxInterval(retry2.getMaxInterval().toMillis());
            }
            builder.retryOperations(retryTemplate);
            /**
             * 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
             *
             * RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,【丢弃消息】【默认】就是这种方式
             * ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(Immediate立刻重入队)(但是频率比没有配置消费失败重载机制低一些)
             * RepublishMessageRecoverer(推荐):重试耗尽后,将失败消息投递到指定的交换机
             */
            //消息接受拒绝后发送到异常队列
            String errorExchange = errorExchangeQueueRelationship.get("errorExchange");
            String errorRoutingKey = errorExchangeQueueRelationship.get("errorRoutingKey");
            MessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, errorExchange, errorRoutingKey);
            log.info("zlf.MessageRecoverer.errorExchange:{},errorRoutingKey:{}", errorExchange, errorRoutingKey);
            builder.recoverer(recoverer);
            values.add("adviceChain", builder.build());
        }
    }

    private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties)
            throws Exception {
        PropertyMapper map = PropertyMapper.get();
        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
        map.from(properties::determineHost).whenNonNull().to(factory::setHost);
        map.from(properties::determinePort).to(factory::setPort);
        map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
        map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
        map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
        map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds)
                .to(factory::setRequestedHeartbeat);
        map.from(properties::getRequestedChannelMax).to(factory::setRequestedChannelMax);
        RabbitProperties.Ssl ssl = properties.getSsl();
        if (ssl.determineEnabled()) {
            factory.setUseSSL(true);
            map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
            map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
            map.from(ssl::getKeyStore).to(factory::setKeyStore);
            map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
            map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
            map.from(ssl::getTrustStore).to(factory::setTrustStore);
            map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
            map.from(ssl::isValidateServerCertificate)
                    .to((validate) -> factory.setSkipServerCertificateValidation(!validate));
            map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);
        }
        map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis)
                .to(factory::setConnectionTimeout);
        factory.afterPropertiesSet();
        return factory;
    }

    @Override
    public void setEnvironment(Environment environment) {
        // 通过Binder将environment中的值转成对象
        rabbitConfig = Binder.get(environment).bind(getPropertiesPrefix(RabbitConfig.class), RabbitConfig.class).get();
        exchangeQueueConfig = Binder.get(environment).bind(getPropertiesPrefix(ExchangeQueueConfig.class), ExchangeQueueConfig.class).get();
    }

    private String getPropertiesPrefix(Class<?> tClass) {
        return Objects.requireNonNull(AnnotationUtils.getAnnotation(tClass, ConfigurationProperties.class)).prefix();
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

}

4.总结

  到此,手写rabbitMq的starter实现思路就已经全部分享完了,思路比代码更重要,代码只是一个参考,用这个思路实现更多更方便简单快捷高效的轮子,制造轮子也是一种提升和给你带来成就感的事情,累并快乐着,后面我会将我之前手写的starter全部开源出来,然后将gitHub或码云地址分享给大家,制造轮子,开源给大家使用,这本身就是一种开源的精神和乐趣,Java生态最缺的就不是轮子是好用的轮子,请在看我的文章或者是转发,请把本文的原出处和作者写上去,尊重版权,创作不易,禁止原模原样的搬过去就是自己的原创,这种是不道德的行为,见到请如实举报,或者联系本作者来举报,这个starter,说实话也是构思酝酿了好久,猛干了2-3天才干出来,颈椎都干的酸,我得休息加强锻炼身体了,说实话写这个starter还是挺累的,但是搞出来的成就感满满,也方便以后集成快速使用,配置多个的rabbitMq服务器也测试过了的,也是ok的,但是配置一个rabbitMq和多个交换机、队列和绑定关系以及3种延迟队列实现是亲自测试OK的,希望我的分享能给你帮助和启发,请一键三连,么么么哒!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/454655.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

insertAdjacentHTML() 作用

insertAdjacentHTML()简介 insertAdjacentHTML() 方法是将文本解析为 element 元素&#xff0c;并将结果节点插入到DOM树中的指定位置。它不会重新解析它正在使用的元素&#xff0c;因此它不会破坏元素内的现有元素。这避免了额外的序列化步骤&#xff0c;使其比直接使用innerH…

机试:数塔路径

问题描述: 代码示例: //数塔路径 #include <bits/stdc.h>using namespace std;int main(){ // 算法思想: // 逆推,将最下方和右下方的数字进行比较,哪个大则加上并更新,直至到根节点即为最大 int n;cin >> n; int nums[n1][n1]; // 输入数塔 for(int i 1;i < n…

搭建Hadoop3.x完全分布式集群

零、资源准备 虚拟机相关&#xff1a; VMware workstation 16&#xff1a;虚拟机 > vmware_177981.zipCentOS Stream 9&#xff1a;虚拟机 > CentOS-Stream-9-latest-x86_64-dvd1.iso Hadoop相关 jdk1.8&#xff1a;JDK > jdk-8u261-linux-x64.tar.gzHadoop 3.3.6&am…

逆变器功率软起斜率要求

安规说明 在NB32004中&#xff0c;有明确要求&#xff0c;有功功率调整速率不得超过正负10%Pn/min&#xff0c;包括起停机。 控制对象 控制功率最终是通过调整D轴电流给定来达到限制功率的目的&#xff0c;所以我们只要让D轴的电流给定限幅值按照10%/min增加就好了。 具体实…

Selenium Web自动化测试——基于unittest框架的PO设计模式

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

一文讲懂 C++ 类和对象(1)

0. 面向过程程序设计和面向对象程序设计的区别 面向对象程序设计往往关注的是怎么去做&#xff0c;是将解决问题的步骤分析出来&#xff0c;然后用函数把步骤一步一步实现&#xff0c;然后再依次调用就可以了。而面向对象是将构成问题的事物&#xff0c;分解成若干个对象&…

比特币创造历史新纪录

综合来源&#xff1a;coindesk and cointelegraph 编译&#xff1a;秦晋 3月11日&#xff0c;比特币在亚洲交易时段首次突破71,000美元&#xff0c;这个是比特币创造的价格新纪录。自1月11日比特币现货ETF在美国获批以来&#xff0c;比特币一直在稳步上涨。以太币突破4000美元。…

rust学习(手动写一个线程池)

哈哈&#xff0c;主要是为了练习一下rust的语法&#xff0c;不喜勿喷。 一.Executor申明 struct AExecutor<T> {results:Arc<Mutex<HashMap<u32,T>>>, //1functions:Arc<Mutex<Vec<ATask<T>>>> //2 } 1.results&#xff1a…

产品测试方案:视频接入平台并发性能测试方案和报告(即150路视频并发流媒体服务器模块的性能测试方案和报告)

目 录 一、测试目的&#xff1a; 二、测试方案&#xff1a; 2.1、测试思路 2.2、拓扑图 三、测试环境 3.1 服务器配置 3.2 网络摄像机列表 3.3 测试软件 四、测试流程 4.1 H.264并发测试&#xff1a; 4.1.1老版本srsout3.10并发测试 4.1.2 新版本srsout…

2024年信息技术与计算机工程国际学术会议(ICITCEI 2024)

2024年信息技术与计算机工程国际学术会议&#xff08;ICITCEI 2024&#xff09; 2024 International Conference on Information Technology and Computer Engineering ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 大会主题&#xff1a; 信息系统和技术…

如何选择腾讯云轻量应用服务器地域?北京上海广州哪个合适?

腾讯云轻量应用服务器地域如何选择&#xff1f;地域就近选择&#xff0c;北方选北京地域、南方选广州地域&#xff0c;华东地区选上海地域。广州上海北京地域有什么区别&#xff1f;哪个好&#xff1f;区别就是城市地理位置不同&#xff0c;其他的差不多&#xff0c;不区分好坏…

基于java+springboot+vue实现的停车场管理系统(文末源码+Lw)23-258

摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统停车场管理系统信息管理难度大&#xff0c;容错率低&…

python实现卡普均值最小回路算法

如果给定一个含有环的有向图&#xff0c;要在这个图中找出所有的环并计算这些环的路径长度&#xff0c;然后除以环的边数&#xff0c;所得到的结果也就是环的平均值&#xff0c;这里也就是如何计算这个环的最小均值问题。 首先可以确定的是&#xff0c;如果图中均值最小的环的…

漏洞发现-漏扫项目篇武装BURP浏览器插件信息收集分析辅助

知识点 1、插件类-武装BurpSuite-漏洞检测&分析辅助 2、插件类-武装谷歌浏览器-信息收集&情报辅助 章节点&#xff1a; 漏洞发现-Web&框架组件&中间件&APP&小程序&系统 扫描项目-综合漏扫&特征漏扫&被动漏扫&联动漏扫 Poc开发-Ymal语…

深入浅出落地应用分析:AI虚拟数字人

据艾媒咨询,2025年中国虚拟人市场规模预计达480.6亿元,用户群体主要为中型及小微型企业,产品需求量TOP5分别是电商、卫生、社会保障和社会福利业、教育、金融和运输业,主要产品类型为数字员工及定制化数字人。 一、什么是数字人 1.1 概念介绍 数字人是指以数字形式存在于…

2024年腾讯云轻量服务器地域选择方法_新手地域教程

腾讯云轻量应用服务器地域如何选择&#xff1f;地域就近选择&#xff0c;北方选北京地域、南方选广州地域&#xff0c;华东地区选上海地域。广州上海北京地域有什么区别&#xff1f;哪个好&#xff1f;区别就是城市地理位置不同&#xff0c;其他的差不多&#xff0c;不区分好坏…

代码贴--链表--数据机构

本博客将记录链表代码(单链表)&#xff0c;后续其他链表和其他数据结构内容请看我的其他博客 头文件(SList.h) #pragma once #include<iostream> #include<bits/stdc.h> using namespace std;typedef int SLTDataType;struct SListNode {int data;struct SListNo…

每日OJ题_牛客HJ37 统计每个月兔子的总数(IO型OJ)

目录 牛客HJ37 统计每个月兔子的总数 解析代码 牛客HJ37 统计每个月兔子的总数 统计每个月兔子的总数_牛客题霸_牛客网 解析代码 #include <iostream> #include <vector>using namespace std; int main() {int n 0;cin >> n;vector<int> arr(n 1…

VRay渲染动画怎么快一点?提升VRay动画渲染方法

随着动画和视觉效果行业对高品质渲染的需求日益增长&#xff0c;V-Ray作为一款领先的渲染工具&#xff0c;面临着提升渲染效率的挑战。项目规模和复杂度的扩大导致渲染时间延长&#xff0c;对交付期限造成影响。探索加速V-Ray渲染流程的方法变得尤为关键。 一、动画渲染的常见瓶…

docker-compose up -d使用遇到问题no configuration file provided: not found

docker-compose up -d使用遇到问题&#xff0c;因为你文件名称没指定&#xff0c; 又找不到默认的文件名称&#xff1b;如果该目录下有个文件叫docker-compose.yml时&#xff0c;那么可以直接使用docker-compose up -d;否则就要使用docker-compose -f mysql up -d
最新文章