基于IPC-CFX的点对点通信C#

        IPC-CFX有两种主要的通信方式,可以通过RabbitMQ发布和订阅,也可以通过request和response进行点对点的通信,本文主要讲的是点对点的通信方式。

        在vscode里建立新的dotnet项目,可以通过终端输入dotnet new console来建立,文件目录为CFXDemo->machine1和CFXDemo->machine2。

        通过nuget插件分别为两个项目都安装CFX.CFXSDK、AMQPNetLite.Core和Newtonsoft.Json这几个metapackage。

         我们将machine1作为发送端(sendRequest),machine2作为接收端(Receive),则Machine1的代码如下所示:

using System.Threading;
using CFX;
using CFX.Transport;
using System;
using System.Security.Principal;

namespace machine1
{
    class Program
    {

        static void Main(string[] args)
        {

            OpenRequest();
            Console.ReadLine();
            for(int i = 0;i<5;i++){
                SendRequest();
                Thread.Sleep(2000);
            }
            
        }

        static string sendCFXHandle = "a.b.001";     
        static string receiveCFXHandle = "a.b.002";
        static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");
        static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");


        #region send request

        static AmqpCFXEndpoint endpointSendRequest;

        static void OpenRequest()
        {


            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;

            }

            endpointSendRequest = new AmqpCFXEndpoint();

            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                endpointSendRequest.Open(sendCFXHandle);    //这一步会绑定endpointSendRequest里的CFXHandle,即sendCFXHandle的值
                Console.WriteLine("Request.Source is : {0}",endpointSendRequest.CFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());

            }

            // Set a timeout of 20 seconds.  If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);

        }
        static void SendRequest()
        {

            // Build a GetEndpointInfomation Request
            CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
            {
                CFXHandle = receiveCFXHandle
            });
            request.Source = endpointSendRequest.CFXHandle;
            request.Target = receiveCFXHandle;

            try
            {
                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        #endregion send request


        #region receive request
        static AmqpCFXEndpoint endpointReceiveRequest;

        static void OpenReceive()
        {
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            endpointReceiveRequest = new AmqpCFXEndpoint();

            endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }


        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            Console.WriteLine($"Endpoint_OnRequestReceived:  { request.ToString()}");

            // Process request.  Return Result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "...." });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            return null;
        }

        #endregion receive request




    }
}

        作为接收端,machine2的代码如下所示:

using CFX;
using CFX.Transport;
using System;

namespace machine2
{
    class Program
    {


        static void Main(string[] args)
        {

            Console.WriteLine("ReceivEndPoint is waiting Request......");
            OpenReceive();
            Console.WriteLine("Press Enter Key to end the App");
            Console.ReadKey();

        }

        static string sendCFXHandle = "a.b.001";
        static string receiveCFXHandle = "a.b.002";
        static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");
        static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");


        #region send request

        static AmqpCFXEndpoint endpointSendRequest;

        static void OpenRequest()
        {


            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;

            }

            endpointSendRequest = new AmqpCFXEndpoint();

            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                endpointSendRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());

            }

            // Set a timeout of 20 seconds.  If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);

        }
        static void SendRequest()
        {

            // Build a GetEndpointInfomation Request
            CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
            {
                CFXHandle = receiveCFXHandle
            });
            request.Source = endpointSendRequest.CFXHandle;
            request.Target = receiveCFXHandle;

            try
            {
                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        #endregion send request


        #region receive request
        static AmqpCFXEndpoint endpointReceiveRequest;

        static void OpenReceive()
        {
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            endpointReceiveRequest = new AmqpCFXEndpoint();

            endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                //endpointReceiveRequest.Open(receiveCFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }


        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            Console.WriteLine($"Endpoint_OnRequestReceived:  { request.ToString()}");
            Console.WriteLine($"request:\n{request.ToJson(true)}");

            // Process request.  Return Result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "..." });
                result.Source = receiveCFXHandle;
                result.Target = request.Source;
                result.TimeStamp = DateTime.Now;
                return result;
            }

            return null;
        }

        #endregion receive request




    }
}

         运行结果,可以用json格式对response和request的内容进行解析。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/41269.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何实现CAN-SOME/IP通信路由测试

区别于基于UDP的车内通信路由&#xff0c;基于SOME/IP协议的路由增加了服务发现&#xff08;Service Discovery&#xff09;和服务发布&#xff08;Service Publish&#xff09;&#xff0c;那对于测试工程师来说&#xff0c;怎么实现CAN-SOME/IP路由的测试呢&#xff1f; 01 …

网络安全面试题

以下为网络安全各个方向涉及的面试题&#xff0c;星数越多代表问题出现的几率越大&#xff0c;祝各位都能找到满意的工作。 注&#xff1a;本套面试题&#xff0c;已整理成pdf文档&#xff0c;但内容还在持续更新中&#xff0c;因为无论如何都不可能覆盖所有的面试问题&#xf…

45、Spring Boot自动配置原理

Spring Boot自动配置原理 lmport Configuration Spring spi 自动配置类由各个starter提供&#xff0c;使用Configuration Bean定义配置类&#xff0c;放到META-INF/spring.factories下使用Spring spi扫描META-INF/spring.factories下的配置类使用lmport导入自动配置类

C# WPF实现动画渐入暗黑明亮主题切换效果

C# WPF实现动画渐入暗黑明亮主题切换效果 效果图如下最近在Bilibili的桌面端看到一个黑白主题切换的效果感觉&#xff0c;挺有意思。于是我使用WPF尝试实现该效果。 主要的切换效果&#xff0c;基本实现不过还存在一些小瑕疵&#xff0c;比如字体等笔刷不能跟随动画进入进行切…

手写数字识别Minst(CNN)

文章目录 手写数字识别网络结构加载数据集数据集可视化CNN网络结构训练模型保存模型和加载模型测试模型 手写数字识别 网络结构 网上给出的基本网络结构&#xff1a; 然而在本数据集中&#xff0c;输入图不是1*32*32&#xff0c;是1*28*28。所以正确的网络结构应该是 level…

好用的思维导图软件有哪些?这几款简单好用

好用的思维导图软件有哪些&#xff1f;思维导图是一种非常有用的思维工具&#xff0c;可以帮助我们组织和理清复杂的信息。在如今的数字时代&#xff0c;有很多软件可以帮助我们创建和编辑思维导图。下面介绍几款简单好用的思维导图软件。 第一款&#xff1a;迅捷画图 这是一款…

生成式AI时代,亚马逊云科技致力推动技术的普惠,让更多企业受益

当谈及AIGC时&#xff0c; 我们该谈些什么&#xff1f; 生成式AI技术与应用的不断发展&#xff0c;为各个行业都注入了全新的机会与活力。AIGC成为了今年最为激动人心的技术话题。亚马逊云科技也一马当先&#xff0c;在6月27-28日&#xff0c;2023亚马逊云科技中国峰会上分享…

babel兼容低版本游览器

文章目录 1. webpack项目的搭建2. babel 命令行使用3. babel的预设与编译器流程4. babel项目中配置4.1 babel-loader与插件的使用4.2 babel-preset使用 5. 游览器兼容性使用5.1 browserslist工具与编写规则5.2 browserslist配置5.3 优化babel的配置文件 6. polyfill6.1 useBuil…

Redis基础 进阶项目实战总结笔记

文章目录 一、启动的三种方式1.默认启动2.指定配置启动3.开机自启动 二、数据类型1.string&#xff1a;字符串2. hash&#xff1a;哈希3. list&#xff1a;列表4. set&#xff1a;集合5. sorted set&#xff1a;有序集合 三、黑马课程的进阶项目实战总结博文笔记Redis实现短信登…

配置spark

配置spark Yarn 模式Standalone 模式Local 模式 Yarn 模式 tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module mv spark-3.0.0-bin-hadoop3.2 spark-yarn修改 hadoop 配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发 <!--是否启动一…

Vue3统计数值(Statistic)

可自定义设置以下属性&#xff1a; 数值的标题&#xff08;title&#xff09;&#xff0c;类型&#xff1a;string | slot&#xff0c;默认&#xff1a;‘’数值的内容&#xff08;value&#xff09;&#xff0c;类型&#xff1a;string | number&#xff0c;默认&#xff1a;…

【Python】数据可视化利器PyCharts在测试工作中的应用

点击跳转原文&#xff1a;【Python】数据可视化利器PyCharts在测试工作中的应用 实际应用&#xff1a;常态化性能压测数据统计 import random from pyecharts.charts import Line, Bar, Grid, Pie, Page from pyecharts import options as opts # 查询过去 8 次数据 time_rang…

Low-Light Image Enhancement via Self-Reinforced Retinex Projection Model 论文阅读笔记

这是马龙博士2022年在TMM期刊发表的基于改进的retinex方法去做暗图增强&#xff08;非深度学习&#xff09;的一篇论文 文章用一张图展示了其动机&#xff0c;第一行是估计的亮度层&#xff0c;第二列是通常的retinex方法会对估计的亮度层进行RTV约束优化&#xff0c;从而产生…

NFTScan 与 Decert 达成合作伙伴,双方在 NFT 数据方面展开合作

近日&#xff0c;NFT 数据基础设施 NFTScan 与 Decert 达成合作伙伴关系&#xff0c;双方在多链 NFT 数据层面展开合作。在 Decert 产品中&#xff0c;由 NFTScan 为其提供专业的多链 NFT 数据支持&#xff0c;为用户带来优质的 NFT 搜索查询等相关交互功能&#xff0c;提升用户…

事务@transactional执行产生重复数据

背景 系统设计之初&#xff0c;每次来新请求&#xff0c;业务层会先查询数据库&#xff0c;判断是否存在相同的id数据&#xff08;id是唯一标识产品的&#xff09;&#xff0c;有则返回当前数据库查到的数据&#xff0c;根据数据决定下一步动作&#xff0c;没有则认为是初次请…

一则 MySQL 参数设置不当导致复制中断的故障案例

本文分享了一个数据库参数错误配置导致复制中断的问题&#xff0c;以及对参数配置的建议。 作者&#xff1a;秦福朗 爱可生 DBA 团队成员&#xff0c;负责项目日常问题处理及公司平台问题排查。热爱互联网&#xff0c;会摄影、懂厨艺&#xff0c;不会厨艺的 DBA 不是好司机&…

wsl2中安装docker

1、安装docker 执行以下脚本&#xff1a; 这个脚本在执行之前需要先执行chmod x install-docker.sh这个命令 # install docker curl -fsSL get.docker.com -o get-docker.sh sh get-docker.shif [ ! $(getent group docker) ]; thensudo groupadd docker; elseecho "doc…

UE4/5AI制作基础AI跳跃(适合新手)

目录 制作 添加逻辑 添加导航链接代理 结果 在上一章中&#xff0c;我们讲解了简单的AI跟随玩家&#xff0c;制作了一个基础的ai。 UE4/5AI制作基础AI&#xff08;适合新手入门&#xff0c;运用黑板&#xff0c;行为树&#xff0c;ai控制器&#xff0c;角色类&#xff0c;任…

数据库应用:CentOS 7离线安装PostgreSQL

目录 一、理论 1.PostgreSQL 2.PostgreSQL离线安装 3.PostgreSQL初始化 4.PostgreSQL登录操作 二、实验 1.CentOS 7离线安装PostgreSQL 2.登录PostgreSQL 3.Navicat连接PostgreSQL 三、总结 一、理论 1.PostgreSQL &#xff08;1&#xff09;简介 PostgreSQL 是一个…

性能测试工具 jmeter 录制脚本,传递 cookie,循环执行接口

目录 前言&#xff1a; 代理录制脚本 循环重复添加接口 登录并传递 cookie 给新建产品接口 循环执行脚本 前言&#xff1a; 在使用JMeter进行性能测试时&#xff0c;录制脚本是一种常用的方法。录制脚本可以帮助你捕获和重放用户与应用程序之间的交互&#xff0c;以模拟真…