.netcore grpc双向流方法详解

一、双向流处理概述

  1. 简单来讲客户端可以向服务端发送消息流,服务端也可以向客户端传输响应流,即客户端和服务端可以互相通讯
  2. 客户端无需发送消息即可开始双向流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync 发送消息。 使用 ResponseStream.MoveNext() 或 ResponseStream.ReadAllAsync() 可访问从服务流式处理的消息。ResponseStream 没有更多消息时,双向流式处理调用完成。

二、案例简介

  1. 客户端发送请求流通过equestStream.WriteAsync传入到服务端
  2. 服务端响应到客户端的流通过ResponseStream.WriteAsync写入到客户端
  3. 服务端使用System.Threading.Channels保证线程安全交互

三、服务端配置(注意:grpc相关配置参考我之前的文章)

  1. 配置.proto文件
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法

//定义messages.proto文件令需要注意项目文件中的特性GrpcServices=None;

syntax = "proto3";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;


// 消息推送/接收实体
message ExampleMessage
{
	string msg = 1;
}


// 双向流文件twowaystream.proto

syntax = "proto3";

import "Protos/messages.proto";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;

service BothWaysRpc{
	// 双向流
	rpc StreamingBothWays(stream ExampleMessage) returns (stream ExampleMessage);
}
  1. 1 服务接口实现
    /// <summary>
    /// 双向流服务
    /// </summary>
    public class BothWaysService : BothWaysRpc.BothWaysRpcBase
    {
        /// <summary>
        /// 自动重置事件
        /// </summary>
        private readonly ManualResetEventSlim _event;
        public BothWaysService()
        {
            _event = new ManualResetEventSlim(false);
        }

        public override async Task StreamingBothWays(IAsyncStreamReader<ExampleMessage> requestStream,
                                               IServerStreamWriter<ExampleMessage> responseStream,
                                               ServerCallContext context)
        {

            // 创建线程安全的有限容量通道
            var channel = Channel.CreateBounded<ExampleMessage>(new BoundedChannelOptions(capacity: 5));

            var task = Task.Run(async () =>
            {
                await foreach (var message in requestStream.ReadAllAsync())
                {
                    // 读取消息 写入通道
                    if (!string.IsNullOrWhiteSpace(message.Msg))
                    {
                        await Console.Out.WriteLineAsync($"记录客户端传入消息:{message.Msg}");
                        // todo 消息处理
                        await channel.Writer.WriteAsync(message, context.CancellationToken);
                    }
                }
            }, context.CancellationToken);


            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                // 打印通道接收的消息
                await Console.Out.WriteLineAsync($"通道传入消息:{message.Msg}");

                // 写入响应流
                ExampleMessage exampleMessage = new ExampleMessage() { Msg = $"我已经接收到消息:{message.Msg}" };

                await responseStream.WriteAsync(exampleMessage);

                if (message.Msg.ToLower() == "exit")
                {
                    break;
                }
            }

            // 完结写入通道
            channel.Writer.Complete();
            await task;
        }
    }
  1. 2 Program注入
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            builder.Services.AddGrpc();
            var app = builder.Build();
            // 一元方法
            //app.MapGrpcService<DollarService>();
            // 客户端流
            //app.MapGrpcService<ClientStreamService>();
            // 服务端流
            //app.MapGrpcService<ServerStreamService>();
            // 双向流
            app.MapGrpcService<BothWaysService>();
            app.Run();
        }
    }

四、客户端配置

  1. 引用proto文件,配置为客户端类型
  2. 根据编译生成的函数进行传参调用
  3. 创建WPF测试客户端

button按钮触发grpc

    /// <summary>
    /// BothWaysClient.xaml 的交互逻辑
    /// </summary>
    public partial class BothWaysClient : Window
    {
        public BothWaysClient()
        {
            InitializeComponent();
        }

        private async void Excute_Click(object sender, RoutedEventArgs e)
        {
            Action<string> action = str => { txtValue.Text += $"{str}\r\n"; };

            await WpfClient.Show(action);


            txtValue.Text += "\r\n\r\n";
        }
    }

grpc客户端接口调用

        /// <summary>
        /// 双向流
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public static async Task Show(Action<string> action)
        {
            var messages = new List<string>()
            {
                "test",
                "one",
                "two",
                "three",
                "false",
                "four",
                "Oooo",
                "dddd",
                "vvvfff",
                "exit"
            };


            Random rnd = new Random(20);


            var channel = GrpcChannel.ForAddress("https://localhost:7188");

            var client = new GrpcProject.BothWaysRpc.BothWaysRpcClient(channel);

            var bothWays = client.StreamingBothWays();

            var requestTask = Task.Run(async () =>
              {
                  while (true)
                  {
                      var index = rnd.Next(messages.Count);
                      var msg = messages[index];
                      await bothWays.RequestStream.WriteAsync(new ExampleMessage { Msg = msg });
                      if (msg == "exit")
                      {
                          break;
                      }
                  }
              });
            await foreach (var item in bothWays.ResponseStream.ReadAllAsync())
            {
                action(item.Msg);
                if (item.Msg == "我已经接收到消息:exit")
                {
                    break;
                }
            }

            await requestTask;
        }

五、执行结果

服务端:

 客户端:

 六、源码地址

链接:https://pan.baidu.com/s/1uCirfbexPJ7C-AujBVtkCQ 
提取码:sd4y

七、后续进阶简介

  1. 接下来会讲解客户端工厂,优化客户端请求地址使用依赖注入提取各个服务
  2. proto文件各个字段详细介绍
  3. token认证
  4. 截止时间(中止请求)和请求取消
  5. AOP切面策略
  6. 重试策略(policy)
  7. 负载均衡策略(grpc本身提供的策略及nginx代理)
  8. 日志记录
  9. 健康检查
  10. 后续有更多特色功能会持续补充

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/79177.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Golang 基础语法问答

使用值为 nil 的 slice、map 会发生什么&#xff1f; 允许对值为 nil 的 slice 添加元素&#xff0c;但是对值为 nil 的 map 添加元素时会造成运行时 panic。 // map错误示例 func main() {var m map[string]intm["one"] 1 // error: panic: assignment to entry …

(5)所有角色数据分析页面的构建-5

所有角色数据分析页面&#xff0c;包括一个时间轴柱状图、六个散点图、六个柱状图(每个属性角色的生命值/防御力/攻击力的max与min的对比)。 """绘图""" from pyecharts.charts import Timeline from find_type import FindType import pandas …

特殊链表(循环单链表,循环双链表,静态链表)

目录 1.循环单链表的初始化 2.循环双链表 3. 静态链表 (1)静态链表的初始化 (2)静态链表的插入 1.循环单链表的初始化 typedef int ElemType; typedef struct LNode{ElemType data;struct LNode *next;}LNode,*LinkList;bool InitList(LinkList &L) {L(LNode*)malloc(…

微服务基础概念【内含图解】

目录 拓展补充&#xff1a; 单体架构 分布式架构 面向服务的体系结构 云原生 微服务架构 什么是微服务&#xff1f; 微服务定义 拓展补充&#xff1a; 单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;最终打成一个包部署 优点&#x…

Zabbix-6.4.4 邮箱告警SMS告警配置

目录 ​------------------------- # 邮箱告警 ---------------------------------- 1.安装mailx与postfix软件包 2.修改mailx配置文件 3. 创建文件夹 4. 编写mail-send.sh脚本 5. 将该脚本赋予执行权限 6. 进入web界面进行设置—> Alerts —> Media Types 7. 添…

uniapp小程序实现上传图片功能,并显示上传进度

效果图&#xff1a; 实现方法&#xff1a; 一、通过uni.chooseMedia(OBJECT)方法&#xff0c;拍摄或从手机相册中选择图片或视频。 官方文档链接: https://uniapp.dcloud.net.cn/api/media/video.html#choosemedia uni.chooseMedia({count: 9,mediaType: [image,video],so…

Apache-DBUtils

目录 封装方法 引出dbutils 案例 当关闭connection后&#xff0c;resultset结果集就无法使用了&#xff0c;这就使得resultset不利于数据的管理 封装方法 我们可以将结果集先存储在一个集合中&#xff0c;当connection关闭后&#xff0c;我们可以通过访问集合来访问结果集 …

7.11 Java方法重写

7.11 Java方法重写 这里首先要确定的是重写跟属性没有关系&#xff0c;重写都是方法的重写&#xff0c;与属性无关 带有关键字Static修饰的方法的重写实例 父类实例 package com.baidu.www.oop.demo05;public class B {public static void test(){System.out.println("这…

C++--红黑树

1.什么是红黑树 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路径会比其他路径长出俩倍&#xff0c;因…

【Java转Go】快速上手学习笔记(二)之基础篇一

目录 创建项目数据类型变量常量类型转换计数器键盘交互流程控制代码运算符 创建项目 上篇我们安装好了Go环境&#xff0c;和用IDEA安装Go插件来开发Go项目&#xff1a;【Java转Go】快速上手学习笔记&#xff08;一&#xff09;之环境安装篇 。 这篇我们开始正式学习Go语言。我…

【新知测评实验室】解谜扫描全能王——“智能高清滤镜”黑科技

目录 一、“智能高清滤镜” 原理分析1.1、智能扫描引擎AI-Scan功能拆解1.1.1、**图像感知**1.1.2、场景化决策 1.2、版面还原与识别技术分析1.2.1、元素检测和识别1.2.2、元素聚合1.2.3、版面识别 二、深度测评——“智能高清滤镜”功能2.1、图像处理方面2.2、摩尔纹去除方面2.…

【C++类和对象】类有哪些默认成员函数呢?(下)

文章目录 一、类的6个默认成员函数二、日期类的实现2.1 运算符重载部分2.2 日期之间的运算2.3 整体代码1.Date.h部分2. Date.cpp部分 三. const成员函数四. 取地址及const取地址操作符重载扩展内容 总结 ヾ(๑╹◡╹)&#xff89;" 人总要为过去的懒惰而付出代价ヾ(๑╹◡…

ffmepg滤镜

代码实现&#xff1a; 1.get_format() 这个是QSV硬件解码时的回调函数&#xff0c;在这里初始化hw_frames_ctx, get_format会在解码时被调用。因此对滤镜的初始化init_filter()应在得到第一帧数据后调用。 2.hw_frames_ctx&#xff0c;需要按照要求把他们传给对应的filter 3.然…

向量数据库 Milvus Cloud Partition Key:租户数量多,单个租户数据少的三种解决方案

三种解决方案 这个问题提出的时候,Milvus 的最新版本是 2.2.8,我们做个角色互换,在当时站在这个用户的角度,留在我们面前的选择有这么几个: 为每个租户创建一个 collection 为每个租户创建一个 partition 创建一个租户名称的标量字段 接下来,我们依次分析下这三种方案的可…

OPPO A57刷机资源(附简略教程)

https://www.123pan.com/s/hcAqVv-fpHWd.html提取码:buAp 图中画框的为必须下载的&#xff08;xiaomiflash和twrp必须解压后使用&#xff09; ​ 打开xiaomiflash点击driver点击install&#xff08;就是框住的按钮&#xff09;等待安装完成 ​用数据线将oppo a57与电脑连接&a…

【数学建模】逻辑回归算法(Logistic Resgression)

逻辑回归算法 简介逻辑回归与条件概率绘制sigmoid函数 简介 逻辑回归算法是一种简单但功能强大的二元线性分类算法。需要注意的是&#xff0c;尽管"逻辑回归"名字带有“回归”二字&#xff0c;但逻辑回归是一个分类算法&#xff0c;而不是回归算法。 我认为&#xff…

用easyui DataGrid编辑树形资料

easyui显示编辑树形资料有TreeGrid元件&#xff0c;但是这个元件的vue版本和react版本没有分页功能。virtual scroll功能也表现不佳。 我用DataGrid来处理。要解决的问题点&#xff1a; &#xff08;1&#xff09;如何显示成树形。即&#xff0c;子节点如何有缩进。 先计算好…

一、进入sql环境,以及sql的查询、新建、删除、使用

1、进入sql环境 》》》mysql -u root -p 》》》输入密码 2、sql语言的分类 3、注意事项&#xff1a; 4、基础操作&#xff1a; &#xff08;1&#xff09;查询所有数据库&#xff1a; show databases; 运行结果&#xff1a; &#xff08;2&#xff09;创建一个新的数据库&…

网络通信原理传输层TCP三次建立连接(第四十八课)

ACK :确认号 。 是期望收到对方的下一个报文段的数据的第1个字节的序号,即上次已成功接收到的数据字节序号加1。只有ACK标识为1,此字段有效。确认号X+1SEQ:序号字段。 TCP链接中传输的数据流中每个字节都编上一个序号。序号字段的值指的是本报文段所发送的数据的第一个字节的…

MongoDB 简介

什么是MongoDB ? MongoDB 是由C语言编写的&#xff0c;是一个基于分布式文件存储的开源数据库系统。 在高负载的情况下&#xff0c;添加更多的节点&#xff0c;可以保证服务器性能。 MongoDB 旨在为WEB应用提供可扩展的高性能数据存储解决方案。 MongoDB 将数据存储为一个…