rust tokio select!宏详解

rust tokio select!宏详解

简介

本文介绍Tokioselect!的用法,重点是使用过程中可能遇到的问题,比如阻塞问题、优先级问题、cancel safe问题。在Tokio 中,select! 是一个宏,用于同时等待多个异步任务,并在其中任意一个任务完成时执行相应的逻辑。

基本用法

如下代码演示了如何使用 Tokio 库实现一个异步的消息传递系统,其中包括三个无限通道和一个关闭通道。程序使用了 select! 宏来等待通道和关闭通道的事件,并在事件发生时执行相应的操作。

程序的主要步骤如下:

  1. 创建三个无限通道和一个用于传递关闭信号的通道。
  2. 向三个通道中发送一些数据。
  3. 开启一个异步任务并在两秒后发送关闭信号。
  4. 在主循环中使用 select! 宏等待通道和关闭通道的事件。
  5. 当一个通道接收到数据时,打印出数据。
  6. 当关闭通道接收到信号时,退出循环。

程序中的 select! 宏使用了类似于 match 的语法,但是它可以同时等待多个异步事件。当其中一个事件发生时,宏将执行相应的代码块,并跳出循环。在本例中,当一个通道接收到数据时,打印出数据;当关闭通道接收到信号时,退出循环。
select!经常与loop搭配使用,循环地从多个通道中接收事件并处理。

use std::time::Duration;

use tokio::select;

#[tokio::main]
async fn main() {
    let (sender1, mut receiver1) = tokio::sync::mpsc::unbounded_channel::<String>();
    let (sender2, mut receiver2) = tokio::sync::mpsc::unbounded_channel::<String>();
    let (sender3, mut receiver3) = tokio::sync::mpsc::unbounded_channel::<String>();

    let (shutdown_sender, mut shutdown_receiver) = tokio::sync::watch::channel(());
    for i in 0..3 {
        sender1.send(i.to_string()).unwrap();
        sender2.send(i.to_string()).unwrap();
        sender3.send(i.to_string()).unwrap();
    }

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        shutdown_sender.send(()).unwrap(); //两秒后关闭
    });

    loop {
        select! {
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }
}

可能遇到的坑

阻塞

select中的各个分支是并行执行的,这里的并行是指分支中的各个future在并行执行。不过一旦某个分支的future完成并进入了分支代码块,如果在分支代码中有一些阻塞的操作,则其他分支是没有机会执行的。
比如下面代码,在receiver1.recv()完成时,sleep了10s,sleep期间其他的分支是不会执行的。即使在2s后发送了shutdown信号,select!因为无法及时处理此信号,实际上循环也无法退出。

 loop {
        select! {
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
                tokio::time::sleep(Duration::from_secs(10)).await;//这里等待期间,其他的分支是无法被执行的
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }

这个坑在网络编程中比较容易踩到,比如select这里是从channel中取出上层应用传来的数据,并将其写入到socket中,而写socket的操作是有可能阻塞的,阻塞期间其他的分支是无法执行的。

顺序

1、默认情况下select中的各个分支执行顺序是随机的,比如上面例子中三个channel都有消息的情况下,具体去执行哪个分支是随机的。执行结果如下:
在这里插入图片描述
2、如果想要区分优先级,可以加标志biased,这样每次select将会按照从上到下的顺序去poll每个future,也就是说优先级顺序是从上往下的。比如某些场景下需要按优先级处理各个channel中的数据时这个特性就很有用。代码如下:

    loop {
        select! {
            biased;//按顺序优先执行
            ret = receiver1.recv() => {
                println!("channel 1 received: {:?}", ret);
            },
            ret = receiver2.recv() => {
                println!("channel 2 received: {:?}", ret);
            },
            ret = receiver3.recv() => {
                println!("channel 3 received: {:?}", ret);
            },
            _ = shutdown_receiver.changed() => {
                println!("shutdown received");
                break;
            }
        };
    }

运行结果如下:
在这里插入图片描述
3、顺序执行时注意饿死问题
添加了biased标志后,顺序靠前的future总是先被执行,在上述例子中,极端情况下如果靠前的channel总是有数据,那后面的channel就没有机会被执行。比如例子中如果前三个channel中一直有数据,那shutdown_receiver就无法收到shutdown信号,导致程序功能不符合预期。
解决这个问题很简单,就是把更关键的控制性的future放在最前方。

关于cancel safe

select!中如果某个分支future completed了,会将其他分支的future cancel掉,这个cancel操作要格外小心,因为如果future不是cancel safe的可能会丢数据。tokio的官方文档中给出了常见的cancel safe和不safefuture
那么如何判断自己实现的future是否是cancel safe的呢? 很简单、只需要思考如果future中的代码执行到.await时被cancel了,是否是安全的。我们来看下cancel unsafe的代码长啥样:

pub async fn read_and_write(mut message_recevier: UnboundedReceiver<Bytes>, mut file: File) {
    let message = message_recevier.recv().await.unwrap();
    file.write(&message).await.unwrap();
}

该方法从一个channel中读取消息,并将此消息写入到文件中,这个future就明显不是cancel safe的。为啥呢?试想一下,此futurechannel中读到消息之后,在写文件时被cancel掉了,那message岂不是就丢了。
实际项目中一定要格外小心这个cancel safe问题,很容易造成丢数据或者数据重复等不良反应,而且一旦出现了还很难复现、不太容易想到是这里的问题。网络编程中尤其要注意tokio::io::AsyncWriteExt::write_all不是cancel safe的,因为它内部可能是多次调用write操作才将所有缓冲区写入。

数量

1、首先select!中的分支仅支持显式地用代码书写,无法动态增减。就是说在写代码时select中的futures数量就固定了,程序运行过程中无法动态删减。
2、目前最多支持64个分支。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/191668.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL简单介绍

简单了解MySQL MySQL语句分类 SQL语句分类 DDL&#xff1a;数据定义语句 create表&#xff0c;库.….] DML&#xff1a;数据操作语句 [增加insert&#xff0c;修改 update&#xff0c;删除delete] DQL&#xff1a;数据查询语句 [select] DCL&#xff1a;数据控制语句 …

【RTP】3: RTPSenderVideo::SendVideo 切片到发送

m98 版本。之前1 2 都是m79.RTPSenderVideo::SendVideo 负责切片,是入口 实际发送要靠: RTPSender* const rtp_sender_; 外部传递的: rtp_rtcp\source\rtp_sender.h 实现了rtp rtcp协议 ,负责实际的打包 新增了一个 TransformableFrameInterface 用的 编码线程 - RTPSend…

【算法萌新闯力扣】:卡牌分组

力扣热题&#xff1a;卡牌分组 一、开篇 今天是备战蓝桥杯的第22天。这道题触及到我好几个知识盲区&#xff0c;以前欠下的债这道题一并补齐&#xff0c;哈希表的遍历、最大公约数与最小公倍数&#xff0c;如果你还没掌握&#xff0c;这道题练起来&#xff01; 二、题目链接:…

关于el-table的二次封装及使用,支持自定义列内容

关于el-table的二次封装及使用 table组件 <template><el-table ref"tableComRef" :data"tableData" border style"width: 100%"><el-table-column v-if"tableHeaderList[0]?.type selection" type"selection&…

下载网页内容成HTML文件

今天遇到了一个非常好用的、开源的网页下载插件: SingleFile&#xff0c;它可以将当前网页里的文字、图片、超链接等&#xff0c;合并成单一的.html文件&#xff0c;便于保存和浏览查看。下面介绍SingleFile的安装和使用。 1、下载SingleFile插件 SingleFile官网地址&#xff…

如何使用JMeter测试导入接口/导出接口

今天一上班&#xff0c;被开发问了一个问题&#xff1a;JMeter调试接口&#xff0c;文件导入接口怎么老是不通&#xff1f;还有导出文件接口&#xff0c;不知道文件导到哪里去了&#xff1f; 我一听&#xff0c;这不是JMeter做接口测试经常遇到的嘛&#xff0c;但是一时半会又…

STM32-SPI3控制MCP3201、MCP3202(Sigma-Delta-ADC芯片)

STM32-SPI3控制MCP3201、MCP3202&#xff08;Sigma-Delta-ADC芯片&#xff09; 原理图手册说明功能方框图引脚功能数字输出编码与实值的转换分辨率设置与LSB最小和最大输出代码&#xff08;注&#xff09; 正负符号寄存器位MSB数字输出编码数据转换的LSB值 将设备输出编码转换为…

Ps:使用钢笔工具绘制自由路径的技巧

只有熟练掌握使用钢笔工具绘制自由路径的技巧&#xff0c;才能快速完成复杂形状的创建以及精准抠图等工作。 钢笔工具是 Photoshop 中绘制路径的主要工具。无论是直线路径还是曲线路径&#xff0c;钢笔工具都能够提供高度的控制和精确度。 ◆ ◆ ◆ 绘制直线路径 绘制直线路径…

解决OSError: [Errno 28] No space left on device报错和搭建AIrtest无线配置手机集群

OSError: [Errno 28] No space left on device和搭建AIrtest无线配置手机集群 做手机无限集群控制时&#xff0c;常常遇到这种错误问题。表示您的设备上没有足够的可用磁盘空间来完成某个操作。我们遇到了还得重新开端口和输入ip&#xff0c;如果有几百台手机是不是中午就不吃…

我的创作纪念日-五周年

机缘 5年前&#xff0c;作为一名技术人员&#xff0c;平时利用CSDN作为学习平台工具&#xff0c;帮助解决工作中遇到的问题。随着30、35中年危机渐行渐近&#xff0c;回过头来发现平时虽然也有记录整理学习笔记的习惯&#xff0c;但还没有一个可以持续鞭笞自己和记录自己学习的…

网页设计作业-音乐网站首页

效果图 网盘链接 链接&#xff1a;https://pan.baidu.com/s/1CO4jAOY0zk1AWTx_pC3UmA?pwdfuck 提取码&#xff1a;fuck

原神「神铸赋形」活动祈愿现已开启

亲爱的旅行者&#xff0c;「神铸赋形」活动祈愿现已开启&#xff0c;「单手剑静水流涌之辉」「法器碧落之珑」概率UP&#xff01; 活动期间&#xff0c;旅行者可以在「神铸赋形」活动祈愿中获得更多武器与角色&#xff0c;提升队伍的战斗力&#xff01; 〓祈愿时间〓 4.2版本更…

C++通讯录管理系统

目录 系统需求 1、 创建项目 2、 菜单功能设计 3、 退出功能设计 4、 添加联系人功能设计 4.1 设计联系人结构体 4.2 设计通讯录结构体 4.3 在main函数中创建通讯录 4.4 封装添加联系人函数 4.5 添加联系人功能测试 5、 显示联系人功能设计 5.1 封装显示…

【开源】基于JAVA的高校学院网站

项目编号&#xff1a; S 020 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S020&#xff0c;文末获取源码。} 项目编号&#xff1a;S020&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学院院系模块2.2 竞赛报名模块2.3 教…

FilterChain攻击解析及利用

文章目录 BASE64解码和编码原理浅析EncodingDecoding Filterchain构造&#xff08;原理阐述&#xff09;回顾死亡代码特性一&#xff08;双重去杂&#xff09;特性二&#xff08;粘合性&#xff09; 任意字符构造工具一工具二 实战例题[NSSRound#7 Team]brokenFilterChain&…

jdk17安装全方位手把手安装教程 / 已有jdk8了,安装JDK17后如何配置环境变量 / 多个不同版本的JDK,如何配置环境变量?

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对博主首页也很感兴趣o (ˉ▽ˉ&#xff1b;) 学生邮箱白嫖/免费安装JetBrains全家桶(IDEA/pycharm等) —— 保姆级教程 目录 1、下载jdk17 2、安装jdk17 3、配置环境变量 -> 电脑无其他jdk 4、…

Elasticsearch集群部署,配置head监控插件

Elasticsearch是一个开源搜索引擎&#xff0c;基于Lucene搜索库构建&#xff0c;被广泛应用于全文搜索、地理位置搜索、日志处理、商业分析等领域。它采用分布式架构&#xff0c;可以处理大规模数据集和支持高并发访问。Elasticsearch提供了一个简单而强大的API&#xff0c;可以…

python基础练习题库实验4

文章目录 题目1代码实验结果 题目2代码实验结果 题目3代码实验结果 题目4代码实验结果 题目5代码实验结果 题目6代码实验结果 题目总结 题目1 编写一个程序&#xff0c;使用for循环语句和字符串格式显示以下精确输出。 例如&#xff1a; 代码 for i in range(1, 11):result…

基于xml配置的AOP

目录 xml方式AOP快速入门 xml方式AOP配置详解 xml方式AOP快速入门 xml方式配置AOP的步骤 导入AOP相关坐标 <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId><version>1.8.13</version></de…

代码随想录算法训练营 ---第四十五天

前言&#xff1a; 昨天的题做过之后&#xff0c;今天的题基本上都很简单&#xff0c;但是要注重一下细节。 第一题&#xff1a; 简介&#xff1a; 动态规划五部曲&#xff1a; 1.确定dp数组的含义 dp[i]&#xff1a;爬到有i个台阶的楼顶&#xff0c;有dp[i]种方法 2.确定dp…
最新文章