微服务架构(二)

Sentinel 使用及概念

什么是 Sentinel

        Sentinel (分布式系统的流量防卫兵) 是阿里开源的一套用于服务容错 的综合性解决方案。它以流量 为切入点, 从流量控制、熔断降级、系统负载保护 等多个维度来保护服务的稳定性。
Sentinel 具有以下特征:
        丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景, 例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
        完备的实时监控:Sentinel 提供了实时的监控功能。通过控制台可以看到接入应用的单台机器秒级数据, 甚至 500 台以下规模的集群的汇总运行情况。
        广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
Sentinel 分为两个部分:
        核心库( Java 客户端) 不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。
        控制台(Dashboard)基于 Spring Boot 开发,打包后可以直接运行,不需要额外的 Tomcat 等应用容器。

Sentinel 的概念和功能

基本概念

        资源就是 Sentinel 要保护的东西
                资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,可以是一个服务,也可以是一个方法,甚至可以是一段代码。
               我们入门案例中的 message/test 接口就可以认为是一个资源。
        规则就是用来定义如何进行保护资源的
                作用在资源之上, 定义以什么样的方式保护资源,主要包括流量控制规则、熔断降级规则以及系统保护规则。
                我们入门案例中就是为 message1 资源设置了一种流控规则, 限制了进入message/test 的流量。

重要功能

 Sentinel 的主要功能就是容错,主要体现为下面这三个:

流量控制
        流量控制在网络传输中是一个常用的概念,它用于调整网络包的数据。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。
        Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状。
熔断降级
        当检测到调用链路中某个资源出现不稳定的表现,例如请求响应时间长或异常比例升高的时候,则对这个资源的调用进行限制,让请求快速失败,避免影响到其它的资源而导致级联故障
Sentinel 对这个问题采取了两种手段:
        通过并发线程数进行限制
                Sentinel 通过限制资源并发线程的数量,来减少不稳定资源对其它资源的影响。当某个资源出现不稳定的情况下,例如响应时间变长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的 线程完成任务后才开始继续接收请求。
        通响应时间对资源进行降级
                除了对并发线程数进行控制以外,Sentinel 还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被直接拒绝,直到过了指定的时间窗口之后才重新恢复。
        系统负载保护
                Sentinel 同时提供系统维度的自适应保护能力。当系统负载较高的时候,如果还持续让请求进入可能会导致系统崩溃,无法响应。在集群环境下,会把本应这台机器承载的流量转发到其它的机器上去。如果这个时候其它的机器也处在一个边缘状态的时候,Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
        总之一句话: 我们需要做的事情,就是在 Sentinel 的资源上配置各种各样的规 则,来实现各种容错的功能。

微服务集成 Sentinel

        为微服务集成 Sentinel 非常简单, 只需要加入 Sentinel 的依赖即
        可在 pom.xml 中加入下面依赖
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

         编写一个 Controller 测试使用

@RestController
@RequestMapping(path = "/message")
public class MessageController {
    @GetMapping(path = "/test1")
    public String test1(){
        return "测试高并发 1";
    }

    @GetMapping(path = "/test2")
    public String test2(){
        return "测试高并发 2";
    }
}
        下载客户端
                https://github.com/alibaba/Sentinel/releases
        启动控制台
                # 直接使用 jar 命令启动项目(控制台本身是一个 SpringBoot 项目)
                java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.5.jar
                访问控制台: http://ip+端口 默认用户名密码是 sentinel/sentinel

        Sentinel 的控制台其实就是一个 SpringBoot 编写的程序。我们需要将我们的微服务程序注册到控制台上, 即在微服务中指定控制台的地址, 并且还要开启一个跟控制台传递数据的端口, 控制台也可以通过此端口调用微服务中的监控程序获取微服务的各种信息。
        实现一个接口的限流
                为某个接口添加访问控制

 访问数量超过时限流

 

Gateway--服务网关

网关简介

        大家都都知道在微服务架构中,一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用 这么多的微服务呢?如果没有网关的存在,我们只能在客户端记录每个微服务的地址,然后分别去调用。
        这样的架构,会存在着诸多的问题:
                客户端多次请求不同的微服务,增加客户端代码或配置编写的复杂性。
                认证复杂,每个服务都需要独立认证。
                存在跨域请求,在一定场景下处理相对复杂。
        上面的这些问题可以借助 API 网关 来解决。
        所谓的 API 网关,就是指系统的统一入口,它封装了应用程序的内部结构,为客户端提供统一服务,一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等。
        添加上 API 网关之后,系统的架构图变成了如下所示:

        我们也可以观察下,我们现在的整体架构图: 

在业界比较流行的网关,有下面这些:
        Ngnix+lua
                使用nginx的反向代理和负载均衡可实现对api服务器的负载均衡及高可用 lua 是一种脚本语言,可以来编写一些简单的逻辑, nginx 支持 lua 脚本.
        Kong
                基于 Nginx+Lua 开发,性能高,稳定,有多个可用的插件(限流、鉴权等等)
                可以开箱即用。 问题: 只支持 Http 协议;二次开发,自由扩展困难;提供管理 API,缺乏更易用的管控、配置方式。
        Zuul
                Netflix 开源的网关,功能丰富,使用 JAVA 开发,易于二次开发 问题:缺乏管控,无法动态配 置;依赖组件较多;处理 Http 请求依赖的是 Web 容器,性能不如 Nginx
        Spring Cloud Gateway
                SpringCloud alibaba 技术栈中并没有提供自己的网关,我们可以采用Spring Cloud Gateway 来做网关,将在下面具体介绍。

 

Gateway 简介

        Spring Cloud Gateway 是 Spring 公司基于 Spring 5.0,Spring Boot 2.0和 Project Reactor 等技术 开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。它的目标是替代 Netflix Zuul,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控和限流。
        优点:
                性能强劲,是第一代网关 Zuul 的 1.6 倍
                功能强大,内置了很多实用的功能,例如转发、监控、限流等
                设计优雅,容易扩展
        缺点:
                其实现依赖 Netty 与 WebFlux,不是传统的 Servlet 编程模型,学习成本高
                不能将其部署在 Tomcat、Jetty 等 Servlet 容器里,只能打成 jar 包执行
                需要 Spring Boot 2.0 及以上的版本,才支持

Gateway 快速入门 

        要求: 通过浏览器访问 api 网关,然后通过网关将请求转发到商品微服务.
        1.创建 api 网关模块(略)
        2.导入依赖,不能导入 web 相关的依赖
<!--gateway 网关-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
        3.创建主类
@SpringBootApplication
public class GatewayApplication {
    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }
}
        4.添加配置文件
server:
    port: 7000
spring:
    application:
        name: api-gateway
    cloud:
        gateway:
            routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
            - id: order_route # 当前路由的标识, 要求唯一
              uri: http://localhost:8081 # 请求要转发到的地址
              order: 1 # 路由的优先级,数字越小级别越高
              predicates: # 断言(就是路由转发要满足的条件)
                - Path=/order-serv/** # 当请求路径满足Path指定的规则时, 才进行路由转发
              filters: # 过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
                - StripPrefix=1 # 转发之前去掉 1 层路径
        5. 启动项目, 并通过网关去访问微服务
                http://127.0.0.1:9001/order-serv/order/createOrder/1/1/1

Gateway 结合 nacos

        现在在配置文件中写死了转发路径的地址, 前面我们已经分析过地址写死带来的问题, 接下来我们从注册中心获取此地址。
        1.加入 nacos 依赖
<!--nacos 服务发现依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
        2.在主类上添加注解
                @SpringBootApplication
                @EnableDiscoveryClient

         3.修改配置文件

server:
    port: 9001
spring:
    application:
        name: api-gateway
    cloud:
        nacos:
            discovery:
                server-addr: 127.0.0.1:8848 #nacos 服务地址
        gateway:
            discovery:
                locator:
                    enabled: true
            routes: # 路由数组[路由 就是指定当请求满足什么条件的时候转到哪个微服务]
                - id: product_route # 当前路由的标识, 要求唯一
                  uri: lb://service-order # lb 指的是从 nacos 中按照名称获取微服务, 并遵循负载均衡策略
                  order: 1 # 路由的优先级,数字越小级别越高
                  predicates: # 断言(就是路由转发要满足的条件)
                        - Path=/order-serv/** # 当请求路径满足 Path 指定的规则时,才进行路由转发
                  filters: #过滤器,请求在传递过程中可以通过过滤器对其进行一定的修改
                        - StripPrefix=1 # 转发之前去掉 1 层路径
        4.测试访问
                http://127.0.0.1:9001/order-serv/order/createOrder/1/1/1

 

全局过滤器

        全局过滤器作用于所有路由, 无需配置。通过全局过滤器可以实现对权限的统一校验,安全性验证等功能.

 

内置全局过滤器
自定义全局过滤器
        内置的过滤器已经可以完成大部分的功能,但是对于企业开发的一些业务功能处理,还是需要我们自己编写过滤器来实现的,那么我们一起通过代码的形式自定义一个过滤器,去完成统一的权限校验。 开发中的鉴权逻辑:
  • 当客户端第一次请求服务时,服务端对用户进行信息认证(登录)
  • 认证通过,将用户信息进行加密形成 token,返回给客户端,作为登录凭证
  • 以后每次请求,客户端都携带认证的 token
  • 服务端对 token 进行解密,判断是否有效。
        自定义一个全局过滤器,去校验所有请求的请求参数中是否包含“token”, 如何不包含请求
         参数“token”则不转发路由,否则执行正常的逻辑。
@Component
public class TokenFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    //获取请求中的参数部分
    String token = exchange.getRequest().getQueryParams().getFirst("token");
    if (!"123456".equals(token)) {//模拟验证 token
        System.out.println("鉴权失败");
        exchange.getResponse().setStatusCode(201);
        return exchange.getResponse().setComplete();//响应状态码
    }
    //调用 chain.filter 继续向下游执行
    return chain.filter(exchange);
}

    @Override
    public int getOrder() {
        return 0;
    }
}

网关限流

        网关是所有请求的公共入口,所以可以在网关进行限流,而且限流的方式也很多,我们本次采用前 面学过的 Sentinel 组件来实现网关的限流。Sentinel 支持对SpringCloud Gateway、Zuul 等主流网关进行限流。
        从 1.6.0 版本开始,Sentinel 提供了 SpringCloud Gateway 的适配模块,可以提供两种资源维度的限流: route 维度:即在 Spring 配置文件中配置的路由条目,资源名为对应的 routeId 自定义 API 维度:用户可以利用 Sentinel 提供的API 来自定义一些 API 分组.

         1 导入依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>
        2 编写配置类
                基于 Sentinel 的 Gateway 限流是通过其提供的 Filter 来完成的,使用时只需注入对应的 SentinelGatewayFilter 实例以及SentinelGatewayBlockExceptionHandler 实例即可。
@Configuration
public class GatewayConfiguration {
    private final List<ViewResolver> viewResolvers;
    private final ServerCodecConfigurer serverCodecConfigurer;
    public GatewayConfiguration(ObjectProvider<List<ViewResolver>>
    viewResolversProvider,ServerCodecConfigurer serverCodecConfigurer) {
    this.viewResolvers =
        viewResolversProvider.getIfAvailable(Collections::emptyList);
    this.serverCodecConfigurer = serverCodecConfigurer;
}

// 初始化一个限流的过滤器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
    return new SentinelGatewayFilter();
}

// 配置初始化的限流参数
@PostConstruct
public void initGatewayRules() {
    Set<GatewayFlowRule> rules = new HashSet<>();
    rules.add(new GatewayFlowRule("order_route") //资源名称,对应路由 id
    .setCount(1) // 限流阈值
    .setIntervalSec(1) // 统计时间窗口,单位是秒,默认是 1 秒
    );
    GatewayRuleManager.loadRules(rules);
}

// 配置限流的异常处理器
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public SentinelGatewayBlockExceptionHandler
    sentinelGatewayBlockExceptionHandler() {
    return new SentinelGatewayBlockExceptionHandler(viewResolvers,
    serverCodecConfigurer);
}

// 自定义限流异常页面
@PostConstruct
public void initBlockHandlers() {
BlockRequestHandler blockRequestHandler = new BlockRequestHandler() {
    public Mono<ServerResponse> handleRequest(ServerWebExchange
    serverWebExchange, Throwable throwable) {
        Map map = new HashMap<>();
        map.put("code", 0);
        map.put("message", "接口被限流了");
        return ServerResponse.status(HttpStatus.OK).
        contentType(MediaType.APPLICATION_JSON_UTF8).
        body(BodyInserters.fromObject(map));
        }
    };
    GatewayCallbackManager.setBlockHandler(blockRequestHandler);
    }
}

 

消息队列-MQ

什么是 MQ

        MQ 全称(Message Queue)又名消息队列,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程 API的软件系统(消息即数据)。通俗点说,就是一个先进先出的数据结构。

 

MQ 的应用场景

异步解耦

        最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

 

        此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。 但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续 的注册短信和邮件不是即时需要关注的步骤。 所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返 回用户结果,由消息队列 MQ 异步地进行这些操作。
        架构图如下:
        异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比 较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列 MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。

 

常见的 MQ 产品

        目前业界有很多 MQ 产品,比较出名的有下面这些:
                ZeroMQ: 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用 C 语言 实现,实际上只是一个 socket 库的重新封装,如果做为消息队列使用,需要开发大量的代码。 ZeroMQ 仅提供非持久性的队列,也就是说如果 down 机,数据将会丢失。
                RabbitMQ: 使用 erlang 语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
                ActiveMQ: 历史悠久的 Apache 开源项目。已经在很多产品中得到应用,实现了 JMS1.1 规范,可以和 springjms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
                RocketMQ: 阿里巴巴的 MQ 中间件,由 java 语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来 很简单。
                Kafka: Kafka 是 Apache 下的一个子项目,是一个高性能跨语言分布Publish/Subscribe 消息队列系统, 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

RocketMQ 入门

        RocketMQ 是阿里巴巴开源的分布式消息中间件,现在是 Apache 的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双 11"这种万亿级的消息流转。

RocketMQ 环境搭建

        软硬件需求
                系统要求是 64 位的,JDK 要求是 1.8 及其以上版本的。
        下载
                https://rocketmq.apache.org/download/
        解压
        配置环境变量
                ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-4.9.3
                NAMESRV_ADDR =127.0.0.1:9876
        启动 Name Server
                进入到 bin 目录输入命令: mqnamesrv.cmd
        启动 Broker
                进入到 bin 目录输入命令:
                mqbroker.cmd -n 127.0.0.1:9876 atuoCreateTopicEnable=true
        发送和接收消息测试
        模拟发送消息
                进入到 bin 目录输入命令:  tools.cmd org.apache.rocketmq.example.quickstart.Producer
        模拟接收消息
                进入到 bin 目录输入命令: tools.cmd org.apache.rocketmq.example.quickstart.Consumer
        控制台安装与启动

 

         解压

        修改其 src/main/resources 中的 application.properties 配置文件

         在解压目录 rocketmq-console 的 pom.xml 中添加如下 JAXB 依赖。

<dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.3.0</version>
</dependency>

<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-impl</artifactId>
    <version>2.3.0</version>
</dependency>

<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-core</artifactId>
    <version>2.3.0</version>
</dependency>

<dependency>
    <groupId>javax.activation</groupId>
    <artifactId>activation</artifactId>
    <version>1.1.1</version>
</dependency>
        打包
        命令行进入到 rocketmq-console
                mvn clean package -Dmaven.test.skip=true
        打包后,进入 target 目录启动控制台 java -jar rocketmq-console-ng-1.0.0.jar
        访问: http://127.0.0.1:6060

RocketMQ 的架构及概念 

        如上图所示,整体可以分成 4 个角色,分别是: NameServer,Broker,Producer,Consumer。
        Broker(邮递员) Broker 是 RocketMQ 的核心,负责消息的接收,存储,投递等功能.
        NameServer(邮局) 消息队列的协调者,Broker 向它注册路由信息,同时Producer 和 Consumer 向其获取路由信息
        Producer(寄件人) 消息的生产者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,向 Broker 发送消 息
        Consumer(收件人) 消息的消费者,需要从 NameServer 获取 Broker 信息,然后与 Broker 建立连接,从 Broker 获取消息
        Topic(地区) 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对 Topic 来发送和接收消息
        Message Queue(邮件) 为了提高性能和吞吐量,引入了 Message Queue,一个 Topic 可以设置一个或多个 Message Queue,这样消息就可以并行往各个Message Queue 发送消息,消费者也可以并行的从多个 Message Queue 读取消息 Message Message 是消息的载体。
        Producer Group 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
        Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

 

java 消息发送和接收演示

        们使用 Java 代码来演示消息的发送和接收
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency
发送消息
        消息发送步骤:
                1. 创建消息生产者, 指定生产者所属的组名
                2. 指定 Nameserver 地址
                3. 启动生产者
                4. 创建消息对象,指定主题、标签和消息体
                5. 发送消息
                6. 关闭生产者
public class MQProducerTest {
public static void main(String[] args) throws Exception {
    //1. 创建消息生产者, 指定生产者所属的组名
    DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
    //2. 指定 Nameserver 地址
    producer.setNamesrvAddr("192.168.109.131:9876");
    //3. 启动生产者
    producer.start();
    //4. 创建消息对象,指定主题、标签和消息体
    Message msg = new Message("myTopic", "myTag",("RocketMQ Message").getBytes());
    //5. 发送消息
    SendResult sendResult = producer.send(msg, 10000);
    System.out.println(sendResult);
    //6. 关闭生产者
    producer.shutdown();
    }
}
接收消息
        消息接收步骤:
                1. 创建消息消费者, 指定消费者所属的组名
                2. 指定 Nameserver 地址
                3. 指定消费者订阅的主题和标签
                4. 设置回调函数,编写处理消息的方法
                5. 启动消息消费者
public class MQConsumerTest {
public static void main(String[] args) throws Exception {
    //1. 创建消息消费者, 指定消费者所属的组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");
    //2. 指定 Nameserver 地址
    consumer.setNamesrvAddr("192.168.109.131:9876");
    //3. 指定消费者订阅的主题和标签
    consumer.subscribe("myTopic", "*");
    //4. 设置回调函数,编写处理消息的方法
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContext context) {
            System.out.println("Receive New Messages: " + msgs);//返回消费状态
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
//5. 启动消息消费者
    consumer.start();
        System.out.println("Consumer Started.");
    }
}
案例
        接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

 

        订单微服务发送消息
        1. 添加 rocketmq 的依赖
<!--rocketmq-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
        2. 添加配置
rocketmq:
    name-server: 127.0.0.1:9876 #rocketMQ 服务的地址
    producer:
        group: shop-order # 生产者组
        3. 编写测试代码
        在订单微服务控制器中添加代码
@Autowired
private RocketMQTemplate rocketMQTemplate;
rocketMQTemplate.convertAndSend("order-topic", order);
        用户微服务接收消息
        1. 添加依赖
<!--rocketMQ-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
        2. 修改配置文件
rocketmq:
    name-server: 127.0.0.1:9876
        3. 编写消息接收服务
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
    System.out.println("收到一个订单信息:"+ JSON.toJSONString(order)+",接下来发送短信");
    }
}
        4. 启动服务,执行下单操作,观看后台输出

 

发送不同类型的消息
        RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送
        可靠同步发送: 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方 式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
        可靠异步发送: 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送 方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知 启动转码服务,转码完成后通知推送转码结果等。
        单向发送: 单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不 等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
发送同步消息
//同步消息
//参数一: topic
//参数二: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
System.out.println(sendResult);

发送异步消息

//参数一: topic
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new
SendCallback(){
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println(sendResult);
    }
    @Override
    public void onException(Throwable throwable) {
        System.out.println(throwable);
    }
});
//让线程不要终止
Thread.sleep(30000000);
单向消息
rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");

Redis 实现分布式锁

什么是分布式锁?

        分布式锁,即分布式系统中的锁。在单体应用中我们通过 java 中的锁解决多线程访问共享资源 的问题,而分布式锁,就是解决了 分布式系统中控制共享资 源访问 的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程.
        下面是一个扣除库存的方法,思考在集群,分布式项目中会出现什么问题.

        思考: 添加 synchronized 锁,能够解决问题 

基于 Redis 分布式锁的实现方式

        分布式锁的特征

方式 1:SETNX 命令

        作为分布式锁实现过程中的共享存储系统,Redis 可以使用键值对来保存锁变量,在接收和处理不同客户端发送的加锁和释放锁的操作请求。那么,键值对的键和值具体是怎么定的呢?我们要赋予锁变量一个变量名,把这个变量名作为键值对的键,而锁变量的值,则是键值对的值,这样一来,Redis 就能保存锁变量了,客户端也就可以通过 Redis 的命令操作来实现锁操作。
        想要实现分布式锁,必须要求 Redis 有互斥的能力。可以使用 SETNX 命令,其含义是 SET IF NOT EXIST,即如果 key 不存在,才会设置它的值,否则什么也不做。两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。
        以下展示了 Redis 使用 key/value 对保存锁变量,以及两个客户端同时请求加锁的操作过程。
        加锁操作完成后,加锁成功的客户端,就可以去操作共享资源,例如,修改MySQL 的某一行数据。操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。直接使用 DEL 命令删除这个 key 即可。
        但是,以上实现存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成死锁
        1. 程序处理业务逻辑异常,没及时释放锁
        2. 进程挂了,没机会释放锁
        以上情况会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。
        在 finally 中释放锁,以及设置键失效时间.
        但是设置失效时间 10s,有可能业务执行时间大于 10s,那么锁会失效,导致其他线程进入到减库存业务中,这时,第一个线程执行完成,会误删除第二个线程的锁标志,导致其他线程进入.导致锁永久失效.

        为每个线程的添加一个版本号,删除时,判断版本号. 

方式 2: 使用 redission

         导入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>
        创建 Redisson 对象
//创建 Redisson 对象
@Bean
public Redisson getRedisson(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://120.48.37.232:6379").setDatabase(0);
    return (Redisson)Redisson.create(config);
}
        使用 Redisson 实现加锁,释放锁.

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/4300.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot实现医院信息管理系统【源码+论文】

基于springboot实现医院信管系统演示开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xf…

2023年全国DAMA-CDGP数据治理专家认证线上班招生简章

DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职业…

Vue3——v-md-editor(markDown编辑器)使用教程

Vue3——v-md-editor安装使用教程 安装 # 使用 npm npm i kangc/v-md-editor -SEditorMarkdown.vue页面用来封装此编辑器组件 Test.vue作为接受EditorMarkdown.vue的父组件&#xff0c;当测试页使用 路由部分要放入test.vue main.js部分全局引入组件 import EditorMarkdown f…

操作系统权限维持(十一)之Linux系统-SSH Wrapper后门

系列文章 操作系统权限维持&#xff08;一&#xff09;之Windows系统-粘贴键后门 操作系统权限维持&#xff08;二&#xff09;之Windows系统-克隆账号维持后门 操作系统权限维持&#xff08;三&#xff09;之Windows系统-启动项维持后门 操作系统权限维持&#xff08;四&…

算法强化每日一题--字符串中找出连续最长的数字串

hi,大家好,今天为大家带来一道题目 OR59 字符串中找出连续最长的数字串 描述 读入一个字符串str&#xff0c;输出字符串str中的连续最长的数字串 输入描述&#xff1a; 个测试输入包含1个测试用例&#xff0c;一个字符串str&#xff0c;长度不超过255。 输出描述&#xff1a; 在…

北邮22信通:(8)实验1 题目五:大整数加减法(搬运官方代码)

北邮22信通一枚~ 跟随课程进度每周更新数据结构与算法的代码和文章 持续关注作者 解锁更多邮苑信通专属代码~ 上一篇文章&#xff1a; 北邮22信通&#xff1a;&#xff08;7&#xff09;实验1 题目四&#xff1a;一元多项式&#xff08;节省内存版&#xff09;_青山如…

【人人都能读标准】17. 底层算法:ECMAScript的错误处理机制

本文为《人人都能读标准》—— ECMAScript篇的第17篇。我在这个仓库中系统地介绍了标准的阅读规则以及使用方式&#xff0c;并深入剖析了标准对JavaScript核心原理的描述。 我们在11.程序完整执行过程说过&#xff0c;一个程序的运行会经历三个阶段&#xff1a;初始化Realm环境…

MyBatis-面试题

文章目录1.什么是MyBatis?2.#{}和${}的区别是什么&#xff1f;3.MyBatis的一级、二级缓存4.MyBatis的优缺点5.当实体类中的属性名和表中的字段名不一样 &#xff0c;怎么办 &#xff1f;6.模糊查询like语句该怎么写?7.Mybatis是如何进行分页的&#xff1f;分页插件的原理是什…

渗透测试之冰蝎实战

渗透测试之冰蝎实战1.基本使用2.命令执行&虚拟终端3.文件管理4.反弹shell5.内网资产扫描6.内网穿透7.数据库管理“冰蝎”是一款动态二进制加密网站管理客户端 下载地址 1.基本使用 运行冰蝎&#xff0c;打开传输协议&#xff1a; 生成一个php远程马&#xff1a; 点击生成…

【测试基础】之07 linux基础

Linux操作系统Linux操作系统介绍操作系统&#xff1a;管理计算机硬件与软件 资源的计算机程序&#xff0c;同时也是计算机系统的内核与基石。简单地说&#xff0c;操作系统就是出于用户与计算机系统硬件之间用于传递信息的系统程序软件。例如&#xff1a;操作系统会在接收到用户…

金三银四,你准备好面试了吗? (附30w字软件测试面试题总结)

不知不觉&#xff0c;已是3月下旬。最近有很多小伙伴都在跟我谈论春招面试的问题&#xff0c;其实对于面试&#xff0c;我也没有太多的经验&#xff0c;只能默默地把之前整理的软件测试面试题分享给Ta。今天就来大致的梳理一下软件测试的面试体系&#xff08;每一部分最后都有相…

Vue3学习笔记(5.0)

Vue.js循环语句 v-for指令需要以site in sites形式的特殊语法&#xff0c;sites是源数据数组并且site是数组元素迭代的别名。 v-for可以绑定数据到数组来渲染一个列表&#xff1a; <!--* Author: RealRoad1083425287qq.com* Date: 2023-03-26 16:26:51* LastEditors: Mei…

图解redis的client的实现

目录 1.引言 2.客户端属性 2.1套接字描述符 2.2 name 2.3 客户端标志 2.4输入缓冲区 2.5命令与命令参数 2.6命令实现的函数 2.7输出缓冲区 2.8身份验证 2.9 时间 3.客户端的创建的关闭 3.1普通客户端的创建 3.2普通客户端的关闭 3.AOF的伪客户端 1.引言 Redis服务…

(数字图像处理MATLAB+Python)第二章数字图像处理基础-第三、四节:数字图像的生成和数值描述

文章目录一&#xff1a;数字图像的生成与表示&#xff08;1&#xff09;图像信号的数字化&#xff08;2&#xff09;数字图像类型二&#xff1a;数字图像的数值描述&#xff08;1&#xff09;常用坐标系&#xff08;2&#xff09;数字图像的数据结构&#xff08;3&#xff09;常…

Typora使用

Typora Typora 是一款支持实时预览的 Markdown 文本编辑器。 1. 基础操作 1.1标题 # 一级标题## 二级标题### 三级标题#### 四级标题##### 五级标题###### 六级标题1.2 引用 > 引用内容1 > 引用内容2 >> 引用内容31.3 斜体 *斜体* _斜体_1.4 加粗…

mysql整理

文章目录概述SQLDDLDMLDQL单表查询多表查询DQL的执行顺序DCL管理用户控制权限函数约束事务存储引擎索引概述语法性能分析索引的使用SQL的优化insert优化主键优化Order by优化其它优化存储对象视图存储过程基本操作变量IF条件判断参数循环条件处理程序存储函数触发器锁全局锁表级…

Mysql-缓冲池 buffer pool

缓冲池 buffer pool innodb中的数据是以【页】的形式存储在磁盘上的表空间内&#xff0c;但是【磁盘的速度】和【内存】相比简直不值一提&#xff0c;而【内存的速度】和【cpu的速度】同样不可同日而语&#xff0c;对于数据库而言&#xff0c;I/O成本永远是不可忽略的一项成本…

基于Elman神经网络预测计费系统的输出(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 简单循环网络&#xff08;simple recurrent networks&#xff0c;简称SRN&#xff09;又称为Elman network&#xff0c;是由Jeff…

什么是AI文章生成器-AI文章生成器批量生成文章

AI文章生成器有哪些 目前市面上存在一些可以生成文章的 AI 文章生成器&#xff0c;以下是其中几种常见的&#xff1a; OpenAI GPT-3&#xff1a; OpenAI GPT-3 是目前最先进、最著名的 AI 文章生成器之一&#xff0c;它可以生成各种类型的文章&#xff0c;例如新闻报道、科学报…

我的Macbook pro使用体验

刚拿到Mac那一刻&#xff0c;第一眼很惊艳&#xff0c;不经眼前一亮&#xff0c;心想&#xff1a;这是一件艺术品&#xff0c;太好看了吧 而后再体验全新的Macos 系统&#xff0c;身为多年的win用户说实话一时间还是难以接受 1.从未见过的访达&#xff0c;不习惯的右键 2. …
最新文章