Flink DataStream API vs Flink SQL:核心异同对比
一、编译链路对比
二、多维度对比
维度 | DataStream API | Flink SQL |
抽象层次 | 命令式(HOW:告诉引擎怎么做) | 声明式(WHAT:告诉引擎做什么) |
优化空间 | 用户手动优化逻辑 | 优化器自动选择最优策略 |
前端编译 | 无(直接构建 Transformation) | 完整的 SQL 编译流水线(Parse → Validate → Optimize → Physical Plan) |
算子可控性 | 完全可控(自定义 ProcessFunction 等) | 受限于 SQL 语义(通过 UDF 扩展) |
状态管理 | 手动管理状态(State API) | 框架自动管理状态(Aggregate State 等) |
侧输出(Side Output) | OutputTag | 不支持(需用 UNION ALL + Filter 模拟) |
自定义窗口 Trigger | 任意 Trigger 逻辑 | 仅支持标准窗口语义 |
复杂事件处理(CEP) | CEP Library | MATCH_RECOGNIZE(SQL 标准 CEP) |
类型系统 | Java/Scala 类型系统 | SQL 类型系统(与 Java 类型有映射关系) |
Schema 演化 | 手动处理 | 依赖 Catalog,有一定自动支持 |
适用场景 | 复杂事件处理、精细化控制、自定义状态逻辑 | ETL、聚合分析、多表 Join、标准化 pipeline |
三、运行时行为差异
特性 | DataStream | Flink SQL |
代码生成 | 无 | 部分算子使用 CodeGen(如 Calc 节点中的表达式计算编译为字节码) |
状态结构 | 用户自定义 | 框架规定的内部状态格式(如 MapState 存储聚合中间结果) |
序列化 | TypeSerializer 由用户类型决定 | 内部使用 BinaryRowData 等紧凑行式格式 |
Watermark | 用户指定 WatermarkStrategy | 通过 DDL 中 WATERMARK FOR 子句声明 |
uid 管理 | 用户手动设置,完全可控 | 框架自动生成,基于查询结构的确定性 hash |
拓扑变更恢复 | 只要 uid 一致即可恢复 | SQL 语句任何修改(包括列顺序)可能导致 uid 变化,无法恢复 |
四、何时选择 DataStream API?
场景 | 原因 |
复杂事件处理(CEP 自定义模式) | 需要精细控制状态和触发逻辑 |
自定义窗口逻辑(如 Session Gap 动态计算) | SQL 窗口语义固定,难以扩展 |
异步 IO 调用外部服务 | SQL 无直接对应能力 |
精细化状态管理(如 BroadcastState 模式) | SQL 状态由框架管理,不可自定义结构 |
需要 Side Output 分流 | SQL 不支持多路输出到不同类型的 Sink |
与非结构化数据交互 | SQL 要求强 Schema |
低延迟要求(逐条处理,不能攒批) | SQL 的 Mini-batch 等优化可能引入延迟 |
五、何时选择 Flink SQL?
场景 | 原因 |
标准 ETL(过滤、映射、聚合) | SQL 表达简洁,优化器自动优化 |
多表 Join | 优化器自动选择 Join 策略和顺序 |
维表关联(Lookup Join) | SQL 内置支持,无需手写异步逻辑 |
快速原型验证 | 声明式表达,开发效率高 |
团队 SQL 技能强于 Java 技能 | 降低上手门槛 |
需要统一批流逻辑 | 同一 SQL 可在两种模式下运行 |
频繁变更业务逻辑 | SQL 变更无需重新编译部署 Jar |
六、混用架构模式:SQL 与 DataStream 混用
在实际生产中,我们常常会将Flink SQL与DataStream API 搭配使用:
- SQL 做主体 + UDF 补充
- DataStream 为骨架 + SQL 做聚合分析
// Table → DataStream Table resultTable = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100"); DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); // DataStream → Table DataStream<Order> orderStream = env.addSource(...); Table orderTable = tableEnv.fromDataStream(orderStream, Schema.newBuilder() .column("orderId", DataTypes.STRING()) .column("amount", DataTypes.DECIMAL(10, 2)) .columnByExpression("proc_time", "PROCTIME()") .watermark("event_time", "event_time - INTERVAL '5' SECOND") .build()); tableEnv.createTemporaryView("orders", orderTable);混用时的注意事项:
- toDataStream() 时 SQL 层的优化边界在此截断,后续 DataStream 操作不再享受 SQL 优化器的优化
- fromDataStream() 时需要明确定义 Schema 映射,特别是时间属性和水位线
- 混用时 uid 管理变得复杂,SQL 自动生成的 uid 在拓扑变更时可能不稳定