- Published on
Flink 实时架构:玩转 Kafka 与 Upsert-Kafka 的“混搭”艺术
- Authors

- Name
- Charles Chen
在 Flink SQL 的世界里,很多人分不清什么时候该用 upsert-kafka,什么时候该选 canal-json。其实,这本质上是**“流(Stream)”与“表(Table)”**两种世界观的碰撞。
一、 连接器(Connector):传送带 vs. 智能白板
1. Kafka Connector:永远向前的“流水账”
这是最标准的 Append-only(仅追加) 模式。
- 特性: 它不关心主键,不关心你之前发了什么。你发一条,它接一条。
- 适用场景: 用户点击日志、传感器采样、以及 CDC 原始变更流。
- 架构地位: 它是 ODS 层(贴源层)的绝对主力,负责“还原现场”。
2. Upsert-Kafka Connector:会自我修正的“快照”
这是基于 Primary Key(主键) 的更新插入模式。
- 特性: 它要求必须有主键。当你发送一个主键相同的数据时,它代表“更新”;当你发一个 Value 为空的 Key,它代表“删除”。
- 适用场景: 实时聚合结果、维表镜像。
- 架构地位: 它是 DWS/ADS 层(服务层)的神器,负责“输出结论”。
二、 格式(Format):数据在路上穿什么衣服?
| 格式 | 核心能力 | 潜台词 |
|---|---|---|
json | 简单平铺,无状态语义。 | “这就是一条普通数据,爱咋咋地。” |
canal-json | 包含 data, old, type 等元数据。 | “我能告诉你这行数据在数据库里是怎么被修改的。” |
三、 高阶架构:黄金搭配方案
作为架构师,你需要根据业务目标,灵活组合这两者:
方案 A:Kafka + canal-json(数据库的完美镜像)
- 组合拳: 普通 Kafka 管道 + 携带变更信息的 Canal 格式。
- 为什么这么搭: * Canal 格式能把 MySQL 的
UPDATE拆解成UPDATE_BEFORE和UPDATE_AFTER。- Flink 收到后,能自动识别哪些是旧值、哪些是新值。
- 注意事项: 解析成本最高。Canal 格式比普通 JSON 胖出一大圈,如果每秒几十万 QPS,解析它会消耗掉你 M4 Pro 大量的 CPU。
方案 B:Upsert-Kafka + json(高性能状态下沉)
- 组合拳: 自带更新能力的连接器 + 轻量级普通 JSON。
- 为什么这么搭: * 当你算出了“每个用户的当前总分”,你只需要把最新的分数值写出去。
upsert-kafka会利用 Kafka 的 Log Compaction 机制,确保 Topic 里只留每个用户最新的那条记录。
- 注意事项: 必须定义 Primary Key。没有主键,Upsert 语义就成了无米之炊。
| 场景 | 推荐 Connector | 核心理由 |
|---|---|---|
| ODS 层(原始贴源) | Standard Kafka | 追求极致吞吐。日志、CDC 原始流不应在此层合并,保留“历史现场”。 |
| DWD 层(明细宽表) | Standard/Upsert | 如果是过滤/清洗,用 Standard;如果是多流 Join 后的结果,推荐 Upsert。 |
| DWS 层(聚合层) | Upsert-Kafka | GROUP BY 的统计结果随时间变化,必须用 Upsert 语义更新下游。 |
| 维表化(Look-up) | Upsert-Kafka | 将 Kafka 模拟成一张“不断更新的表”,供其他流 Join。 |
四、 $50w 年薪级别的“避坑”深度解析
1. 序列化成本
canal-json 包含了 old、data、type 等大量元数据。在 高吞吐(每秒 10w+) 场景下,解析 canal-json 的 CPU 开销比普通 json 高出约 。
- 高薪操作: 如果你只需要
INSERT数据,请强制使用普通的json格式以节省资源。
在生产环境,如果吞吐量极大,不要盲目迷信 JSON。
深度技巧: 考虑使用
key.format = 'raw'。如果你的主键是一个简单的INT或STRING,用raw格式直接读写二进制,比反复解析 JSON 主键要快得多。
2. 墓碑消息(Tombstone)与数据清理
当你使用 upsert-kafka 搭配 json 时,务必检查 Kafka Topic 的配置: log.cleanup.policy = compact 如果没开这个,Kafka 不会清理旧的 Key,你的 upsert-kafka 就会变成一个“无限膨胀”的普通 Kafka。
3. Metadata 的妙用
在使用 canal-json 时,经常需要提取数据库名、表名。你可以通过 Flink 的 Metadata Column 灵活提取,而不需要在 json 里手动解析:
CREATE TABLE canal_source (
db_name STRING METADATA FROM 'value.database' VIRTUAL,
table_name STRING METADATA FROM 'value.table' VIRTUAL,
...
) WITH ('format' = 'canal-json');
4. Checkpoint 的救命作用
无论是哪种 Connector,只要涉及 CDC(增量采集),必须开启 Checkpoint。
- 如果不开启,任务挂了之后,Flink 会忘记读到 binlog 的哪个位置了。这会导致你的 Upsert 逻辑出现重复或者空洞,数据一致性瞬间崩塌。
五、 结语
- 入湖入仓选
Kafka + canal-json,追求完整性。 - 结果对外选
Upsert-kafka + json,追求实时性与简洁性。