Netty核心技术十一--用Netty 自己 实现 dubbo RPC

1. RPC基本介绍

  1. RPC(Remote Procedure Call):远程 过程调用,是一个计算机 通信协议。该协议允许运 行于一台计算机的程序调 用另一台计算机的子程序, 而程序员无需额外地为这 个交互作用编程

  2. 两个或多个应用程序都分 布在不同的服务器上,它 们之间的调用都像是本地 方法调用一样(如图)

    image-20230715122856569

  3. 常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。

2. RPC调用流程图

术语说明:在RPC中,

  • Client叫服务消费者
  • Server叫服务提供者

image-20230715123024750

3. PRC调用流程说明

  1. **服务消费方(client)**以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

4. 自己实现 dubbo RPC(基于Netty)

  • 需求说明

    1. dubbo 底层使用了 Netty 作为网络通讯框架,要求用Netty 实现一个简单的RPC框架
    2. 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty4.1.20
  • 设计说明

    image-20230715123505151

    1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
    2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
    3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据

4.1 公共接口 publicinterface包

4.1.1 HelloService

package site.zhourui.nioAndNetty.netty.dubborpc.publicinterface;

//这个是接口,是服务提供方和 服务消费方都需要
public interface HelloService {
    String hello(String msg);
}

4.2 远程调用netty包

本质上就是客户端访问服务端

4.2.1 NettyClientHandler

  1. 我们实现了Callable方法
  2. setPara(String para)方法: 设置要发给服务器端的信息
  3. 我们将ctx在channelActive时抽取为全局对象context,方便我们在其他方法也能使用(这里就是call方法)
  4. call方法:
    • 开启子线程向服务端发送消息
    • 发送完成后该子线程进行wait,等待服务提供方处理并返回数据(被唤醒)
    • 唤醒后打印服务端返回数据全局变量result中的数据
  5. channelRead方法:
    • 收到服务器的返回数据后,将返回数据放在全局变量result中
    • 唤醒等待的线程
    • 因为channelRead和call方法是有同步关系的所有要加上synchronized加锁
  6. 小结: 代码执行流程
    1. channelActive()
    2. setPara()设置需要发送的数据
    3. call(wait之前代码)被代理对象调用, 发送数据给服务器-> wait
    4. 等待被唤醒(channelRead)->notify
    5. call(wait之后代码)
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数

    //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }

    //收到服务器的数据后,调用方法 (4)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return  result; //服务方返回的结果
    }

    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

4.2.2 NettyClient

说明:

  1. 创建线程池executor

  2. initClient():

    • 初始化NettyClientHandler 设为全局对象client
    • 创建客户端并连接客户端
      • StringDecoder():字符串编码器
      • StringEncoder():字符串解码器
      • pipeline.addLast(client):将加入自定义handler-client
  3. 编写方法getBean使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) 
    
    • serivceClass: 需要代理的Class对象
    • providerName: 协议以及需要发送的数据
    • 如果client为空就初始化initClient
    • client.setPara():使用自定义handler的全局对象client设置需要发送的数据
    • executor.submit(client): 将我们的自定义handler提交给异步线程池,因为NettyClientHandler 实现了Callable方法,会自动调用call方法
      • .get():异步任务执行完成后获取返回结果
    • 将返回结果return
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {
    //创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //编写方法使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }

                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    client.setPara(providerName + args[0]);

                    //
                    return executor.submit(client).get();

                });
    }

    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.2.3 NettyServerHandler

  • 当通道发生读事件时
    • 获取客户端发送的消息,并调用服务
    • 按照协议规则取出数据(HelloService#hello#)
      • HelloService# 为协议头
      • hello为数据
    • 回复客户端调用结果
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import site.zhourui.nioAndNetty.netty.dubborpc.customer.ClientBootstrap;
import site.zhourui.nioAndNetty.netty.dubborpc.provider.HelloServiceImpl;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api 时,我们需要定义一个协议
        //比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

4.2.4 NettyServer

  • 启动客户端
    • StringDecoder
    • StringEncoder
    • NettyServerHandler
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {
    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //编写一个方法,完成对NettyServer的初始化和启动

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //业务处理器

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务提供方开始提供服务~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

4.3 customer 包

4.3.1 ClientBootstrap

  • 设置providerName:我们发送的数据(协议+数据)
  • 创建一个消费者
  • 创建代理对象
  • 通过代理对象调用服务提供者的方法(服务)
package site.zhourui.nioAndNetty.netty.dubborpc.customer;

import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyClient;
import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {
    //这里定义协议头
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //创建一个消费者
        NettyClient customer = new NettyClient();

        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;; ) {
            Thread.sleep(2 * 1000);
            //通过代理对象调用服务提供者的方法(服务)
            String res = service.hello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }
    }
}

4.4 provider 包

4.4.1 HelloServiceImpl

服务端提供方的实现,远程真正被调用的方法

package site.zhourui.nioAndNetty.netty.dubborpc.provider;

import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService {
    private static int count = 0;
    //当有消费方调用该方法时, 就返回一个结果
    @Override
    public String hello(String msg) {
        System.out.println("收到客户端消息=" + msg);
        //根据mes 返回不同的结果
        if(msg != null) {
            return "你好客户端, 我已经收到你的消息 [" + msg + "] 第" + (++count) + " 次";
        } else {
            return "你好客户端, 我已经收到你的消息 ";
        }
    }
}

4.4.1 ServerBootstrap

ServerBootstrap 会启动一个服务提供者,就是 NettyServer

package site.zhourui.nioAndNetty.netty.dubborpc.provider;

import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyServer;

//ServerBootstrap 会启动一个服务提供者,就是 NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {

        //代码代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

4.5 测试

  1. 启动ServerBootstrap

    image-20230715153549514

  2. 启动ClientBootstrap

    image-20230715153625019

    image-20230715153633552

4.5.1 debug看一下ClientBootstrap启动

首先还是先启动服务端ServerBootstrap

  1. debug启动ClientBootstrap

    image-20230715153928061

  2. NettyClient(),此时只是初始化了全局属性

  3. getBean:创建代理对象

    • 先看看入参是什么数据

      image-20230715154250353

    • 如果client没有被初始化就初始化

      image-20230715154338794

    • 设置要发给服务器端的信息

      image-20230715154437686

    • executor.submit:提交异步任务就会来到NettyClientHandler的call方法

      image-20230715154604732

    • call方法执行到wait()方法后,channelRead不久后就会收到服务端的调用结果然后唤醒call方法的子线程继续执行

      image-20230715154847317

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/41327.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【已解决】html元素如何使字体占据相同的元素显得整齐

本博文源于自身的亲身实践&#xff0c;让html的文本元素对齐&#xff0c;如果不让其对齐就会变得很丑陋&#xff0c;如下图&#xff0c;那么如何设置才能让元素占据相同呢&#xff1f; 文章目录 1、问题来源2、问题解决思路3、问题解决方案4、问题完整源码及效果 1、问题来源 …

摩尔投票算法(Moore‘s Voting Algorithm)及例题

摩尔投票算法&#xff08;Moores Voting Algorithm&#xff09;及例题 摩尔投票算法简介摩尔投票算法算法思想摩尔投票算法经典题目169. 多数元素229. 多数元素 II6927. 合法分割的最小下标 上午打力扣第 354 场周赛最后十五分钟用摩尔投票算法直接秒了第三题。 摩尔投票算法简…

使用原生Redis命令实现分布式锁

推荐文章&#xff1a; 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; ​ 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、java后端接口API性能优化技巧 4、SpringBootMyBatis流式查询,处理大规模数据,提高系统的性能和响应…

一零六四、世界杯数据可视化分析(阿里云天池赛)

目录 赛制官方链接 活动背景 活动时间&#xff1a;即日起-12月31日17点 数据说明 世界杯成绩信息表&#xff1a;WorldCupsSummary 世界杯比赛比分汇总表&#xff1a;WorldCupMatches.csv 世界杯球员信息表&#xff1a;WorldCupPlayers.csv 代码实现 赛制官方链接 世界杯…

Git 学习笔记

Git 仓库中的提交记录保存的是你的目录下所有文件的快照&#xff0c;就像是把整个目录复制&#xff0c;然后再粘贴一样&#xff0c;但比复制粘贴优雅许多&#xff01; Git 希望提交记录尽可能地轻量&#xff0c;因此在你每次进行提交时&#xff0c;它并不会盲目地复制整个目录。…

使用typora+PicGo+Gitee简单实现图片上传功能

本文通过配置PicGoGitee来实现typora图片上传功能&#xff0c;系统是window 注意下载的清单有&#xff1a;PicGo&#xff0c;node.js&#xff0c;配置有&#xff1a;PicGo&#xff0c;node.js&#xff0c;gitee&#xff0c;typora 看着复杂实际上并不难&#xff0c;只是繁琐&am…

ADC 的初识

ADC介绍 Q: ADC是什么&#xff1f; A: 全称&#xff1a;Analog-to-Digital Converter&#xff0c;指模拟/数字转换器 ADC的性能指标 量程&#xff1a;能测量的电压范围分辨率&#xff1a;ADC能辨别的最小模拟量&#xff0c;通常以输出二进制数的位数表示&#xff0c;比如&am…

HttpClient使用MultipartEntityBuilder上传文件时乱码问题解决

HttpClient使用MultipartEntityBuilder是常用的上传文件的组件&#xff0c;但是上传的文件名称是乱码&#xff0c;一直输出一堆的问号&#xff1a; 如何解决呢&#xff1f;废话少说&#xff0c;先直接上代码&#xff1a; public static String doPostWithFiles(HttpClient http…

scripy其他

持久化 # 爬回来&#xff0c;解析完了&#xff0c;想存储&#xff0c;有两种方案 ## 方案一&#xff1a;一般不用 parse必须有return值&#xff0c;必须是列表套字典形式--->使用命令&#xff0c;可以保存到json格式中&#xff0c;csv中scrapy crawl cnblogs -o cnbogs.j…

【精华】maven 生命周期 + 依赖传递+ scope【依赖范围】 + 排除依赖 可选依赖

目录 一 . lifecycle 生命周期 二. 依赖 与 依赖传递 三. scope 依赖范围 scope指定依赖范围 依赖传递依赖与原依赖冲突 四 maven的可选依赖与排除依赖 可选依赖 全部 排除依赖 显式的指定 maven官网技术文档&#xff1a; 一 . lifecycle 生命周期 * clean&…

基于appium的常用元素定位方法

目录 一、元素定位工具 1.uiautomatorviewer.bat 2.appium检查器 二、常用元素定位方法 1.id定位 2.class_name定位 3.accessibility_id定位 4.android_uiautomator定位 5.xpath定位 三、组合定位 四、父子定位 五、兄弟定位 总结&#xff1a; 一、元素定位工具 app应…

postgresql regular lock常规锁申请与释放 内幕 以及fastpath快速申请优化的取舍

​专栏内容&#xff1a; postgresql内核源码分析 手写数据库toadb 并发编程 个人主页&#xff1a;我的主页 座右铭&#xff1a;天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物. 定义 每种常规锁都需要定义几个要素&#xff0c;它由结构体 Lo…

边缘检测之loG算子

note // 边缘检测之loG算子&#xff1a;对高斯函数求二阶导数 // G(x,y) exp(-1 * (x*x y*y) / 2 / sigma / sigma) // loG(x,y) ((x*x y*y - 2 * sigma * sigma) / (sigma^4)) * exp(-1 * (x*x y*y) / 2 / sigma /sigma) /* [ 0,0,-1,0,0; 0,-1,-2,-1,0; -1,-2,16,-2…

(栈队列堆) 剑指 Offer 09. 用两个栈实现队列 ——【Leetcode每日一题】

❓ 剑指 Offer 09. 用两个栈实现队列 难度&#xff1a;简单 用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数 appendTail 和 deleteHead &#xff0c;分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素&#xff0c;deleteHead …

Shikra:新一代多模态大语言模型,理解指向,说出坐标

“ Shikra&#xff1a;解锁多模态语言模型参考对话的魔法” Shikra和用户的对话案例 在人类的日常交流中&#xff0c;经常会关注场景中的不同区域或物体&#xff0c;双方都可以通过说话并指向这些区域来进行高效的信息交换。我们将这种对话模式称为参考对话&#xff08;Referen…

C语言 替换gets函数

目录 替换gets函数gets()用处gets()的危险之处gets()的几种替代方法一、用%c循环输入直到遇到换行结束二、用getchar()循环输入直到遇到换行结束三、scanf的另一种用法四、c中的getline()方法五、解决方案使用fgets代替 替换gets函数 gets()用处 gets从标准输入设备读字符串函…

C# Linq 详解四

目录 概述 二十、SelectMany 二十一、Aggregate 二十二、DistinctBy 二十三、Reverse 二十四、SequenceEqual 二十五、Zip 二十六、SkipWhile 二十七、TakeWhile C# Linq 详解一 1.Where 2.Select 3.GroupBy 4.First / FirstOrDefault 5.Last / LastOrDefault C# Li…

truffle 进行智能合约测试

本方法使用了可视化软件Ganache 前两步与不使用可视化工具的步骤是一样的&#xff08;有道云笔记&#xff09;&#xff0c;到第三步的时候需要注意&#xff1a; 在truffle插件下找到networks目录&#xff0c;提前打开Ganache软件 在Ganache中选择连接或者新建&#xff0c;我在…

软件测试测试用例

等价类&#xff1a;把输入的数据可以分为有效的数据和无效的数据 被测试的对象输入的数据&#xff1a; 1、有效的数据 2、无效的数据 测试一个产品&#xff0c;需要考虑它的正确场景&#xff0c;也需要考虑它的异常场景 边界值:边界值测试用例是针对等价类测试用例方法的补…

每天一道C语言编程:排队买票

题目描述 有M个小孩到公园玩&#xff0c;门票是1元。其中N个小孩带的钱为1元&#xff0c;K个小孩带的钱为2元。售票员没有零钱&#xff0c;问这些小孩共有多少种排队方法&#xff0c;使得售票员总能找得开零钱。注意&#xff1a;两个拿一元零钱的小孩&#xff0c;他们的位置互…
最新文章