Published on

StreamPark + Flink SQL 实战:从零创建任务到生产级调优

Authors
  • avatar
    Name
    Charles Chen
    Twitter

一、 环境背景

  • 容器工具:Docker
  • 核心组件:StreamPark 2.1.6, Flink 1.18, Kafka 3.7
  • 痛点:解决 Docker 容器网络隔离、Checkpoint 挂载权限、依赖管理等问题。

在 StreamPark 中创建一个可运行的任务,必须遵循标准的 “依赖 -> 配置 -> 发布 -> 启动” 流程:

1. 准备工作:引入依赖 (Dependency)

Flink 默认不包含 Kafka 连接器。在创建任务前(或编辑任务时),必须手动添加 Maven 依赖,否则会报 Could not find any factory for identifier 'kafka'

  • 操作入口:Application -> Edit -> Dependency -> Add New (Maven)
  • 核心坐标 (针对 Flink 1.18):
    • Kafka 连接器:org.apache.flink:flink-sql-connector-kafka:3.0.1-1.18
    • (可选) RocksDB 后端:org.apache.flink:flink-statebackend-rocksdb:1.18.1

在 Application 中选择 Flink SQL 模式。注意以下针对 Docker 环境的特殊配置:

SQL

-- 1. 生产级 Checkpoint 配置
SET 'execution.checkpointing.interval' = '180s'; -- 生产环境建议 3分钟
SET 'state.checkpoints.dir' = 'file:///flink_checkpoints'; -- 【关键】必须填容器内路径
SET 'state.checkpoints.num-retained' = '10'; -- 保留最近10个存档
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION'; -- 取消任务时不删存档

-- 2. Source 表定义
CREATE TABLE source_kafka (
    id INT,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.group.id' = 'my_group',
    -- 【关键】Docker 网络隔离,不能写 localhost,要写容器名
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- 3. Sink 表定义 (精准一次)
CREATE TABLE sink_kafka (
    id INT,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic1',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    -- 【关键】开启 Exactly-Once
    'sink.delivery-guarantee' = 'exactly-once',
    'sink.transactional-id-prefix' = 'tx_my_job_'
);

INSERT INTO sink_kafka SELECT * FROM source_kafka;

3. 发布与启动 (Release & Start)

  • Release:点击“小火箭”图标。此步骤 StreamPark 会下载依赖 jar 包并构建运行环境。
  • Start:点击“播放”按钮。如果是重启,且需要恢复状态,记得勾选 "From Savepoint/Checkpoint"

三、 避坑指南(排错实录)

这是本文最精华的部分,记录了在 macOS + Docker 环境下必定会遇到的三个问题:

坑点 1:Connection Refused (网络连接)

  • 报错TimeoutException: Topic not present in metadata
  • 原因:在 Docker 容器内,localhost 指向容器自己,而不是宿主机或其他容器。
  • 解决
    • 查看 Kafka 容器名称:docker ps
    • 修改 SQL 地址:将 localhost:9092 改为 kafka:9092 (容器名:端口)。

坑点 2:Permission Denied (Checkpoint 权限)

  • 报错AccessDeniedException: /flink_checkpoints/...
  • 原因:macOS 的 Docker 挂载机制(Bind Mounts)。宿主机目录权限归属 Mac 用户(UID 501),而 Flink 容器默认以 flink 用户(UID 9999)运行,导致无法写入挂载的本地目录。
  • 错误尝试:在容器内 chmod 777 (重启任务生成新文件夹后失效)。
  • 终极解决: 修改 docker-compose.yml,强制 Flink 容器以 Mac 当前用户身份运行。 YAML
    services:
      jobmanager:
        # 格式为 "UID:GID",macOS通常是 501:20
        user: "501:20"
        volumes:
          - ./data/flink_checkpoints:/flink_checkpoints
      taskmanager:
        user: "501:20"
        volumes:
          - ./data/flink_checkpoints:/flink_checkpoints
    

坑点 3:数据不可见 (事务隔离)

  • 现象:开启了 exactly-once 后,下游迟迟看不到数据。
  • 原因:Kafka 事务机制,数据处于 "Uncommitted" 状态,直到 Checkpoint 完成才提交。
  • 解决
    • 这是正常现象。
    • 下游消费者必须配置 isolation.level = read_committed 才能读取这些数据。

四、 生产环境建议

从 Demo 到生产上线,建议修改以下配置:

  1. 状态后端:从 filesystem 改为 rocksdb (需添加依赖),防止内存溢出。
  2. 重启策略:设置 restart-strategy = failure-rate,容忍偶发的网络抖动,不要一报错就挂。
  3. 微批处理:对于聚合任务,开启 table.exec.mini-batch.enabled = true,大幅提升吞吐量。
  4. 持久化:务必在 docker-compose.yml 中挂载 volumes,确保 MySQL (StreamPark 元数据) 和 Zookeeper/Kafka 数据在重启后不丢失。

结语: 在 Mac Docker 环境下跑大数据栈,权限和网络是最大的拦路虎。通过 user: "501:20" 解决权限问题,配合 StreamPark 优秀的管理能力,可以构建一个非常丝滑的本地开发流水线。