支付系统设计:消息重试组件封装

文章目录

  • 前言
  • 一、重试场景分析
  • 一、如何实现重试
    • 1. 扫表
    • 2. 基于中间件自身特性
    • 3. 基于框架
    • 4. 根据公司业务特性自己实现的重试
  • 二、重试组件封装
    • 1. 需求分析
    • 2. 模块设计
      • 2.1 持久化模块
        • 1. 表定义
        • 2. 持久化接口定义
        • 3. 持久化配置类
      • 2.2 重试模块
        • 1.启动
        • 2.重试
    • 3. 业务端使用
      • 1. 引入依赖
      • 2. 新增配置
      • 3. 使用
  • 总结


前言

如何封装一套服务自身业务开箱即用的重试组件?是个值得思考的问题!

在这里插入图片描述

在开发支付系统过程中,我们经常会遇到这样的业务场景:调用下游系统、回调上游系统,由于网络原因或者当时对方系统不可用导致调用失败,那么调用失败就失败了么?当然肯定不是,一般都要有重试机制。这种重试机制实现有很多方式,但是万万不可依赖其他系统的重试机制去重试你要重试调用的系统,这个原因下面分析。本篇文章就重试场景给出一个个人觉得还不错的解决方案,也是作者所在用的解决方案,如有更好的解决方案欢迎交流。


一、重试场景分析

在支付系统中我们经常会将一些非核心业务流程做成异步的,在核心主流程中往MQ写入一条相对应的待处理消息,写入成功即认为业务处理成功了,所以我们要证在消费端最大程度的保证处理成功。
在结果通知中也有失败重试策略,我们对接支付渠道如支付宝:如果不返回指定成功的报文信息其将在25小时以内完成8次通知(通知的间隔频率一般是4m,10m,10m,1h,2h,6h,15)。
这里我们分析个场景,流程很简单,如下:在这里插入图片描述
支付渠道通知我们的支付系统,支付系统通知商户系统,之间为同步调用,渠道调用过来,支付系统变更订单状态,变更后调用商户系统,如果调用商户系统失败了,那么支付系统给渠道返回失败,然后过一段时间后渠道发起重试,再次调用支付系统,支付系统再调用商户系统。借助渠道的通知重试策略来完成自身的重试通知。谁要是这么设计,原地刨个坑活埋了他吧,不要觉得没有人用这种方式,事实就是真的有公司这么用。结果可想而知,不出问题只能说明做的系统没交易量,一旦有交易量,支付系统会被商户系统给拖垮掉,原因自行分析。

本篇文章呢我们以支付结果通知为例作为场景展开分析,做一个面对这种场景的统一解决方案,同时是没有使用充值VIP的RabbitMQ作为消息中间件。

既然没钱充值VIP购买其强大的重试功能,只能自己开发了。

一、如何实现重试

1. 扫表

实现重试的方式有很多种,有基于扫描表的,如下:
在这里插入图片描述
前置通知失败后,即落入重试表,待定时任务触发扫描表重新发起调用,这种处理方案是很多公司在用的。这种方案虽然不会像上面有拖垮系统的风险,但是问题还是很多的,如定时任务多久触发一次?有些交易对实时性要求比较高,如果第一次因为网络原因导致的失败,紧接着重试一般就能成功了,那么就把定时任务设定1s一次的频率?这种方式不再详细分析了…有点设计能力的人都不会采用这种方式吧。

2. 基于中间件自身特性

RocketMQ中间件本身已经支持重试,下文直接截图了:
在这里插入图片描述

3. 基于框架

针对RabbitMQ中间件spring提供的retry:

server:
  port:8080
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: xxxx
    password: xxx
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 5
          initial-interval: 5000
          max-interval: 10000

4. 根据公司业务特性自己实现的重试

在这里插入图片描述
如上是自己基于“指数退避策略进行延迟重试”封装的一套重试组件,也是本篇要介绍的方案。

二、重试组件封装

1. 需求分析

如何封装一套服务自身业务开箱即用的重试组件?是个值得思考的问题,但是Spring-boot已经给出了答案。我们在使用Springboot开发项目时候想要集成RabbitMQ只需要加入依赖,然后配置yml就可以使用了,一旦满足约定好的条件,Springboot则帮我们激活所需要的Bean,那么我们是不是也可以参考其思想自己也装配重试所需的Bean。

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
       <version>2.4.1</version>
   </dependency>

决定了怎么做,然后分析业务系统特性,自己做的支付系统业务特性是:一个系统会有多个队列的消费者,并且每个队列消息处理失败后的重试次数、间隔时间也各不相同,并且达到最大失败重试次数后要入通知重试表,供后期业务系统恢复后再次发起重试。最终要的是,使用系统只需要简单配置下就可以实现上面需求,就像spring提供的retry机制一样,简单配置下就行了,不需要你知道底层原理。

2. 模块设计

在这里插入图片描述
从我们的架构图中可以看到,其主要分为两个模块,重试模块、持久化模块,我们逐个分析这俩模块的设计实现,首先从简单的开始,持久化模块。

2.1 持久化模块

首先没得说需要建表,需要使用starter提供的自动持久化功能就要创建starter持久化所需要的表:

1. 表定义

/**
 * @author Kkk
 * @Description: 异常通知恢复表
 */
@Entity
@Table(name = "notify_recover")
public class NotifyRecover implements Serializable {
    /**id*/
    @Id
    @Column(name="id",insertable = false)
    private Long id;

    /** 唯一标识键 */
    @Column(name="unique_key")
    private String uniqueKey ;

    /** 场景码 */
    @Column(name="scene_code")
    private String sceneCode ;

    /** 调用方系统 */
    @Column(name="system_id")
    private String systemId;

    /** 通知内容 */
    @Column(name="notify_content")
    private String notifyContent ;

    /** 通知方式:http mq */
    @Column(name="notify_type")
    private int notifyType ;

    /** 交换器*/
    @Column(name="exchange")
    private String exchange ;

    /** 异步通知路由键 */
    @Column(name="notify_key")
    private String notifyKey ;

    /** 通知次数 */
    @Column(name="notify_num")
    private int notifyNum ;

    /** 通知状态 */
    @Column(name="notify_status")
    private String notifyStatus ;

    /** 备注 */
    @Column(name="remark")
    private String remark ;

    /** 扩展字段 */
    @Column(name="extend")
    private String extend ;

    /** 创建时间 */
    @Column(name="create_time",insertable = false)
    private Date createTime ;

    /** 修改时间 */
    @Column(name="update_time",insertable = false)
    private Date updateTime ;

    @Column(name="bucket")
    private String bucket ;
	
	// ... ...
}

2. 持久化接口定义

然后入表接口肯定也是需要的:

/**
 * @author Kkk
 * @Description: 发送失败处理
 */
public interface NotifyRecoverHandler<T> {
    /**
     * 处理重发失败入重试表
     * @param t
     */
    public void handlerSendFail(T t);
}

3. 持久化配置类

创建持久化配置类:

/**
 * @author Kkk
 * @Description: 持久化配置类
 */
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.retry",value = "recover",havingValue = "true",matchIfMissing = false)
public class JdbcHelperMqConfiguration {
    @Bean(name = "jdbcSelectProvider")
    public JdbcSelectProvider jdbcSelectProviderBean() {
        return new JdbcSelectProvider();
    }
    @Bean(name = "jdbcInsertProvider")
    public JdbcInsertProvider jdbcInsertProviderBean() {
        return new JdbcInsertProvider();
    }
    @Bean(name = "jdbcUpdateProvider")
    public JdbcUpdateProvider jdbcUpdateProviderBean() {
        return new JdbcUpdateProvider();
    }

    @Bean(name = "jdbcHelper")
    public JdbcHelper jdbcHelperBean(@Qualifier("jdbcSelectProvider")JdbcSelectProvider jdbcSelectProvider,
                                     @Qualifier("jdbcInsertProvider")JdbcInsertProvider jdbcInsertProvider,
                                     @Qualifier("jdbcUpdateProvider")JdbcUpdateProvider jdbcUpdateProvider) {
        return new JdbcHelperImpl(jdbcSelectProvider,jdbcInsertProvider,jdbcUpdateProvider);
    }

    @Bean(name = "notifyRecoverHandler")
    @ConditionalOnMissingBean(value = NotifyRecoverHandler.class)
    public NotifyRecoverHandler notifyRecoverHandlerBean(@Qualifier("jdbcHelper")JdbcHelper jdbcHelper) {
        return new DefaultNotifyRecoverHandlerImpl(jdbcHelper);
    }
}

此配置类的激活条件时,配置了失败是否需要入重试表配置。同时也可以不使用starter提供的入表策略,如果业务系统有自己的重试表那么就可以将失败的消息入到自定义的表中,此处预留的扩展点。
jdbcSelectProviderjdbcInsertProviderjdbcUpdateProvider这个三个类为查询、新增、更新对应的处理类,为底层的JDBC操作。

/**
 * @author Kkk
 * @Description: select提供类
 */
public class JdbcSelectProvider<T> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcSelectProvider.class);
    @Resource
    private DataSource dataSource;

    public JdbcSelectProvider() {
    }

    public List<T> select(String sql, Class outputClass) {
        return this.selectExecute(sql,outputClass);
    }

    private List<T> selectExecute(String sql, Class outputClass,Object... params) {
        Connection connection = null;
        PreparedStatement pst = null;
        ResultSet res = null;
        List<T> ts =null;
        try {
            connection = DataSourceUtils.getConnection(this.dataSource);
            pst = connection.prepareStatement(sql);

            for(int i = 0; i < params.length; ++i) {
                pst.setObject(i + 1, params[i]);
            }
            res = pst.executeQuery();
            ts = mapRersultSetToObject(res, outputClass);
        } catch (SQLException var7) {
            var7.printStackTrace();
        }finally {
            try {
                connection.close();
                pst.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
        return ts;
    }

    @SuppressWarnings("unchecked")
    public List<T> mapRersultSetToObject(ResultSet rs, Class outputClass) {
        List<T> outputList = null;
        try {
            if (rs != null) {
                if (outputClass.isAnnotationPresent(Entity.class)) {
                    ResultSetMetaData rsmd = rs.getMetaData();
                    Field[] fields = outputClass.getDeclaredFields();
                    while (rs.next()) {
                        T bean = (T) outputClass.newInstance();
                        for (int _iterator = 0; _iterator < rsmd.getColumnCount(); _iterator++) {
                            String columnName = rsmd.getColumnName(_iterator + 1);
                            Object columnValue = rs.getObject(_iterator + 1);
                            for (Field field : fields) {
                                if (field.isAnnotationPresent(Column.class)) {
                                    Column column = field.getAnnotation(Column.class);
                                    if (column.name().equalsIgnoreCase(columnName) && columnValue != null) {
                                        BeanUtils.setProperty(bean, field.getName(), columnValue);
                                        break;
                                    }
                                }
                            }
                        }
                        if (outputList == null) {
                            outputList = new ArrayList<T>();
                        }
                        outputList.add(bean);
                    }

                } else {
                    logger.error("查询结果集映射失败,映射类需要@Entity注解");
                }
            } else {
                return null;
            }
        } catch (Exception e) {
            logger.error("查询结果集映射失败",e);
        }
        return outputList;
    }
}

jdbcHelper对如上几个Provider进行了统一包装处理:

/**
 * @author Kkk
 * @Description:
 */
public class JdbcHelperImpl implements JdbcHelper {
    private Logger logger = LoggerFactory.getLogger(JdbcHelperImpl.class);
    String s="'";
    private JdbcSelectProvider jdbcSelectProvider;

    private JdbcInsertProvider jdbcInsertProvider;

    private JdbcUpdateProvider jdbcUpdateProvider;

    ResultSetMapper<NotifyRecover> resultSetMapper = new ResultSetMapper<NotifyRecover>();

    public JdbcHelperImpl(JdbcSelectProvider jdbcSelectProvider, JdbcInsertProvider jdbcInsertProvider,JdbcUpdateProvider jdbcUpdateProvider) {
        this.jdbcSelectProvider = jdbcSelectProvider;
        this.jdbcInsertProvider = jdbcInsertProvider;
        this.jdbcUpdateProvider = jdbcUpdateProvider;
    }

    public List<NotifyRecover>  selectData(String uniqueKey,String sceneCode){
        StringBuilder stringBuilder = new StringBuilder("SELECT * FROM notify_recover WHERE unique_key='");
        stringBuilder.append(uniqueKey);
        stringBuilder.append(s);
        stringBuilder.append(" AND scene_code='");
        stringBuilder.append(sceneCode);
        stringBuilder.append(s);
        String sql = stringBuilder.toString();
        List<NotifyRecover> pojoList = this.jdbcSelectProvider.select(sql, NotifyRecover.class);
        if(null==pojoList || pojoList.size()==0 ){
            logger.info("根据uniqueKey({}),sceneCode({})查询结果为空!",uniqueKey,sceneCode);
            return null;
        }
        return pojoList;
    }

    @Override
    public void insertData(NotifyRecover notifyRecover) {
        jdbcInsertProvider.insert(notifyRecover);
    }

    @Override
    public int updateData(NotifyRecover notifyRecover) {
        StringBuilder stringBuilder = new StringBuilder("UPDATE notify_recover SET notify_status='");
        stringBuilder.append(notifyRecover.getNotifyStatus());
        stringBuilder.append("', notify_num=");
        stringBuilder.append(notifyRecover.getNotifyNum());
        stringBuilder.append(" WHERE unique_key='");
        stringBuilder.append(notifyRecover.getUniqueKey());
        stringBuilder.append(s);
        stringBuilder.append(" AND scene_code='");
        stringBuilder.append(notifyRecover.getSceneCode());
        stringBuilder.append(s);
        String sql = stringBuilder.toString();
        int resultSet = this.jdbcUpdateProvider.update(sql);
        return resultSet;
    }
}

最后一部分持久化接口默认实现,如果业务方想使用持久化进制,并没有实现持久化接口则采用默认实现:

    @Bean(name = "notifyRecoverHandler")
    @ConditionalOnMissingBean(value = NotifyRecoverHandler.class)
    public NotifyRecoverHandler notifyRecoverHandlerBean(@Qualifier("jdbcHelper")JdbcHelper jdbcHelper) {
        return new DefaultNotifyRecoverHandlerImpl(jdbcHelper);
    }

持久化默认实现:

/**
 * @author Kkk
 * @Description: 持久化默认实现
 */
public class DefaultNotifyRecoverHandlerImpl implements NotifyRecoverHandler<NotifyRecover> {
    private Logger logger = LoggerFactory.getLogger(DefaultNotifyRecoverHandlerImpl.class);

    BasicThreadFactory factory = new BasicThreadFactory.Builder()
            .namingPattern("recover-execute-thread-%d")
            .uncaughtExceptionHandler(new NotifyRecoverThreadUncaughtExceptionHandler()).build();

    private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4,factory);

    private JdbcHelper jdbcHelperImpl;

    public DefaultNotifyRecoverHandlerImpl(JdbcHelper jdbcHelperImpl) {
        this.jdbcHelperImpl = jdbcHelperImpl;
    }

    @Override
    public void handlerSendFail(NotifyRecover notifyRecover) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //采用异步持久化
            }
        });
    }
}

到这里就完成了持久化工作了,但是还有一个很重要的问题,怎么将此类注册为Spring中的Bean呢?方式多种,最简单的是使用@Import标签,在重试的主配置类上引入此配置类。

@Import(JdbcHelperMqConfiguration.class)
public class RabbitMqRetrySendConfigurationMultiply {
}

2.2 重试模块

下面分析重试模块,首先重试模块我们是基于RabbitMQ死信队列来做的,关于死信、死信队列的概念这里不做解释了,
重试最主要分为启动时、运行时两部分。

1.启动

根据配置自动生成死信队列并通过对应的交换器与原队列进行路由绑定,大概流程见很久之前写的一篇博客[商户交易结果通知设计],当时只是针对支付系统通知功能做的,并没有做什么组件化,后期发现实际
项目中很多场景都需要这种重试机制,所以为了避免重复代码的编写,后期就简单的封装了下作为一个延迟重试组件以供在项目中开发作为一个组件直接引入依赖使用就行了。
要做的是如何将原来的代码片段封装到starter并装配到Spring中。

@Configuration
@EnableConfigurationProperties({RabbitMqRetryMultiplyProperties.class, SystenEnvProperties.class})
@ConditionalOnProperty(prefix = "spring.rabbitmq",value = "isRetry",havingValue = "true")
@ConditionalOnClass({ AmqpAdmin.class, RabbitTemplate.class })
@Import(JdbcHelperMqConfiguration.class)
public class RabbitMqRetrySendConfigurationMultiply {
    @Autowired
    private RabbitMqRetryMultiplyProperties rabbitMqRetryMultiplyProperties;
    @Autowired
    private SystenEnvProperties systenEnvProperties;

    @Bean(name = "rabbitMqService")
    public RabbitMqService rabbitMqServiceBean() {
        return new RabbitMqServiceImpl();
    }
    @Bean(initMethod = "start", destroyMethod = "stop")
    public PscCommonRetryQueueManager pscCommonRetryQueueManager(@Qualifier("rabbitMqService")RabbitMqService rabbitMqService,
                                                                 @Autowired(required = false) @Qualifier("notifyRecoverHandler")NotifyRecoverHandler notifyRecoverHandler) {
        return PscCommonRetryQueueManager.builder().configs(rabbitMqRetryMultiplyProperties.getConfigs())
                .retryCountFlag(SystemConstant.RETRY_COUNT_FLAG)
                .rabbitMqService(rabbitMqService)
                .notifyRecoverHandler(notifyRecoverHandler)
                .applicationName(systenEnvProperties.getName())
                .build();
    }
}

即满足如下两个条件即会构建PscCommonRetryQueueManager这个Bean。

@ConditionalOnProperty
@ConditionalOnClass

初始化时候会调用其start方法,在看之前先看下配置类,需要用户配置什么东西。

/**
 * @author Kkk
 * @Description: 重试配置类
 */
@Data
public class ConfigEntity implements Serializable {
    //重试次数
    private Integer retry_count=5;

    //重试队列名
    private String retry_queue_name_prefix;

    //死信消息失效时间计算方式:指数方式 exponential
    private String message_expiration_type="exponential";

    //x-dead-letter-exchange
    private String x_dead_letter_exchange;

    //x-dead-letter-exchange
    private String x_dead_letter_routing_key;

    //延迟时间因子:10s。具体延迟时间计算方式:2^count*10s
    public Integer delay_milliseconds=10000;

    //项目需要消费的队列名称
    public String consumer_queue_name;

    //消息丢失处理策略
    public String notify_recover_handler;
}

接下来看其start方法做了什么,首先看下类继承关系
在这里插入图片描述
在接口中定义方法。

/**
 * @author Kkk
 * @Description: 重试管理接口
 */
public interface RetryQueueManager {
    /**
     * 启动
     */
    void start();

    /**
     * 停止
     */
    void stop();

    /**
     * 发送延迟消息 -可捕获异常入重试表
     */
    boolean sendRetryMessage(Message message);

    /**
     *发送消息 -可捕获异常入重试表
     */
    boolean sendMessage(String exchange, String routingKey, String jsonString,String uniqueKey,String sceneCode);

    /**
     * 发送消息 -可捕获异常入重试表
     */
    boolean sendMessage(String exchange, String routingKey, String jsonString);
    /**
     * 发送延迟消息-发送网络异常可以放入重试表
     */
    boolean sendRetryMessage(Message message,String uniqueKey,String sceneCode);
}

抽象层抽取了写公共参数,具体实现由子类实现。

/**
 * @author Kkk
 * @Description: 抽象层
 */
public abstract class AbstractRetryQueueManager implements RetryQueueManager {
    private Logger logger = LoggerFactory.getLogger(AbstractRetryQueueManager.class);

    // 重试处理
    protected NotifyRecoverHandler notifyRecoverHandler;
    // 消息处理
    protected RabbitMqService rabbitMqService;
    //消息重试次数标识 埋点到消息头中的字段
    public String retryCountFlag;
    //应用名称
    public String applicationName;
    //重试配置相关信息
    public List<RetryQueueConfigs>  retryQueueConfigs;

    @Data
    public static final class RetryQueueConfigs {
        //重试次数
        public Integer retryCount=10;
        //重试队列名
        public String retryQueueNamePrefix;
        //死信消息失效时间计算方式:指数方式 exponential
        public String messageExpirationType="exponential";
        //x-dead-letter-exchange
        public String xDeadLetterExchange="topic";
        //x-dead-letter-routing-key
        public String xDeadLetterRoutingKey;
        //延迟时间因子:10s。具体延迟时间计算方式:2^count*10s
        public Integer delayMilliseconds;
        //项目需要消费的队列名称
        public String consumerQueueName;
    }

    @Override
    public void start() {
        logger.info("开始创建重试队列!");
        createRetryQueue();
        logger.info("创建重试队列完成!");
    }

    /**
     * 应用启动构建重试队列
     */
    protected abstract void createRetryQueue();

    @Override
    public void stop() {

    }
    // ... ...
}

在子类实现抽象层方法createRetryQueue(),生成死信交换器和队列并绑定,接着根据配置生成指定个说的死信队列,默认按照指数类型(延迟时间因子:10s。具体延迟时间计算方式:2^count*10s),然后将这些队列绑定到上面生成的交换器上,由于这些生成的死信队列没有消费者,所以消息过期后会再被路由到原队列中,即可又被正常消费处理,以此来达到延迟的效果,原理比较简单。

@Override
protected void createRetryQueue() {
	for (RetryQueueConfigs config:retryQueueConfigs) {
		TopicExchange topicExchange = ExchangeBuilder.topicExchange(config.getXDeadLetterExchange()).build();
		rabbitAdmin.declareExchange(topicExchange);

		Queue queue1 = QueueBuilder.durable(config.getConsumerQueueName()).build();
		rabbitAdmin.declareQueue(queue1);

		Binding binding = BindingBuilder.bind(queue1).to(topicExchange).with(config.getXDeadLetterRoutingKey());
		rabbitAdmin.declareBinding(binding);

		if(ExpirationTypeEnum.EXPONENTIAL.getCode().equals(config.getMessageExpirationType())){
			logger.info("申明“指数型”重试队列开始...");
			for (int i = 0; i < config.getRetryCount(); i++) {
				String queueName = null;
				try {
					Map<String, Object> args = new HashMap<String, Object>();
					//指定当成为死信时,重定向到
					args.put("x-dead-letter-exchange", config.getXDeadLetterExchange());
					args.put("x-dead-letter-routing-key", config.getXDeadLetterRoutingKey());

					String expiration = String.valueOf(Double.valueOf(Math.pow(2, i)).intValue()*config.getDelayMilliseconds());
					queueName = config.getRetryQueueNamePrefix() + "." + expiration;

					//声明重试队列,将参数带入
					Queue queue = QueueBuilder.durable(queueName).withArguments(args).build();
					rabbitAdmin.declareQueue(queue);
					logger.info("申明“指数型”重试队列成功[queueName:{}]", queueName);
				}catch (Throwable e){
					logger.error("申明“指数型”重试队列失败[i:{}, queueName:{}, e.message:{}],异常:", i, queueName, e.getMessage(), e);
				}
			}
			logger.info("申明“指数型”重试队列结束...");
		}
	}
}

2.重试

判断重试次数,消费端获取到消息后,根据消息头埋点可以获到重试次数,重试次数超过最大次数则入重试表,待后期分析处理。

/**
 * 判断是否超过重试次数
 */
public RetryEntity isOutOfRetryCount(Message message){
    int messageRetryCount = getMessageRetryCount(message);
    RetryQueueConfigs config = getRetryConfigByOriQueue(message);
    boolean result=messageRetryCount>(null==config?0:config.getRetryCount())?false:true;
    if(!result){
        logger.info("超过最大重试次数,入重试表!");
        //... ...
    }
    return new RetryEntity(result,messageRetryCount);
}

/**
 * 获取重试次数
 */
public int getMessageRetryCount(Message message){
	//初始为0
	int count = 0;
	Map<String, Object> headers = message.getMessageProperties().getHeaders();
	if(headers.containsKey(retryCountFlag)){
		count = NumberUtils.toInt((String) message.getMessageProperties().getHeaders().get(retryCountFlag), 0);
	}
	return count;
}

关于重试即消费端处理失败后进行重新投递,根据重试次数计算要投递的队列名称。

@Override
public boolean sendRetryMessage(Message message) {
	boolean result=true;
	try {
		//从消息题中获取到消息来源--队列名称,然后根据队列名称获取到配置中心此队列配置的相关信息
		RetryQueueConfigs retryConfigByOriQueue = getRetryConfigByOriQueue(message);
		//从消息头中获取到重试次数
		int retryCount = getMessageRetryCount(message);
		//根据配置中心配置的死信消息失效时间计算方式(默认指数方式),和重试次数计算出死信队列名称后缀
		String expiration = getRetryMessageExpiration(retryCount,retryConfigByOriQueue);
		logger.info("消息重发开始[expiration:{}, retryCount:{}]", expiration, retryCount);
		//获取死信队列名称
		String queueName = getRetryQueueName(expiration,retryConfigByOriQueue);
		logger.info("消息重发获取重试队列[expiration:{}, retryCount:{}, queueName:{}]", expiration, retryCount, queueName);
		//发送消息
		rabbitMqService.sendRetry("", queueName, message, expiration, retryCount,retryCountFlag);
		logger.info("消息重发结束[expiration:{}, retryCount:{}]", expiration, retryCount);
	} catch (Exception e) {
		logger.info("({})发送重试消息失败!", JSON.toJSONString(message),e);
		result=false;

	}
	return result;
}

3. 业务端使用

1. 引入依赖

  <dependency>
      <groupId>com.epay</groupId>
      <artifactId>delay-component-spring-boot-stater</artifactId>
      <version>1.0.0-SNAPSHOT</version>
  </dependency>

2. 新增配置

3. 使用


总结

本篇简单的介绍了下在工作中,将RabbitMQ进行简单封装作为延时组件使用,在使用时只需要简单的进行配置下就可以达到延时效果,降低了重复代码的编写,大大缩短了项目开发周期,由于工期紧张封装的starter还是比较粗糙的,还有好多地方需要斟酌打磨。

本篇也只是提供一种思想吧,在工作中可以借鉴下,避免重复劳动,将业务功能组件化,以后不管在什么项目中只要有相同业务场景就可以引入现有组件快速完成业务功能开发。

拙技蒙斧正,不胜雀跃。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/1388.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux基础(3) Vim编辑器与Shell命令脚本

1、VIM文本编辑器 VIM编辑器的三大模式 命令模式&#xff1a; 控制光标移动&#xff0c;可对文本进行复制、粘贴和查找等工作输入模式&#xff1a; 正常的文本录入。末行模式&#xff1a; 保存或退出文档&#xff0c;以及设置编辑环境三种模式的切换&#xff1a; ​注意&…

app自动化测试——Android studio安装与配置

文章目录一、Appium框架介绍二、Appium 生态工具三、环境安装四、安装Android studio五、配置环境变量六、创建模拟器查看设备启动模拟器一、Appium框架介绍 1、跨语言&#xff1a;java、python等 2、跨平台&#xff1a;Android、IOS、Windows、Mac 3、底层多引擎切换 4、生态…

(待补充)小蒟蒻的刷题成长之路-------2023年中国高校计算机大赛-团队程序设计天梯赛(GPLT)上海理工大学校内选拔赛(同步赛)

小蒟蒻的刷题成长之路 蓝桥杯的比赛流程和必考点_蓝桥杯省赛考点_时雨h的博客-CSDN博客 大一学生一周十万字爆肝版C语言总结笔记_大一c语言笔记_时雨h的博客-CSDN博客 程序设计与 C 语言期末复习_时雨h的博客-CSDN博客 P8597 [蓝桥杯 2013 省 B] 翻硬币个人思考总结第五届传智杯…

西瓜视频登录页面

题目 代码 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>登录页面</title><style>td{width: 160px;height: 25px;}img{width: 20px;height: 20px;}.number, .password{background: rgba(0,0,0,.05);}.numbe…

指针进阶(上)

内容小复习&#x1f431;&#xff1a; 字符指针:存放字符的数组 char arr1[10]; 整型数组:存放整型的数组 int arr2[5]; 指针数组:存放的是指针的数组 存放字符指针的数组(字符指针数组) char* arr3[5]; 存放整型指针的数组(整型指针数组) int* arr[6]; 下面进入学习了哦~&…

【二分查找】

二分查找704. 二分查找35. 搜索插入位置34. 在排序数组中查找元素的第一个和最后一个位置结语704. 二分查找 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在…

mybatis中获取参数的两种方式:${}和#{}

目录 1.#{} 2.${} 3.总结 1.#{} 本质是占位符赋值 示例及执行结果&#xff1a; 结论&#xff1a;通过执行结果可以看到&#xff0c;首先对sql进行了预编译处理&#xff0c;然后再传入参数&#xff0c;有效的避免了sql注入的问题&#xff0c;并且传参方式也比较简单&#xf…

Python制作9行最简单音乐播放器?不,我不满足

人生苦短 我用python 好久不见啦~这次就来给大家整个大福利 ~ 源码资料电子书:点击此处跳转文末名片获取 最简单的9行代码音乐播放器如下&#xff1a; import time import pygamefile r歌曲路径 pygame.mixer.init() print(正在播放,file) track pygame.mixer.music.load(f…

计算机面试常见问答题目

英语口语 自我介绍 Hello, teachers. My name is Wang Xu. I come from Ningxia. I graduated from the School of Computer Science, Xi an Jiaotong University, majoring in Internet of Things. Next, I will introduce myself from four aspects. First of all, I studi…

Java开发 - ELK初体验

前言 前面我们讲过消息队列&#xff0c;曾提到消息队列也具有保存消息日志的能力&#xff0c;今天要说的EL看也具备这个能力&#xff0c;不过还是要区分一下功能的。消息队列的日志主要指的是Redis的AOF&#xff0c;实际上只是可以利用了消息队列来保存&#xff0c;却并不是消…

网络编程1(网络背景知识)

A给B发送消息如何保证数据一定能够发送到B的主机上&#xff0c;而不是其他地方 通过IP地址可以实现网络中制定的两个主机之间的通信&#xff0c;除此之外还要确定是哪个进程来处理&#xff0c;这里就用到端口&#xff08;port&#xff09; 端口—在一台主机上用于唯一标识一个…

MySQL索引特性

文章目录为什么要有索引&#xff1f;认识磁盘磁盘的结构磁盘的盘片结构定位扇区磁盘随机访问 (Random Access)与连续访问 (Sequential Access)MySQL与磁盘交互索引的理解测试主键索引索引的原理索引结构是否可以使用其他数据结构B树 vs B树聚簇索引 vs 非聚簇索引为什么要有索引…

基于深度学习的犬种识别软件(YOLOv5清新界面版,Python代码)

摘要&#xff1a;基于深度学习的犬种识别软件用于识别常见多个犬品种&#xff0c;基于YOLOv5算法检测犬种&#xff0c;并通过界面显示记录和管理&#xff0c;智能辅助人们辨别犬种。本文详细介绍博主自主开发的犬种检测系统&#xff0c;在介绍算法原理的同时&#xff0c;给出Py…

分布式微服务架构下网络通信的底层实现原理

在分布式架构中&#xff0c;网络通信是底层基础&#xff0c;没有网络&#xff0c;也就没有所谓的分布式架构。只有通过网络才能使得一大片机器互相协作&#xff0c;共同完成一件事情。 同样&#xff0c;在大规模的系统架构中&#xff0c;应用吞吐量上不去、网络存在通信延迟、我…

Qt音视频开发26-监控画面各种图形绘制设计

一、前言 视频监控系统做到后面&#xff0c;逐渐需要搭配人工智能算法&#xff0c;将算法计算后的信息以OSD标签以及方框各种图形的信息显示到视频中&#xff0c;这种当然和OSD一样也是有两种方式&#xff0c;一种是源头就贴好了&#xff0c;一种是将结果发给软件这边解析绘制…

专项攻克——死锁

文章目录O、死锁定义一、 常见的java死锁代码1. synchronized等待对象释放&#xff0c;导致死锁2. CountDownLatch计数等待&#xff0c;导致死锁二、怎么避免死锁2.1 死锁的四个必要条件2.2 避免死锁2.3 常见的避免死锁技术三、java程序出现死锁&#xff0c;怎么解除&#xff1…

Vue使用的编辑器

作者简介&#xff1a;一名计算机萌新、前来进行学习VUE,让我们一起进步吧。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;我叫于豆豆吖的主页 目录 前言 一.vue常用的IDE工具Visual Studio Code 3. 汉化教程 4.常用快捷键 5. Visual Studio C…

瑞萨Renesas RA2L1 开发板测评(1)--keil环境配置

前言&#xff08;1&#xff09;首先感谢李肯前辈的活动&#xff0c;从而申请到了RA2L1开发板的测评。&#xff08;2&#xff09;本文将会简单介绍此开发的Renesas RA2L1 开发板的前期配置。需要注意的是&#xff0c;MDK版本要5.30 以上。MDK下载链接&#xff1b;&#xff08;3&…

计算机中的浮点数运算

计算机中的浮点数 计算机中以固定长度存储浮点数的方式&#xff0c;造成了浮点数运算过程容易产生上溢和下溢。以float32为例, 其标记位占1bit,指数位占8bit,小数部分占23bit 经典下溢场景 不满足精度导致截断误差 #include <iostream> #include <iomanip> usin…

一行代码“黑”掉任意网站

文章目录只需一行代码&#xff0c;轻轻一点就可以把任意网站变成暗黑模式。 首先我们先做一个实验&#xff0c;在任意网站中&#xff0c;打开浏览器开发者工具(F12)&#xff0c;在 C1onsole 控制台输入如下代码并回车&#xff1a; document.documentElement.style.filterinve…
最新文章