SpringBoot【开发实用篇】---- 整合第三方技术(消息)

SpringBoot【开发实用篇】---- 整合第三方技术(消息)

  • 消息的概念
  • Java处理消息的标准规范
    • JMS
    • AMQP
    • MQTT
    • Kafka
  • 购物订单发送手机短信案例
    • 订单业务
    • 短息处理业务
  • SpringBoot整合ActiveMQ
    • 安装
    • 整合
  • SpringBoot整合RabbitMQ
    • 安装
    • 整合(direct模型)
    • 整合(topic模型)
  • SpringBoot整合RocketMQ
    • 安装
    • 整合(异步消息)
  • SpringBoot整合Kafka
    • 安装
    • 整合

消息的概念

从广义角度来说,消息其实就是信息,但是和信息又有所不同。信息通常被定义为一组数据,而消息除了具有数据的特征之外,还有消息的来源与接收的概念。通常发送消息的一方称为消息的生产者,接收消息的一方称为消息的消费者。这样比较后,发现其实消息和信息差别还是很大的。

为什么要设置生产者和消费者呢?这就是要说到消息的意义了。信息通常就是一组数据,但是消息由于有了生产者和消费者,就出现了消息中所包含的信息可以被二次解读,生产者发送消息,可以理解为生产者发送了一个信息,也可以理解为生产者发送了一个命令;消费者接收消息,可以理解为消费者得到了一个信息,也可以理解为消费者得到了一个命令。对比一下我们会发现信息是一个基本数据,而命令则可以关联下一个行为动作,这样就可以理解为基于接收的消息相当于得到了一个行为动作,使用这些行为动作就可以组织成一个业务逻辑,进行进一步的操作。总的来说,消息其实也是一组信息,只是为其赋予了全新的含义,因为有了消息的流动,并且是有方向性的流动,带来了基于流动的行为产生的全新解读。开发者就可以基于消息的这种特殊解,将其换成代码中的指令。

对于消息的理解,初学者总认为消息内部的数据非常复杂,这是一个误区。比如我发送了一个消息,要求接受者翻译发送过去的内容。初学者会认为消息中会包含被翻译的文字,已经本次操作要执行翻译操作而不是打印操作。其实这种现象有点过度解读了,发送的消息中仅仅包含被翻译的文字,但是可以通过控制不同的人接收此消息来确认要做的事情。例如发送被翻译的文字仅到A程序,而A程序只能进行翻译操作,这样就可以发送简单的信息完成复杂的业务了,是通过接收消息的主体不同,进而执行不同的操作,而不会在消息内部定义数据的操作行为,当然如果开发者希望消息中包含操作种类信息也是可以的,只是提出消息的内容可以更简单,更单一。

对于消息的生产者与消费者的工作模式,还可以将消息划分成两种模式,同步消费与异步消息。

所谓同步消息就是生产者发送完消息,等待消费者处理,消费者处理完将结果告知生产者,然后生产者继续向下执行业务。这种模式过于卡生产者的业务执行连续性,在现在的企业级开发中,上述这种业务场景通常不会采用消息的形式进行处理。

所谓异步消息就是生产者发送完消息,无需等待消费者处理完毕,生产者继续向下执行其他动作。比如生产者发送了一个日志信息给日志系统,发送过去以后生产者就向下做其他事情了,无需关注日志系统的执行结果。日志系统根据接收到的日志信息继续进行业务执行,是单纯的记录日志,还是记录日志并报警,这些和生产者无关,这样生产者的业务执行效率就会大幅度提升。并且可以通过添加多个消费者来处理同一个生产者发送的消息来提高系统的高并发性,改善系统工作效率,提高用户体验。一旦某一个消费者由于各种问题宕机了,也不会对业务产生影响,提高了系统的高可用性。

Java处理消息的标准规范

目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:

  • JMS
  • AMQP
  • MQTT

为什么是三大类,而不是三个技术呢?因为这些都是规范,就想JDBC技术,是个规范,开发针对规范开发,运行还要靠实现类,例如MySQL提供了JDBC的实现,最终运行靠的还是实现。并且这三类规范都是针对异步消息进行处理的,也符合消息的设计本质,处理异步的业务。对以上三种消息规范做一下普及

JMS

JMS(Java Message Service),这是一个规范,作用等同于JDBC规范,提供了与消息服务相关的API接口。

JMS消息模型

JMS规范中规范了消息有两种模型。分别是点对点模型发布订阅模型

点对点模型:peer-2-peer,生产者会将消息发送到一个保存消息的容器中,通常使用队列模型,使用队列保存消息。一个队列的消息只能被一个消费者消费,或未被及时消费导致超时。这种模型下,生产者和消费者是一对一绑定的。

发布订阅模型:publish-subscribe,生产者将消息发送到一个保存消息的容器中,也是使用队列模型来保存。但是消息可以被多个消费者消费,生产者和消费者完全独立,相互不需要感知对方的存在。

以上这种分类是从消息的生产和消费过程来进行区分,针对消息所包含的信息不同,还可以进行不同类别的划分。

JMS消息种类

根据消息中包含的数据种类划分,可以将消息划分成6种消息。

  • TextMessage
  • MapMessage
  • BytesMessage
  • StreamMessage
  • ObjectMessage
  • Message (只有消息头和属性)

JMS主张不同种类的消息,消费方式不同,可以根据使用需要选择不同种类的消息。但是这一点也成为其诟病之处,后面再说。整体上来说,JMS就是典型的保守派,什么都按照J2EE的规范来,做一套规范,定义若干个标准,每个标准下又提供一大批API。目前对JMS规范实现的消息中间件技术还是挺多的,毕竟是皇家御用,肯定有人舔,例如ActiveMQ、Redis、HornetMQ。但是也有一些不太规范的实现,参考JMS的标准设计,但是又不完全满足其规范,例如:RabbitMQ、RocketMQ。

AMQP

JMS的问世为消息中间件提供了很强大的规范性支撑,但是使用的过程中就开始被人诟病,比如JMS设置的极其复杂的多种类消息处理机制。本来分门别类处理挺好的,为什么会被诟病呢?原因就在于JMS的设计是J2EE规范,站在Java开发的角度思考问题。但是现实往往是复杂度很高的。比如我有一个.NET开发的系统A,有一个Java开发的系统B,现在要从A系统给B系统发业务消息,结果两边数据格式不统一,没法操作。JMS不是可以统一数据格式吗?提供了6种数据种类,总有一款适合你啊。NO,一个都不能用。因为A系统的底层语言不是Java语言开发的,根本不支持那些对象。这就意味着如果想使用现有的业务系统A继续开发已经不可能了,必须推翻重新做使用Java语言开发的A系统。

这时候有人就提出说,你搞那么复杂,整那么多种类干什么?找一种大家都支持的消息数据类型不就解决这个跨平台的问题了吗?大家一想,对啊,于是AMQP孕育而生。

单从上面的说明中其实可以明确感知到,AMQP的出现解决的是消息传递时使用的消息种类的问题,化繁为简,但是其并没有完全推翻JMS的操作API,所以说AMQP仅仅是一种协议,规范了数据传输的格式而已。

AMQP(advanced message queuing protocol):一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式,兼容JMS操作。

优点
具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现

JMS消息种类
AMQP消息种类:byte[]
AMQP在JMS的消息模型基础上又进行了进一步的扩展,除了点对点和发布订阅的模型,开发了几种全新的消息模型,适应各种各样的消息发送。

AMQP消息模型

  • direct exchange
  • fanout exchange
  • topic exchange
  • headers exchange
  • system exchange

目前实现了AMQP协议的消息中间件技术也很多,而且都是较为流行的技术,例如:RabbitMQ、StormMQ、RocketMQ

MQTT

MQTT(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一。由于与JavaEE企业级开发没有交集,此处不作过多的说明。

除了上述3种J2EE企业级应用中广泛使用的三种异步消息传递技术,还有一种技术也不能忽略,Kafka。

Kafka

Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。

购物订单发送手机短信案例

为了便于下面演示各种各样的消息中间件技术,我们创建一个购物过程生成订单时为用户发送短信的案例环境,模拟使用消息中间件实现发送手机短信的过程。

手机验证码案例需求如下:

  • 执行下单业务时(模拟此过程),调用消息服务,将要发送短信的订单id传递给消息中间件

  • 消息处理服务接收到要发送的订单id后输出订单id(模拟发短信)

由于不涉及数据读写,仅开发业务层与表现层,其中短信处理的业务代码独立开发,代码如下:

订单业务

业务层接口

public interface OrderService {
    void order(String id);
}

模拟传入订单id,执行下订单业务,参数为虚拟设定,实际应为订单对应的实体类

业务层实现

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private MessageService messageService;
    
    @Override
    public void order(String id) {
        //一系列操作,包含各种服务调用,处理各种业务
        System.out.println("订单处理开始");
        //短信消息处理
        messageService.sendMessage(id);
        System.out.println("订单处理结束");
        System.out.println();
    }
}

业务层转调短信处理的服务MessageService

表现层服务

@RestController
@RequestMapping("/orders")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("{id}")
    public void order(@PathVariable String id){
        orderService.order(id);
    }
}

表现层对外开发接口,传入订单id即可(模拟)

短息处理业务

业务层接口

public interface MessageService {
    void sendMessage(String id);
    String doMessage();
}

短信处理业务层接口提供两个操作,发送要处理的订单id到消息中间件,另一个操作目前暂且设计成处理消息,实际消息的处理过程不应该是手动执行,应该是自动执行,到具体实现时再进行设计

业务层实现

@Service
public class MessageServiceImpl implements MessageService {
    private ArrayList<String> msgList = new ArrayList<String>();

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列,id:"+id);
        msgList.add(id);
    }

    @Override
    public String doMessage() {
        String id = msgList.remove(0);
        System.out.println("已完成短信发送业务,id:"+id);
        return id;
    }
}

短信处理业务层实现中使用集合先模拟消息队列,观察效果

表现层服务

@RestController
@RequestMapping("/msgs")
public class MessageController {

    @Autowired
    private MessageService messageService;

    @GetMapping
    public String doMessage(){
        String id = messageService.doMessage();
        return id;
    }
}

短信处理表现层接口暂且开发出一个处理消息的入口,但是此业务是对应业务层中设计的模拟接口,实际业务不需要设计此接口。

下面开启springboot整合各种各样的消息中间件,从严格满足JMS规范的ActiveMQ开始

SpringBoot整合ActiveMQ

ActiveMQ是MQ产品中的元老级产品,早期标准MQ产品之一,在AMQP协议没有出现之前,占据了消息中间件市场的绝大部分份额,后期因为AMQP系列产品的出现,迅速走弱,目前仅在一些线上运行的产品中出现,新产品开发较少采用。

安装

windows版安装包下载地址:https://activemq.apache.org/components/classic/download
下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件
在这里插入图片描述
启动服务器

activemq.bat

运行bin目录下的win32或win64目录下的activemq.bat命令即可,根据自己的操作系统选择即可,默认对外服务端口61616。

访问web管理服务
ActiveMQ启动后会启动一个Web控制台服务,可以通过该服务管理ActiveMQ。

http://127.0.0.1:8161/

web管理服务默认端口8161,访问后可以打开ActiveMQ的管理界面,如下:
在这里插入图片描述
首先输入访问用户名和密码,初始化用户名和密码相同,均为:admin,成功登录后进入管理后台界面,如下:
在这里插入图片描述
看到上述界面视为启动ActiveMQ服务成功。

启动失败

WrapperSimpleApp: Unable to locate the class org.apache.activemq.console.Main: java.lang.UnsupportedClassVersionError: org/apache/activemq/console/Main has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

忘记截图了,大致原因是因为,当前 ActiveMQ 的版本所需要的 jdk 和 jvm 的版本不对,按照官网给出的版本要求,下载对应的版本即可…

在ActiveMQ启动时要占用多个端口,以下为正常启动信息:

wrapper  | --> Wrapper Started as Console
wrapper  | Launching a JVM...
jvm 1    | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1    |   Copyright 1999-2006 Tanuki Software, Inc.  All Rights Reserved.
jvm 1    |
jvm 1    | Java Runtime: Oracle Corporation 1.8.0_172 D:\soft\jdk1.8.0_172\jre
jvm 1    |   Heap sizes: current=249344k  free=235037k  max=932352k
jvm 1    |     JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=7ySrCD75XhLCpLjd -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=9364 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1    | Extensions classpath:
jvm 1    |   [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
jvm 1    | ACTIVEMQ_HOME: ..\..
jvm 1    | ACTIVEMQ_BASE: ..\..
jvm 1    | ACTIVEMQ_CONF: ..\..\conf
jvm 1    | ACTIVEMQ_DATA: ..\..\data
jvm 1    | Loading message broker from: xbean:activemq.xml
jvm 1    |  INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@5f3ebfe0: startup date [Mon Feb 28 16:07:48 CST 2022]; root of context hierarchy
jvm 1    |  INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\soft\activemq\bin\win64\..\..\data\kahadb]
jvm 1    |  INFO | KahaDB is version 7
jvm 1    |  INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] started
jvm 1    |  INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10434-1646035669595-0:1) is starting
jvm 1    |  INFO | Listening for connections at: tcp://CZBK-20210302VL:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector openwire started
jvm 1    |  INFO | Listening for connections at: amqp://CZBK-20210302VL:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector amqp started
jvm 1    |  INFO | Listening for connections at: stomp://CZBK-20210302VL:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector stomp started
jvm 1    |  INFO | Listening for connections at: mqtt://CZBK-20210302VL:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector mqtt started
jvm 1    |  INFO | Starting Jetty server
jvm 1    |  INFO | Creating Jetty connector
jvm 1    |  WARN | ServletContext@o.e.j.s.ServletContextHandler@7350746f{/,null,STARTING} has uncovered http methods for path: /
jvm 1    |  INFO | Listening for connections at ws://CZBK-20210302VL:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector ws started
jvm 1    |  INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10434-1646035669595-0:1) started
jvm 1    |  INFO | For help or more information please see: http://activemq.apache.org
jvm 1    |  WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\soft\activemq\bin\win64\..\..\data\kahadb only has 68936 mb of usable space. - resetting to maximum available disk space: 68936 mb
jvm 1    |  INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/
jvm 1    |  INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/

其中占用的端口有:61616、5672、61613、1883、61614,如果启动失败,请先管理对应端口即可。以下就是某个端口占用的报错信息,可以从抛出异常的位置看出,启动5672端口时端口被占用,显示java.net.BindException: Address already in use: JVM_Bind。

wrapper  | --> Wrapper Started as Console
wrapper  | Launching a JVM...
jvm 1    | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1    |   Copyright 1999-2006 Tanuki Software, Inc.  All Rights Reserved.
jvm 1    |
jvm 1    | Java Runtime: Oracle Corporation 1.8.0_172 D:\soft\jdk1.8.0_172\jre
jvm 1    |   Heap sizes: current=249344k  free=235038k  max=932352k
jvm 1    |     JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=QPJoy9ZoXeWmmwTS -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=14836 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1    | Extensions classpath:
jvm 1    |   [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
jvm 1    | ACTIVEMQ_HOME: ..\..
jvm 1    | ACTIVEMQ_BASE: ..\..
jvm 1    | ACTIVEMQ_CONF: ..\..\conf
jvm 1    | ACTIVEMQ_DATA: ..\..\data
jvm 1    | Loading message broker from: xbean:activemq.xml
jvm 1    |  INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@2c9392f5: startup date [Mon Feb 28 16:06:16 CST 2022]; root of context hierarchy
jvm 1    |  INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\soft\activemq\bin\win64\..\..\data\kahadb]
jvm 1    |  INFO | KahaDB is version 7
jvm 1    |  INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] started
jvm 1    |  INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) is starting
jvm 1    |  INFO | Listening for connections at: tcp://CZBK-20210302VL:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector openwire started
jvm 1    | ERROR | Failed to start Apache ActiveMQ (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1)
jvm 1    | java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind
jvm 1    |      at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:28)
jvm 1    |      at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2288)
jvm 1    |      at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2769)
jvm 1    |      at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2665)
jvm 1    |      at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:780)
jvm 1    |      at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:742)
jvm 1    |      at org.apache.activemq.broker.BrokerService.start(BrokerService.java:645)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:73)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1748)
jvm 1    |      at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1685)
jvm 1    |      at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1615)
jvm 1    |      at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553)
jvm 1    |      at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)
jvm 1    |      at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)
jvm 1    |      at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
jvm 1    |      at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)
jvm 1    |      at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
jvm 1    |      at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756)
jvm 1    |      at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
jvm 1    |      at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)
jvm 1    |      at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
jvm 1    |      at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)
jvm 1    |      at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
jvm 1    |      at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
jvm 1    |      at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)
jvm 1    |      at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)
jvm 1    |      at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)
jvm 1    |      at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)
jvm 1    |      at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.apache.activemq.console.Main.runTaskClass(Main.java:262)
jvm 1    |      at org.apache.activemq.console.Main.main(Main.java:115)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)
jvm 1    |      at java.lang.Thread.run(Thread.java:748)
jvm 1    | Caused by: java.io.IOException: Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind
jvm 1    |      at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:34)
jvm 1    |      at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:146)
jvm 1    |      at org.apache.activemq.transport.tcp.TcpTransportFactory.doBind(TcpTransportFactory.java:62)
jvm 1    |      at org.apache.activemq.transport.TransportFactorySupport.bind(TransportFactorySupport.java:40)
jvm 1    |      at org.apache.activemq.broker.TransportConnector.createTransportServer(TransportConnector.java:335)
jvm 1    |      at org.apache.activemq.broker.TransportConnector.getServer(TransportConnector.java:145)
jvm 1    |      at org.apache.activemq.broker.TransportConnector.asManagedConnector(TransportConnector.java:110)
jvm 1    |      at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2283)
jvm 1    |      ... 46 more
jvm 1    | Caused by: java.net.BindException: Address already in use: JVM_Bind
jvm 1    |      at java.net.DualStackPlainSocketImpl.bind0(Native Method)
jvm 1    |      at java.net.DualStackPlainSocketImpl.socketBind(DualStackPlainSocketImpl.java:106)
jvm 1    |      at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
jvm 1    |      at java.net.PlainSocketImpl.bind(PlainSocketImpl.java:190)
jvm 1    |      at java.net.ServerSocket.bind(ServerSocket.java:375)
jvm 1    |      at java.net.ServerSocket.<init>(ServerSocket.java:237)
jvm 1    |      at javax.net.DefaultServerSocketFactory.createServerSocket(ServerSocketFactory.java:231)
jvm 1    |      at org.apache.activemq.transport.tcp.TcpTransportServer.bind(TcpTransportServer.java:143)
jvm 1    |      ... 52 more
jvm 1    |  INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) is shutting down
jvm 1    |  INFO | socketQueue interrupted - stopping
jvm 1    |  INFO | Connector openwire stopped
jvm 1    |  INFO | Could not accept connection during shutdown  : null (null)
jvm 1    |  INFO | Connector amqp stopped
jvm 1    |  INFO | Connector stomp stopped
jvm 1    |  INFO | Connector mqtt stopped
jvm 1    |  INFO | Connector ws stopped
jvm 1    |  INFO | PListStore:[D:\soft\activemq\bin\win64\..\..\data\localhost\tmp_storage] stopped
jvm 1    |  INFO | Stopping async queue tasks
jvm 1    |  INFO | Stopping async topic tasks
jvm 1    |  INFO | Stopped KahaDB
jvm 1    |  INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) uptime 0.426 seconds
jvm 1    |  INFO | Apache ActiveMQ 5.16.3 (localhost, ID:CZBK-20210302VL-10257-1646035577620-0:1) is shutdown
jvm 1    |  INFO | Closing org.apache.activemq.xbean.XBeanBrokerFactory$1@2c9392f5: startup date [Mon Feb 28 16:06:16 CST 2022]; root of context hierarchy
jvm 1    |  WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind
jvm 1    | ERROR: java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext
jvm 1    | java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext
jvm 1    |      at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:91)
jvm 1    |      at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)
jvm 1    |      at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)
jvm 1    |      at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)
jvm 1    |      at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.apache.activemq.console.Main.runTaskClass(Main.java:262)
jvm 1    |      at org.apache.activemq.console.Main.main(Main.java:115)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)
jvm 1    |      at java.lang.Thread.run(Thread.java:748)
jvm 1    | Caused by: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext
jvm 1    |      at org.springframework.context.support.AbstractRefreshableApplicationContext.getBeanFactory(AbstractRefreshableApplicationContext.java:164)
jvm 1    |      at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1034)
jvm 1    |      at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:555)
jvm 1    |      at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
jvm 1    |      at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)
jvm 1    |      at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
jvm 1    |      at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
jvm 1    |      at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)
jvm 1    |      ... 16 more
jvm 1    | ERROR: java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext
jvm 1    | java.lang.IllegalStateException: BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext
jvm 1    |      at org.springframework.context.support.AbstractRefreshableApplicationContext.getBeanFactory(AbstractRefreshableApplicationContext.java:164)
jvm 1    |      at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1034)
jvm 1    |      at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:555)
jvm 1    |      at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
jvm 1    |      at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)
jvm 1    |      at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)
jvm 1    |      at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
jvm 1    |      at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
jvm 1    |      at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)
jvm 1    |      at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)
jvm 1    |      at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)
jvm 1    |      at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)
jvm 1    |      at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.apache.activemq.console.Main.runTaskClass(Main.java:262)
jvm 1    |      at org.apache.activemq.console.Main.main(Main.java:115)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jvm 1    |      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jvm 1    |      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jvm 1    |      at java.lang.reflect.Method.invoke(Method.java:498)
jvm 1    |      at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)
jvm 1    |      at java.lang.Thread.run(Thread.java:748)
wrapper  | <-- Wrapper Stopped
请按任意键继续. . .

整合

步骤①:导入springboot整合ActiveMQ的starter

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

步骤②:配置ActiveMQ的服务器地址

spring:
  activemq:
    broker-url: tcp://localhost:61616

步骤③:使用JmsMessagingTemplate操作ActiveMQ

@Service
public class MessageServiceActivemqImpl implements MessageService {
    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列,id:"+id);
        messagingTemplate.convertAndSend("order.queue.id",id);
    }

    @Override
    public String doMessage() {
        String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class);
        System.out.println("已完成短信发送业务,id:"+id);
        return id;
    }
}

发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使用id作为消息内容。

接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
public class MessageListener {
    @JmsListener(destination = "order.queue.id")
    @SendTo("order.other.queue.id")
    public String receive(String id){
        System.out.println("已完成短信发送业务,id:"+id);
        return "new:"+id;
    }
}

使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。
如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。

步骤⑤:切换消息模型由点对点模型到发布订阅模型,修改jms配置即可

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    pub-sub-domain: true

pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。

SpringBoot整合RabbitMQ

RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。

安装

Erlang安装
windows版安装包下载地址:https://www.erlang.org/downloads
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕需要重启,需要重启,需要重启。
安装的过程中可能会出现依赖Windows组件的提示,根据提示下载安装即可,都是自动执行的,如下:
在这里插入图片描述
Erlang安装后需要配置环境变量,否则RabbitMQ将无法找到安装的Erlang。需要配置项如下,作用等同JDK配置环境变量的作用。

  • ERLANG_HOME
  • PATH

安装
windows版安装包下载地址:https://rabbitmq.com/install-windows.html
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕后会得到如下文件
在这里插入图片描述
启动服务器

rabbitmq-service.bat start		# 启动服务
rabbitmq-service.bat stop		# 停止服务
rabbitmqctl status				# 查看服务状态

运行sbin目录下的rabbitmq-service.bat命令即可,start参数表示启动,stop参数表示退出,默认对外服务端口5672。
注意:启动rabbitmq的过程实际上是开启rabbitmq对应的系统服务,需要管理员权限方可执行。
说明:有没有感觉5672的服务端口很熟悉?activemq与rabbitmq有一个端口冲突问题,学习阶段无论操作哪一个?请确保另一个处于关闭状态。

说明:不喜欢命令行的小伙伴可以使用任务管理器中的服务页,找到RabbitMQ服务,使用鼠标右键菜单控制服务的启停。
在这里插入图片描述

访问web管理服务
RabbitMQ也提供有web控制台服务,但是此功能是一个插件,需要先启用才可以使用。

rabbitmq-plugins.bat list							# 查看当前所有插件的运行状态
rabbitmq-plugins.bat enable rabbitmq_management		# 启动rabbitmq_management插件

启动插件后可以在插件运行状态中查看是否运行,运行后通过浏览器即可打开服务后台管理界面

http://localhost:15672

web管理服务默认端口15672,访问后可以打开RabbitMQ的管理界面,如下:
在这里插入图片描述
首先输入访问用户名和密码,初始化用户名和密码相同,均为:guest,成功登录后进入管理后台界面,如下:
在这里插入图片描述

整合(direct模型)

RabbitMQ满足AMQP协议,因此不同的消息模型对应的制作不同,先使用最简单的direct模型开发。

步骤①:导入springboot整合amqp的starter,amqp协议默认实现为rabbitmq方案

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤②:配置RabbitMQ的服务器地址

spring:
  rabbitmq:
    host: localhost
    port: 5672

步骤③:初始化直连模式系统设置
由于RabbitMQ不同模型要使用不同的交换机,因此需要先初始化RabbitMQ相关的对象,例如队列,交换机等

@Configuration
public class RabbitConfigDirect {
    @Bean
    public Queue directQueue(){
        return new Queue("direct_queue");
    }
    @Bean
    public Queue directQueue2(){
        return new Queue("direct_queue2");
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }
    @Bean
    public Binding bindingDirect(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
    }
    @Bean
    public Binding bindingDirect2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
    }
}

队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。

步骤④:使用AmqpTemplate操作RabbitMQ

@Service
public class MessageServiceRabbitmqDirectImpl implements MessageService {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id:"+id);
        amqpTemplate.convertAndSend("directExchange","direct",id);
    }
}

amqp协议中的操作API接口名称看上去和jms规范的操作API接口很相似,但是传递参数差异很大。

步骤⑤:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
public class MessageListener {
    @RabbitListener(queues = "direct_queue")
    public void receive(String id){
        System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
    }
}

使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。

整合(topic模型)

步骤①:同上

步骤②:同上

步骤③:初始化主题模式系统设置

@Configuration
public class RabbitConfigTopic {
    @Bean
    public Queue topicQueue(){
        return new Queue("topic_queue");
    }
    @Bean
    public Queue topicQueue2(){
        return new Queue("topic_queue2");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    public Binding bindingTopic(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id");
    }
    @Bean
    public Binding bindingTopic2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*");
    }
}

主题模式支持routingKey匹配模式,*表示匹配一个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中。

匹配键topic.*.*topic.#
topic.order.idtruetrue
order.topic.idfalsefalse
topic.sm.order.idfalsetrue
topic.sm.idfalsetrue
topic.id.ordertruetrue
topic.idfalsetrue
topic.orderfalsetrue

步骤④:使用AmqpTemplate操作RabbitMQ

@Service
public class MessageServiceRabbitmqTopicImpl implements MessageService {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id:"+id);
        amqpTemplate.convertAndSend("topicExchange","topic.orders.id",id);
    }
}

发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进行匹配,规则匹配成功消息才会进入到对应的队列中。

步骤⑤:使用消息监听器在服务器启动后,监听指定队列

@Component
public class MessageListener {
    @RabbitListener(queues = "topic_queue")
    public void receive(String id){
        System.out.println("已完成短信发送业务(rabbitmq topic 1),id:"+id);
    }
    @RabbitListener(queues = "topic_queue2")
    public void receive2(String id){
        System.out.println("已完成短信发送业务(rabbitmq topic 22222222),id:"+id);
    }
}

使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。

SpringBoot整合RocketMQ

RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议。

安装

windows版安装包下载地址:https://rocketmq.apache.org/
下载完毕后得到zip压缩文件,解压缩即可使用,解压后得到如下文件
在这里插入图片描述
RocketMQ安装后需要配置环境变量,具体如下:

  • ROCKETMQ_HOME
  • PATH
  • NAMESRV_ADDR (建议): 127.0.0.1:9876

关于NAMESRV_ADDR对于初学者来说建议配置此项,也可以通过命令设置对应值,操作略显繁琐,建议配置。系统学习RocketMQ知识后即可灵活控制该项。

RocketMQ工作模式
在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器进行通信。broker启动后会通知命名服务器自己已经上线,这样命名服务器中就保存有所有的broker信息。当生产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到一个信息中心的作用。并且broker启动前必须保障命名服务器先启动。
在这里插入图片描述
启动服务器

mqnamesrv		# 启动命名服务器
mqbroker		# 启动broker

运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876。

运行bin目录下的mqbroker命令即可启动broker服务器,如果环境变量中没有设置NAMESRV_ADDR则需要在运行mqbroker指令前通过set指令设置NAMESRV_ADDR的值,并且每次开启均需要设置此项。

测试服务器启动状态
RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用。

tools org.apache.rocketmq.example.quickstart.Producer		# 生产消息
tools org.apache.rocketmq.example.quickstart.Consumer		# 消费消息

整合(异步消息)

步骤①:导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

步骤②:配置RocketMQ的服务器地址

rocketmq:
  name-server: localhost:9876
  producer:
    group: group_rocketmq

设置默认的生产者消费者所属组group。

步骤③:使用RocketMQTemplate操作RocketMQ

@Service
public class MessageServiceRocketmqImpl implements MessageService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id:"+id);
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功");
            }
            @Override
            public void onException(Throwable e) {
                System.out.println("消息发送失败!!!!!");
            }
        };
        rocketMQTemplate.asyncSend("order_id",id,callback);
    }
}

使用asyncSend方法发送异步消息。

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String id) {
        System.out.println("已完成短信发送业务(rocketmq),id:"+id);
    }
}

RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。

使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。

SpringBoot整合Kafka

安装

windows版安装包下载地址:https://kafka.apache.org/downloads
下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件
在这里插入图片描述
建议使用windows版2.8.1版本。

启动服务器
kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心,相关知识请到对应课程中学习。

zookeeper-server-start.bat ..\..\config\zookeeper.properties		# 启动zookeeper
kafka-server-start.bat ..\..\config\server.properties				# 启动kafka

运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。
运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。

创建主题
和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic。

# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima
# 查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list					
# 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima

测试服务器启动状态
Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。

kafka-console-producer.bat --broker-list localhost:9092 --topic itheima							# 测试生产消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima --from-beginning	# 测试消息消费

整合

步骤①:导入springboot整合Kafka的starter,此坐标由springboot维护版本

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

步骤②:配置Kafka的服务器地址

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order

设置默认的生产者消费者所属组id。

步骤③:使用KafkaTemplate操作Kafka

@Service
public class MessageServiceKafkaImpl implements MessageService {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);
        kafkaTemplate.send("itheima2022",id);
    }
}

使用send方法发送消息,需要传入topic名称。

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
public class MessageListener {
    @KafkaListener(topics = "itheima2022")
    public void onMessage(ConsumerRecord<String,String> record){
        System.out.println("已完成短信发送业务(kafka),id:"+record.value());
    }
}

使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/21249.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

加密解密软件VMProtect教程(六):主窗口之控制面板“项目”部分(2)

VMProtect 是新一代软件保护实用程序。VMProtect支持德尔菲、Borland C Builder、Visual C/C、Visual Basic&#xff08;本机&#xff09;、Virtual Pascal和XCode编译器。 同时&#xff0c;VMProtect有一个内置的反汇编程序&#xff0c;可以与Windows和Mac OS X可执行文件一起…

solidworks2020及麦迪工具箱安装

1、麦迪工具箱安装 1&#xff09;下载 下载链接&#xff1a;www.maidiyun.com/download 下载今日制造 2&#xff09;安装 由于电脑上安装了杀毒软件&#xff0c;会直接删除解压后的安装包&#xff0c;因此需要关闭杀毒软件或者在被删除后进入杀毒软件的隔离区将该文件添加…

Tomcat安装与使用

Tomcat 是HTTP服务器&#xff0c;用于使用HTTP协议。 1、下载Tomcat 下载链接&#xff1a;https://tomcat.apache.org/ 进入官网后&#xff0c;根据自己想要下载的版本进行下载&#xff0c;我这里选择下载的版本是Tomcat 8. 点击选择自己想要下载的对应版本&#xff0c;下载Z…

Netty入门

Netty入门 1. 概述 1.1 Netty是什么&#xff1f; Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.以上片段摘自官网&#xff0c;Netty 是一个异步的、基于事…

三十六、链路追踪、配置中心

1、链路追踪 在一次调用链路中&#xff0c;可能设计到多个微服务&#xff0c;如果在线上&#xff0c;某个微服务出现故障&#xff0c;如何快速定位故障所在额微服务呢。 可以使用链路追踪技术 1.1链路追踪介绍 在大型系统的微服务化构建中&#xff0c;一个系统被拆分成了许多微…

chatgpt赋能Python-python3_排序

Python3 排序指南&#xff1a;介绍、说明和实践 Python3是当今最受欢迎的编程语言之一&#xff0c;拥有许多可用于各种任务的库和框架。其中之一是它自带的排序函数&#xff0c;在数据分析和机器学习等领域中非常有用。 在本篇文章中&#xff0c;我们将简要介绍Python3的排序和…

基于AT89C51单片机的贪吃蛇游戏设计

点击链接获取Keil源码与Project Backups仿真图: https://download.csdn.net/download/qq_64505944/87778030 源码获取 主要内容: 设计一个贪吃蛇游戏,使其具有以下游戏规则:①当没有改变方向时,贪吃蛇沿原来路径一直前进②贪吃蛇无法回头,只能异于当前方向改变行动③蛇…

第7章链接:如何动态连接共享库、从应用程序中加载和链接共享库

文章目录 7.10 动态链接共享库静态库的缺点何为共享库共享库的"共享"的含义动态链接过程 7.11 从应用程序中加载和链接共享库运行时动态加载和连接共享库的接口 dlopen函数 dlsym函数 dlclose函数 dlerror动态加载和链接共享库的应用程序示例 7.12 *与位置无关的代码…

强大,Midjourney Imagine API接口,AI画画的福音!

前几天跟大家分享过一篇 ”让chatGPT教你AI绘画|如何将chatGPT与Midjourney结合使用&#xff1f;“&#xff0c;但是由于许多小伙伴们使用Midjourney还有许多困难&#xff0c;又要上网&#xff0c;还要注册Discord&#xff0c;MJ的使用成本很高&#xff0c;让大家望而却步&…

链表题目强化练

目录 前言&#xff08;非题目&#xff09; 两数相加 删除链表的倒数第N个结点 环形链表 相交链表 合并 K 个升序链表 复制带随机指针的链表 前言&#xff08;非题目&#xff09; 初学者在做链表的题目时有一个特点&#xff0c;就是每看一个链表的题都觉得很简单&#x…

Python程序员职业现状分析,想提高竞争力,就要做到这六点

现今程序员群体数量已经高达几百万&#xff0c;学历和收入双高&#xff0c;月薪普遍过万。今天&#xff0c;我们就围绕90后程序员人群分析、职业现状、Python程序员分析等&#xff0c;进行较为全面的报告分析和观点论述。 一、程序员人群分析 人数规模上&#xff1a;截当前程…

【设计原则与思想:总结课】38 | 总结回顾面向对象、设计原则、编程规范、重构技巧等知识点

到今天为止&#xff0c;设计原则和思想已经全部讲完了&#xff0c;其中包括&#xff1a;面向对象、设计原则、规范与重构三个模块的内容。除此之外&#xff0c;我们还学习了贯穿整个专栏的代码质量评判标准。专栏的进度已经接近一半&#xff0c;马上就要进入设计模式内容的学习…

类似于ChatGPT的优秀应用notion

notion 是一款流行的笔记应用。不过功能实际远超笔记&#xff0c;官方自己定义是&#xff1a;“将笔记、知识库和任务管理无缝整合的协作平台”。其独特的 block 概念&#xff0c;极大的扩展了笔记文档的作用&#xff0c;一个 block 可以是个数据库、多媒体、超链接、公式等等。…

怎么用问卷工具做市场调研?

对于希望开发新产品或服务、拓展新市场或确定潜在客户的公司来说&#xff0c;市场调查是一个至关重要的过程。然而&#xff0c;进行市场调查可能既耗时又昂贵&#xff0c;特别是在涉及对大量人群进行调查的情况下。今天&#xff0c;小编将来聊一聊调查问卷工具如何帮助企业进行…

Rasa 3.x 学习系列-Rasa [3.5.8] -2023-05-12新版本发布

Rasa 3.x 学习系列-Rasa [3.5.8] -2023-05-12新版本发布 当自定义动作设置的值与槽的现有值相同时&#xff0c;将触发SlotSet事件。修复了这个问题&#xff0c;使AugmentedMemoizationPolicy能够正确地处理截断的跟踪器。 为了恢复以前的行为&#xff0c;自定义操作只有在槽值…

【C++进阶】继承详解

文章目录 前言一、继承的概念及定义1.概念2.继承定义定义格式继承关系和访问限定继承基类成员访问方式的变化 二、基类和派生类对象赋值转换三、继承中的作用域四、派生类的默认成员函数五、继承与友元六、继承与静态成员七、复杂的菱形继承及菱形虚拟继承1.单继承与多继承2.菱…

软件 工程

目录 第十章、软件工程1、瀑布模型&#xff08;SDLC&#xff09;2、快速原型模型3、增量模型4、螺旋模型5、Ⅴ模型6、喷泉模型7、构建组装模型&#xff08;CBSD&#xff09;8、统一过程&#xff08;RUP&#xff09;9、敏捷开发方法10、信息系统开发方法11、需求开发12、结构化设…

收藏|必读10本pcb设计书籍推荐

1."High-Speed Digital Design: A Handbook of Black Magic"。 作者是Howard Johnson和Martin Graham。这是一本关于高速数字电路设计的优秀教材&#xff0c;适合那些需要设计高速电路的工程师。 作为比较早出来的信号完整性参考书&#xff0c;对国内的信号完整性研…

H.265/HEVC编码原理及其处理流程的分析

H.265/HEVC编码原理及其处理流程的分析 H.265/HEVC编码的框架图&#xff0c;查了很多资料都没搞明白&#xff0c;各个模块的处理的分析网上有很多&#xff0c;很少有把这个流程串起来的。本文的主要目的是讲清楚H.265/HEVC视频编码的处理流程&#xff0c;不涉及复杂的计算过程。…

第3天学习Docker-Docker部署常见应用(MySQL、Tomcat、Nginx、Redis、Centos)

前提须知&#xff1a; &#xff08;1&#xff09;搜索镜像命令 格式&#xff1a;docker search 镜像名 &#xff08;2&#xff09;设置Docker镜像加速器 详见文章&#xff1a;Docker设置ustc的镜像源&#xff08;镜像加速器&#xff09; 1、部署MySQL 拉取镜像&#xff08;这…
最新文章