1. Business scenario
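When a Spring Boot project starts, the RabbitMQ middleware may not have finished starting yet. In that case the Spring Boot project crashes outright, so the application needs to wait until RabbitMQ has started successfully before connecting to it.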
2. Project structure and configuration
The pom file uses the spring-boot-starter-amqp dependency, configured as follows:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
RabbitMQ connection configuration:
package com.hero.rabbitmq.config;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
@Configuration
@DependsOn(value = "springContextHolder")
@Conditional(RabbitmqCondition.class)
public class RabbitmqConfig {

    // Builds the connection factory from the myConfig.rabbitmq.* properties.
    @Bean
    public CachingConnectionFactory connectionFactory(
            @Value("${myConfig.rabbitmq.host}") String host,
            @Value("${myConfig.rabbitmq.port}") int port,
            @Value("${myConfig.rabbitmq.username}") String username,
            @Value("${myConfig.rabbitmq.password}") String password,
            @Value("${myConfig.rabbitmq.virtual-host}") String virtualHost,
            @Value("${myConfig.rabbitmq.publisher-returns}") Boolean publisherReturns
    ) {
        if (StringUtils.isNotEmpty(host)) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        } else {
            // No host configured: no connection factory is created.
            return null;
        }
    }
}
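3. Add a RabbitMQ check at project startup
Check logic: if the RabbitMQ connection fails, keep retrying until it succeeds. The code below waits 2 to 10 seconds between attempts, allows up to 1,000,000 attempts, and closes the application context if the last attempt fails. The code is as follows: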
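3.1. Abstract class for obtaining the Spring application context
See the AbstractValidateItem abstract class in the reference article "Validating the Redis connection at Spring Boot startup".
3.2. Concrete validation implementation class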
package com.hero.validate;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@DependsOn(value = "springContextHolder")
public class ValidateMessageBroker extends AbstractValidateItem {

    // Maximum number of connection attempts before the application context is closed.
    private static final int MAX_RETRIES = 1000000;

    @Value("${myConfig.rabbitmq.host}")
    private String host;
    @Value("${myConfig.rabbitmq.port}")
    private Integer port;
    @Value("${myConfig.rabbitmq.username}")
    private String userName;
    @Value("${myConfig.rabbitmq.password}")
    private String password;

    @Override
    public void validate() {
        log.info(">>> ValidateMessageBroker, validate");
        rabbitMqValid();
    }

    public void rabbitMqValid() {
        for (int i = 1; i <= MAX_RETRIES; i++) {
            try {
                Thread.sleep(backoff(i));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                log.warn(" interrupted while waiting to retry rabbitmq connection");
                return;
            }
            // The connection is only a probe, so try-with-resources closes it immediately.
            try (Connection connection = getRabbitmqConnection()) {
                if (connection != null) {
                    log.info(" check connect rabbitmq ok");
                    return;
                } else {
                    log.warn(" retry connect rabbitmq {} fail", i);
                }
            } catch (Exception e) {
                if (i == 1) {
                    // Log the full stack trace only once, on the first failure.
                    log.warn(" first connect rabbitmq attempt failed", e);
                }
                log.warn(" retry connect rabbitmq {} fail", i);
                if (i == MAX_RETRIES) {
                    log.error(" nms will be restarted for retry connect rabbitmq {} fail", i, e);
                    getContext().close();
                }
            }
        }
    }

    public Connection getRabbitmqConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(userName);
        factory.setPassword(password);
        // The virtual host is hard-coded here, unlike RabbitmqConfig, which reads it from configuration.
        factory.setVirtualHost("/");
        // RabbitMQExceptionHandler is a project class that is not shown in this article.
        factory.setExceptionHandler(new RabbitMQExceptionHandler());
        return factory.newConnection();
    }

    // Delay before attempt n in milliseconds: 2s, 4s, 6s, 8s, then capped at 10s.
    public static int backoff(int n) {
        if (n > 5) {
            n = 5;
        }
        return 2000 * n;
    }
}
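With this in place, when the Spring Boot project starts it checks whether RabbitMQ can be connected to. If the connection fails, it keeps retrying until the connection succeeds.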
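The retry loop in ValidateMessageBroker can also be tried out without Spring or RabbitMQ. The following is only a minimal sketch using the JDK alone: the class name StartupRetry, the methods backoffMillis and retryUntilReady, and the stand-in probe are all hypothetical names introduced for illustration and are not part of the original project. It assumes the same schedule as backoff(int) above (base delay multiplied by the attempt number, capped at five times the base), with the base delay made a parameter so the loop can be exercised quickly.

```java
import java.util.concurrent.Callable;

/** JDK-only sketch of a retry loop with capped linear backoff; all names are hypothetical. */
public final class StartupRetry {

    private StartupRetry() {
    }

    /** Delay before the given attempt: baseMillis * min(attempt, 5). */
    public static long backoffMillis(int attempt, long baseMillis) {
        return baseMillis * Math.min(attempt, 5);
    }

    /**
     * Calls the probe until it returns true or maxAttempts is reached.
     * Returns true on success, false if attempts ran out or the thread was interrupted.
     */
    public static boolean retryUntilReady(Callable<Boolean> probe, int maxAttempts, long baseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Thread.sleep(backoffMillis(attempt, baseMillis));
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
            try {
                if (Boolean.TRUE.equals(probe.call())) {
                    return true;
                }
            } catch (Exception e) {
                // A failing probe (for example, connection refused) means "not ready yet".
                System.err.println("attempt " + attempt + " failed: " + e);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in probe that fails twice and then succeeds.
        // A real probe would open and close a broker connection instead.
        boolean ready = retryUntilReady(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker not ready");
            }
            return true;
        }, 10, 1L);
        System.out.println("ready=" + ready + " after " + calls[0] + " attempts");
    }
}
```

Returning a boolean leaves the decision about what to do on failure (for example, closing the application context) to the caller, which keeps the loop itself easy to test.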