Published on

Flink 诡异报错排查:为什么我的 Kafka Sink 会“间歇性失明”?

Authors
  • avatar
    Name
    Charles Chen
    Twitter

在分布式系统的世界里,最让人抓狂的不是全盘崩溃,而是 “厚此薄彼”

场景复现: 你写了一个 Flink SQL 任务,双 Source 接入,一路 Sink 到 OceanBase (OB),一路 Sink 到 Kafka。结果 OB 端数据刷刷地流,Kafka 端却是“稳如泰山”的一条 0。

这大概率不是因为你运气不好,而是你踩中了 Flink 与 Kafka 交互时的三大深坑。今天,我们以这场“架构手术”为例,拆解高阶大数据工程师的排查逻辑。


一、 第一场手术:切除“巨婴记录” (RecordTooLarge)

症状描述: 报错堆栈中出现 org.apache.kafka.common.errors.RecordTooLargeException

病理分析: 这是最经典的“虚胖”导致的崩盘。你在 SQL 中使用了 UNNEST 炸裂大字段(比如 Canal 采集的 JSON),如果:

  1. 字段残留:炸裂后依然 SELECT * 带着原始的巨型 data 字段。
  2. 批次过载:Flink Producer 默认攒批发送,当追历史数据时,瞬间堆积的批次超过了 Kafka Broker 默认的 1MB 限制。

架构师的处方:

  • 字段修剪:只 SELECT 炸裂后的明细,彻底扔掉原始的大 JSON。
  • 物理扩容:调大 Sink 端的请求限制,并开启 LZ4 压缩(这能让你的数据体积瞬间缩减 70% 以上)。

二、 第二场手术:唤醒“失忆”的 Source (Offset Reset)

症状描述: 任务重启后大面积报错 NoOffsetForPartitionException: Undefined offset with no reset policy

病理分析: Flink 任务因为 Sink 报错崩溃了,但由于你没有开启有效的 Checkpoint (ckpt)、ckpt失败,或者更换了 group.id,Flink 重启后像失忆了一样,不知道该从 Kafka 的哪个位置开始读。如果你的 DDL 里没写 earliest 策略,它就会原地自杀。

架构师的处方: 在 Source 端 DDL 中强行指定“逃生通道”,并开启 Checkpoint 保证状态持久化。


三、 终极玄学:为什么“删了重建”治百病?

深度复盘: 最后你发现,什么都没改,删除 Topic 重建 居然就好了。这背后不是玄学,是三条硬逻辑:

  1. 清除“毒药批次” (Poison Pill):之前有一个超大请求卡在 Flink 内存 Buffer 里,发不出去也丢不掉。删除 Topic 强制断开连接,清空了这块“坏死内存”。
  2. 重置 Topic 级配置:旧 Topic 的 max.message.bytes 可能还是旧的 1MB。重建后,它继承了 Kafka 集群新的、更大的全局配置。
  3. 元数据刷新 (Metadata Refresh):旧 Topic 的分区 Leader 可能处于半死不活的状态(导致 TimeoutException)。重建强制触发了集群重新分配 Leader,链路瞬间打通。

你的 DDL 不仅要能跑,还要带“防弹衣”。

1. 强力 Source DDL(防脏数据、防失忆)

SQL

CREATE TABLE dw_address_book_source (
  id BIGINT,
  data ARRAY<ROW<val STRING>>, -- 假设 data 是数组
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = 'dt_user.address_book',
  'properties.bootstrap.servers' = '10.8.x.x:9092',
  'properties.group.id' = 'dw_consumer_v1',
  'format' = 'canal-json',

  -- 【保命参数】忽略解析错误的脏数据,不让任务崩掉
  'canal-json.ignore-parse-errors' = 'true',

  -- 【保命参数】找不到位点时从头读,防止丢数据
  'scan.startup.mode' = 'earliest-offset',
  'properties.auto.offset.reset' = 'earliest'
);

2. 暴力 Sink DDL(防过载、防超时)

SQL

CREATE TABLE dw_address_book_sink2kafka (
  id BIGINT,
  val STRING -- 只要炸裂后的字段
) WITH (
  'connector' = 'kafka',
  'topic' = 'cw_ods_dw_address_book_prod',
  'properties.bootstrap.servers' = '10.8.x.x:9092',
  'format' = 'json',

  -- 【性能优化】开启 LZ4 压缩,大数据量必备
  'properties.compression.type' = 'lz4',

  -- 【核心优化】调大单次请求上限到 10MB,根治 RecordTooLarge
  'properties.max.request.size' = '10485760',

  -- 【稳定性】设置 at-least-once,牺牲极小概率的重复,换取高性能和不丢数
  'sink.delivery-guarantee' = 'at-least-once',

  -- 【容错】增加超时时间,防止网络抖动导致任务崩溃
  'properties.request.timeout.ms' = '60000'
);

五、 排查思路总结:架构师的“三板斧”

  1. 看 Metrics (度量):Source 是否有 numRecordsIn?如果有但 Sink 没 numRecordsOut,说明数据卡在算子中间或序列化失败了。
  2. 看 Checkpoint (检查点):如果 CK 频率性失败,说明任务在不断重启,优先查 TaskManager 日志里的第一条 ERROR
  3. 看 Environment (环境):不要迷信代码。telnet 端口通不通?Kafka Topic 限制是多少?有时候解决问题的钥匙在代码之外。

专家寄语: 遇到报错不要慌,所有的“玄学”背后都是物理限制。