Published on

Flink 实时架构:玩转 Kafka 与 Upsert-Kafka 的“混搭”艺术

Authors
  • avatar
    Name
    Charles Chen
    Twitter

在 Flink SQL 的世界里,很多人分不清什么时候该用 upsert-kafka,什么时候该选 canal-json。其实,这本质上是**“流(Stream)”与“表(Table)”**两种世界观的碰撞。

一、 连接器(Connector):传送带 vs. 智能白板

1. Kafka Connector:永远向前的“流水账”

这是最标准的 Append-only(仅追加) 模式。

  • 特性: 它不关心主键,不关心你之前发了什么。你发一条,它接一条。
  • 适用场景: 用户点击日志、传感器采样、以及 CDC 原始变更流
  • 架构地位: 它是 ODS 层(贴源层)的绝对主力,负责“还原现场”。

2. Upsert-Kafka Connector:会自我修正的“快照”

这是基于 Primary Key(主键) 的更新插入模式。

  • 特性: 它要求必须有主键。当你发送一个主键相同的数据时,它代表“更新”;当你发一个 Value 为空的 Key,它代表“删除”。
  • 适用场景: 实时聚合结果、维表镜像。
  • 架构地位: 它是 DWS/ADS 层(服务层)的神器,负责“输出结论”。

二、 格式(Format):数据在路上穿什么衣服?

格式核心能力潜台词
json简单平铺,无状态语义。“这就是一条普通数据,爱咋咋地。”
canal-json包含 data, old, type 等元数据。“我能告诉你这行数据在数据库里是怎么被修改的。”

三、 高阶架构:黄金搭配方案

作为架构师,你需要根据业务目标,灵活组合这两者:

方案 A:Kafka + canal-json(数据库的完美镜像)

  • 组合拳: 普通 Kafka 管道 + 携带变更信息的 Canal 格式。
  • 为什么这么搭: * Canal 格式能把 MySQL 的 UPDATE 拆解成 UPDATE_BEFOREUPDATE_AFTER
    • Flink 收到后,能自动识别哪些是旧值、哪些是新值。
  • 注意事项: 解析成本最高。Canal 格式比普通 JSON 胖出一大圈,如果每秒几十万 QPS,解析它会消耗掉你 M4 Pro 大量的 CPU。

方案 B:Upsert-Kafka + json(高性能状态下沉)

  • 组合拳: 自带更新能力的连接器 + 轻量级普通 JSON。
  • 为什么这么搭: * 当你算出了“每个用户的当前总分”,你只需要把最新的分数值写出去。
    • upsert-kafka 会利用 Kafka 的 Log Compaction 机制,确保 Topic 里只留每个用户最新的那条记录。
  • 注意事项: 必须定义 Primary Key。没有主键,Upsert 语义就成了无米之炊。
场景推荐 Connector核心理由
ODS 层(原始贴源)Standard Kafka追求极致吞吐。日志、CDC 原始流不应在此层合并,保留“历史现场”。
DWD 层(明细宽表)Standard/Upsert如果是过滤/清洗,用 Standard;如果是多流 Join 后的结果,推荐 Upsert。
DWS 层(聚合层)Upsert-KafkaGROUP BY 的统计结果随时间变化,必须用 Upsert 语义更新下游。
维表化(Look-up)Upsert-Kafka将 Kafka 模拟成一张“不断更新的表”,供其他流 Join。

四、 $50w 年薪级别的“避坑”深度解析

1. 序列化成本

canal-json 包含了 olddatatype 等大量元数据。在 高吞吐(每秒 10w+) 场景下,解析 canal-json 的 CPU 开销比普通 json 高出约 2020% \sim 30%

  • 高薪操作: 如果你只需要 INSERT 数据,请强制使用普通的 json 格式以节省资源。

在生产环境,如果吞吐量极大,不要盲目迷信 JSON。

深度技巧: 考虑使用 key.format = 'raw'。如果你的主键是一个简单的 INTSTRING,用 raw 格式直接读写二进制,比反复解析 JSON 主键要快得多。

2. 墓碑消息(Tombstone)与数据清理

当你使用 upsert-kafka 搭配 json 时,务必检查 Kafka Topic 的配置: log.cleanup.policy = compact 如果没开这个,Kafka 不会清理旧的 Key,你的 upsert-kafka 就会变成一个“无限膨胀”的普通 Kafka。

3. Metadata 的妙用

在使用 canal-json 时,经常需要提取数据库名、表名。你可以通过 Flink 的 Metadata Column 灵活提取,而不需要在 json 里手动解析:

CREATE TABLE canal_source (
  db_name STRING METADATA FROM 'value.database' VIRTUAL,
  table_name STRING METADATA FROM 'value.table' VIRTUAL,
  ...
) WITH ('format' = 'canal-json');

4. Checkpoint 的救命作用

无论是哪种 Connector,只要涉及 CDC(增量采集),必须开启 Checkpoint

  • 如果不开启,任务挂了之后,Flink 会忘记读到 binlog 的哪个位置了。这会导致你的 Upsert 逻辑出现重复或者空洞,数据一致性瞬间崩塌。

五、 结语

  • 入湖入仓Kafka + canal-json,追求完整性。
  • 结果对外Upsert-kafka + json,追求实时性与简洁性。