Rust 流式输出:让模型边生成边显示,但别忘了中断

📅 2026/7/3 21:21:34 👁️ 阅读次数 📝 编程学习
Rust 流式输出:让模型边生成边显示,但别忘了中断

Rust 流式输出:让模型边生成边显示,但别忘了中断

第一次用 AI CLI 工具时,我最喜欢的体验就是"字一个一个往外蹦"的感觉——不用等模型完全生成完,就能看到内容在慢慢出现。但自己动手实现流式输出后才知道,这种"丝滑体验"背后有一堆需要处理的边界:网络 chunk 可能不按字符对齐、半个 UTF-8 字节、用户突然 Ctrl+C、输出重定向时日志混进结果里。

我最初实现流式输出时只做了两件事:打开 SSE 连接,把每个 chunk 的内容print!出来。能用没多久就碰到了问题——用户中断后留下残缺文件、终端输出卡住不刷新、UTF-8 中文被截成乱码。流式输出不是简单的"边收边打",它需要把数据流拆成多个可控的层,每一层都能处理自己那一层的异常。

在自学的过程中,我之前对流式编程的理解几乎为零。今天这篇是我用 Rust + Tokio 实现 AI 流式输出时的踩坑笔记。

一、把流式链路拆成独立层 — 每层只做一件事

流式输出的完整链路可以拆成五个环节,每个环节只关注自己的边界:

flowchart TD A[HTTP SSE 流 SSE Stream] --> B[字节块缓冲 Byte Buffer] B --> C[事件行解析 SSE Parser] C --> D[文本增量累积 Text Accumulator] D --> E[终端增量渲染 Terminal Render] A -->|网络异常| F[中断处理 Interrupt Handler] E -->|Ctrl+C 信号| F F --> G{用户意图? User Intent} G -->|丢弃 Discard| H[清理临时数据 Cleanup] G -->|保存 Save| I[写入部分结果 Partial Save] D --> J[完整结果累积 Full Buffer] J --> K[结束后保存 Save Complete] style F fill:#ff9,stroke:#333 style H fill:#f66,stroke:#333 style I fill:#ff9,stroke:#333 style K fill:#6f6,stroke:#333

关键思路是:显示用的文本流和保存用的完整结果要分两条路径。终端展示是增量的、可中断的;文件保存是完整的、在流结束之后才执行的。不要把这两个目标混在同一个 buffer 里。

二、终端输出务必及时刷新

没有flush(),用户可能看到输出突然憋住不动,直到生成结束才一口气出来。这个体验跟流式的初衷完全相反:

use std::io::{self, Write}; /// 增量输出一个文本片段到终端,并立即刷新 fn print_chunk(text: &str) -> io::Result<()> { // 直接写入 stdout print!("{}", text); // 立即刷新,让用户看到实时输出 io::stdout().flush() } /// 错误和日志信息永远输出到 stderr,不要污染 stdout fn log_debug(msg: &str) { // 用户可能把 stdout 重定向到文件,stderr 单独输出 eprintln!("[debug] {}", msg); }

这里面有一个小习惯对我帮助很大:流式内容写 stdout,调试信息写 stderr。如果用户想把输出重定向到文件(比如ai-cli ask "hello" > response.txt),日志不会混进模型回复里。CLI 工具经常会被人接到管道里用,输出流保持干净是基本素养。

三、正确处理中断信号 — Ctrl+C 不是程序崩了,而是用户选择了停止

用户按 Ctrl+C 是正常操作,不是异常退出。程序应该在收到信号后停止网络请求、清理临时状态、给用户一个明确的选择:

use tokio::signal; use tokio::select; /// 同时等待流式响应和用户中断信号 async fn stream_with_cancel_support( response_future: impl std::future::Future<Output = Result<String, String>>, ) -> Result<String, String> { let mut accumulated = String::new(); select! { // 分支 1:流正常完成 result = response_future => { match result { Ok(text) => { println!(); // 换行,与流式输出断开 Ok(text) } Err(e) => Err(format!("流式请求失败: {}", e)), } } // 分支 2:用户按下 Ctrl+C _ = signal::ctrl_c() => { eprintln!("\n\n操作已被用户中断"); eprintln!("提示:已生成的内容暂未保存,如需保留请使用 --save 参数"); Err("用户取消".to_string()) } } }

被中断后,程序应该告知用户明确的状态:是"已取消、无残留"还是"已取消、部分结果保存在某处"。不要让用户靠猜来判断中断后的文件能不能继续使用。

四、处理 UTF-8 边界和 chunk 不完整的问题

网络 chunk 不会礼貌地按字符边界分割。如果你收到的字节块刚好把一个中文字符的三字节 UTF-8 编码切成两半,直接当字符串解析就会出乱码:

/// 字节缓冲区:处理不完整的 UTF-8 字节 struct ByteBuffer { /// 暂存的不完整字节 buffer: Vec<u8>, } impl ByteBuffer { fn new() -> Self { ByteBuffer { buffer: Vec::new() } } /// 接收新的字节块,返回可安全解析为字符串的完整部分 fn feed(&mut self, mut chunk: Vec<u8>) -> String { // 先把上次剩余的不完整字节拼在前面 let mut full = Vec::new(); full.append(&mut self.buffer); full.append(&mut chunk); // 从后往前找完整的 UTF-8 字符边界 let valid_len = Self::valid_utf8_prefix_len(&full); let valid = full[..valid_len].to_vec(); // 剩余不完整字节暂存起来,等下次 chunk 到达时拼接 self.buffer = full[valid_len..].to_vec(); // 安全转换 String::from_utf8(valid).unwrap_or_else(|e| { eprintln!("[警告] UTF-8 解析异常: {}", e); String::from_utf8_lossy(&e.into_bytes()).to_string() }) } /// 找到能安全解析为 UTF-8 的最大前缀长度 fn valid_utf8_prefix_len(data: &[u8]) -> usize { // 从末尾向前尝试,找到第一个有效的 UTF-8 截断点 for len in (0..=data.len()).rev() { if std::str::from_utf8(&data[..len]).is_ok() { return len; } } 0 } }

实际项目中,如果使用成熟的 SSE/NDJSON 解析库(比如eventsource-streamtokio-sse-codec),它们一般已经处理好了字节拼接和字符边界问题。但理解底层原理对排查偶尔出现的乱码问题很有帮助——不能永远靠库来兜底,出了问题至少要能看懂是哪个环节出了故障。

五、总结

Rust 实现 AI 流式输出需要在五个层面做好边界处理:字节缓冲防截断、事件解析分 chunk、文本增量发终端、中断信号能优雅退出、完整结果独立保存。边生成边显示是加分体验,但可靠工具还要知道什么时候该停,停下后留下什么状态。

作为自学者,写流式输出是我学到最多系统编程细节的一块。它同时涉及网络 I/O、编码、终端控制、并发信号——每一项单独看都很小,但合在一起就让工具从"能用"变成了"在各种场景下都能从容应对"。流式输出不是加分项,是让 AI CLI 真正可用的基础能力。