环境
JDK 1.8, Spring Boot 2.7.3
Maven依赖
<!-- Spring Boot 2.7.3 parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- RocketMQ Spring Boot starter 2.2.2. Its transitive rocketmq-client is excluded here
     so that the client version can be declared explicitly in the next dependency. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Explicitly pinned rocketmq-client 4.9.5, replacing the one excluded from the starter above. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.5</version>
</dependency>
配置文件
# Address (host:port) of the RocketMQ name server
rocketmq.name-server=192.168.6.128:9876
# Default producer group; a group can also be specified individually
rocketmq.producer.group=springBootGroup
# Default consumer group; a group can also be specified individually
rocketmq.consumer.group=testGroup
# HTTP port this application listens on
server.port=9000
代码
生产者发送消息
/**
 * REST endpoints under {@code /producer} that forward message requests
 * to {@link ProducerService}.
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    /**
     * Handles {@code POST /producer/sendSync}.
     *
     * @param req request body carrying the topic, tag and message text
     * @return whatever {@code ProducerService.sendSyncMessage} returns
     */
    @PostMapping("/sendSync")
    public Object sendSync(@RequestBody MessageReq req) {
        String topic = req.getTopic();
        String tag = req.getTag();
        String message = req.getMessage();
        return producerService.sendSyncMessage(topic, tag, message);
    }

    /**
     * Handles {@code POST /producer/sendAsync}.
     *
     * @param req request body carrying the topic, tag and message text
     * @return the literal string {@code "200"} once the service call has returned
     */
    @PostMapping("/sendAsync")
    public Object sendAsyncMessage(@RequestBody MessageReq req) {
        String topic = req.getTopic();
        String tag = req.getTag();
        String message = req.getMessage();
        producerService.sendAsyncMessage(topic, tag, message);
        return "200";
    }
}
@Service
public class ProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息.
* @return 发送结果
*/
public SendResult sendSyncMessage(String topic, String tag, String message) {
// param1: topic和tag冒号分隔
return rocketMQTemplate.syncSend(topic + ":" + tag, message);
}
/**
* 发送异步消息.
*/
public void sendAsyncMessage(String topic, String tag, String message) {
rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
}
}
消费者
/**
 * Message listener that prints every received {@code String} payload to standard output.
 */
@Component
@RocketMQMessageListener(
        // Topic to subscribe to.
        topic = "MQ_sp_test1",
        // Name of the consumer group.
        consumerGroup = "SimpleStringConsumerGroup",
        // Filter expression. With the default TAG selector, separate multiple tags with "||"
        // or use "*" for all tags. Setting selectorType = SelectorType.SQL92 switches to
        // SQL92-syntax filtering instead.
        selectorExpression = "Tag-kk||Tag-kk2",
        // CONCURRENTLY: unordered consumption; ORDERLY: ordered consumption.
        consumeMode = ConsumeMode.CONCURRENTLY,
        // CLUSTERING: load-balanced across the group; BROADCASTING: broadcast mode.
        messageModel = MessageModel.CLUSTERING
)
public class SimpleConsumer implements RocketMQListener<String> {

    /**
     * Prints the received message text, prefixed with a fixed label.
     *
     * @param message the message payload
     */
    @Override
    public void onMessage(String message) {
        final String line = "Received message : " + message;
        System.out.println(line);
    }
}
测试
同步消息
异步消息
TAG过滤消息
1. 消费者指定了 TAG 之后, 不满足过滤条件的消息不会被消费, 其消费状态为 CONSUMED_BUT_FILTERED