1. 先把“部署积木”想清楚:Client、JobManager、TaskManager
Flink 的核心组件永远是这三个:
- Client:把你的作业(Jar/SQL)编译成 JobGraph 并提交
- JobManager:统筹调度、容错、Web UI、REST
- TaskManager:真正跑算子(source/transform/sink)
你后面选 Standalone、Docker、K8s,本质只是“这些进程在哪里跑、怎么拉起、怎么伸缩、怎么高可用”。
2. Session vs Application:这是所有部署方式的“分水岭”
你贴的部署文档里反复强调两件事:资源隔离与 main() 执行位置。
- Session Mode
集群长期存在,多个作业共享 TaskManager 资源,提交快、成本低,但隔离差;一个作业把 TM 打崩,可能牵连同机其他作业。 - Application Mode
每个应用独占一个集群,JobManager 里直接执行 main();隔离更好、治理更清晰,但每个应用都要有自己的“集群生命周期”。
生产上常见策略:
- 多租户/多团队、作业很多:Session(配好队列隔离/限额)
- 关键链路/重作业:Application(隔离+可控)
3. Java 兼容性:别等上线再被 JDK 模块化“打脸”
你贴的兼容性要点可以总结成三句话:
- Flink 2.0 起默认/推荐 Java 17;2.0 起对 Java 21 属于实验支持
- Java 16+ 的 Jigsaw 模块化会影响反射(Kryo 序列化常见),需要用
--add-opens/--add-exports,建议通过env.java.opts.all统一配置 - 不要“缩短”官方默认的 opens/export 列表,只能追加,否则 Flink 自己都可能不稳定
4. Standalone:最“裸”的方式,适合本机/小规模自管
4.1 Session(最常用本地起步)
./bin/start-cluster.sh# Web UI: http://localhost:8081./bin/flink run ./examples/streaming/TopSpeedWindowing.jar ./bin/stop-cluster.sh4.2 Application(把作业当“集群的一部分”来跑)
把 jar 放进lib/或用--jars拉取,再用standalone-job.sh启动 JobManager,TaskManager 另起:
cp./examples/streaming/TopSpeedWindowing.jar lib/ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing ./bin/taskmanager.sh start4.3 Standalone 的现实代价
- 进程挂了要你自己拉起、资源扩缩容你自己做
- HA 要你自己配 ZK、masters/workers、共享存储
5. Working Directory:把“可恢复的本地痕迹”放到正确的盘
Flink 进程可以配置 working directory,用来存这些东西:
- BlobServer/BlobCache 的 blob
- 开启
state.backend.local-recovery时的本地 state - RocksDB 工作目录
核心配置思路:
process.working-dir(或分别指定process.jobmanager.working-dir/process.taskmanager.working-dir)- 想跨进程重启也能本地恢复:必须
1)state.backend.local-recovery: true
2)TaskManager resource-id 固定(taskmanager.resource-id)
3)重启后仍能挂回同一个 working-dir 盘
典型配置:
process.working-dir:/data/flink/workdirstate.backend.local-recovery:truetaskmanager.resource-id:TaskManager_1一句话:本地恢复要“同一个 TM 身份 + 同一块盘”。
6. Docker:把 Standalone 变成“可复现的容器化 Standalone”
6.1 Docker Session(最快跑起来)
思路:先建网络、设jobmanager.rpc.address,再拉起 JM/TM。
FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"dockernetwork create flink-networkdockerrun --rm --name=jobmanager --network flink-network -p8081:8081\--envFLINK_PROPERTIES="${FLINK_PROPERTIES}"flink:2.2.0-scala_2.12 jobmanagerdockerrun --rm --name=taskmanager --network flink-network\--envFLINK_PROPERTIES="${FLINK_PROPERTIES}"flink:2.2.0-scala_2.12 taskmanager6.2 Docker Compose:把“集群拓扑”固化成一份 yaml
你贴的 compose 模板很标准:JM 暴露 8081,TM 按需 scale。
如果要 SQL Client 同容器使用,记得连接器 jar 必须在集群和 client都可见(通常做自定义镜像)。
6.3 Docker Application:更接近生产的隔离模式
要点只有一个:作业 jar 必须在容器 classpath / usrlib 可见
三种做法你贴得很全:挂载 volume、构建自定义镜像、或--jars指向远端(S3/HTTP)。
7. Kubernetes 部署:Standalone on K8s vs Native K8s
很多人第一次上 K8s 会把两者混在一起:
- Standalone on K8s:本质仍是 Standalone,只是用 Deployment/Service 把 JM/TM 跑在 K8s 上
- Native Kubernetes:Flink直接调用 K8s API动态申请/释放 TaskManager,更像真正的“资源提供方集成”
下面重点讲你最后贴的 Native Kubernetes。
8. Native Kubernetes:Flink 直连 K8s API 的“正统玩法”
8.1 Session 模式:一套长期集群跑多作业
启动 Session:
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster提交作业:
./bin/flink run\--target kubernetes-session\-Dkubernetes.cluster-id=my-first-flink-cluster\./examples/streaming/TopSpeedWindowing.jar停止:直接删 deployment
kubectl delete deployment/my-first-flink-cluster(nightlies.apache.org)
补充:Session 还有 detached/attached 两种控制方式;attached 可以在脚本里输入stop/help做交互管理。(nightlies.apache.org)
8.2 Application 模式:生产更推荐的隔离方式
Application 模式的关键约束:用户代码要能被集群侧拿到(因为 main() 在集群里跑)。
两条主路:
1)自定义镜像把 jar 打进usrlib/
2)开启 Artifact Management:本地 jar 上传到远端,再由 JM pod 拉取(也可以混合本地/远端 artifact-list)(nightlies.apache.org)
你还能用./bin/flink list/cancel --target kubernetes-application去管理集群侧作业。(nightlies.apache.org)
8.3 Web UI 暴露方式:ClusterIP / NodePort / LoadBalancer
kubernetes.rest-service.exposed.type支持:
- ClusterIP:集群内访问,通常配
kubectl port-forward - NodePort:节点 IP:NodePort
- LoadBalancer:云厂商 LB
注意文档里的安全提醒:如果你把 REST/UI 暴露到公网,通常意味着“可执行任意用户代码”的风险。(nightlies.apache.org)
8.4 日志与排障:别只盯 UI
- 直接看 pod 日志:
kubectl logs <pod> - TM 会自动回收空闲实例,想留足排障时间可调大
resourcemanager.taskmanager-timeout(nightlies.apache.org)
8.5 Plugins、Secrets、RBAC、Pod Template:上生产绕不开
- Plugins:通过环境变量把内置插件(例如 S3)挂进 JM/TM (nightlies.apache.org)
- Secrets:既可按文件挂载,也可注入环境变量 (nightlies.apache.org)
- HA:把 JM replicas 调大只是“更快重启”,真正 HA 还要启用 HA 配置(文档同页有说明)(nightlies.apache.org)
- RBAC:默认 service account 可能没权限创建/删除 pod,需要 role binding (nightlies.apache.org)
- Pod Template:用
kubernetes.pod-template-file.default支持更复杂的 pod spec,但要知道哪些字段会被 Flink 覆盖 (nightlies.apache.org)
9. Hive Functions:把 Hive 的函数生态“搬”到 Flink SQL
9.1 HiveModule:Hive 内置函数变成 Flink 系统函数
加载方式(Java):
Stringname="myhive";Stringversion="2.3.4";tableEnv.loadModule(name,newHiveModule(version));注意:老版本 Hive 内置函数有线程安全问题,文档建议自行打补丁。
9.2 原生 Hive 聚合:性能开关(Flink 1.17+)
如果 HiveModule 优先级高于 CoreModule,默认会优先用 Hive 聚合,但目前只能走 sort-based 聚合。
开启table.exec.hive.native-agg-function.enabled=true后,sum/count/avg/min/max这 5 个可以走 hash-based 聚合,通常对性能提升很明显。
经验建议:
- 性能不是瓶颈就别开(能力不完全对齐 Hive)
- SQL Client 场景下目前不能按 job 单独开,只能模块级别先开再 load module(文档也说未来会修)
9.3 复用 Hive UDF/UDAF/UDTF
前置条件你贴得很明确:
- 当前 session 的 catalog 要指向含该函数的 HiveCatalog(连 Hive Metastore)
- 函数 jar 要在 Flink classpath
Flink 会在规划/执行阶段把 Hive 的 UDF/GenericUDF/UDTF/UDAF 自动翻译成 Flink 的 ScalarFunction/TableFunction/AggregateFunction 来跑。
10. Flink SQL 调 OpenAI:把推理当作一等公民接入流批任务
你贴的 “OpenAI Model Function” 思路很像一个可配置的远程模型算子:在 SQL 里CREATE MODEL,再用ML_PREDICT批量推理。
10.1 情感分类示例(Chat 类任务)
CREATEMODEL ai_analyze_sentiment INPUT(`input`STRING)OUTPUT(`content`STRING)WITH('provider'='openai','endpoint'='https://api.openai.com/v1/chat/completions','api-key'='<YOUR KEY>','model'='gpt-3.5-turbo','system-prompt'='Classify ... Output only the label.');然后:
INSERTINTOprint_sinkSELECTid,movie_name,contentaspredicit_label,actual_labelFROMML_PREDICT(TABLEmovie_comment,MODEL ai_analyze_sentiment,DESCRIPTOR(user_comment));工程提醒(很关键):
- API Key 别硬编码在 SQL 文件里,生产建议走 K8s Secret/环境变量注入,再由 Flink 配置引用(跟上一节 K8s secrets 串起来)
- 成本与吞吐:
n记得保持 1;max-tokens、max-context-size控制预算;批量推理要盯住并发与重试策略
10.2 错误处理与上下文溢出:先定策略再跑生产
你贴的 options 里最影响“作业稳定性”的是两类:
- context-overflow-action:截断头/尾、跳过并记录
- error-handling-strategy + retry:RETRY/FAILOVER/IGNORE + 重试次数与兜底策略
强烈建议:流任务默认别轻易 FAILOVER,把“可恢复错误”做成 IGNORE+日志/指标,避免外部 API 抖动导致作业雪崩。
10.3 Embeddings:向量输出的 schema 要对齐
Embeddings 输出是ARRAY<FLOAT>,在下游你通常会:
- 写入向量库/ES 向量字段
- 或在 Flink 里做相似度计算(注意状态大小与算子开销)
OpenAI 官方 API 参考里仍保留 Embeddings 接口(可选dimensions等参数)。(OpenAI 平台)
另外,OpenAI 也提供了更统一的 Responses API(不少新能力会往这里收敛),如果你后续要升级端点,可以优先对齐 Responses 的语义与返回结构。(OpenAI 平台)
11. 最后给一份“上线前检查清单”
选型
- 多作业共享:Session
- 强隔离/关键链路:Application
Java
- 推荐 Java 17
- Java 16+ 模块化 opens/export 配好且不删默认项
状态与恢复
- RocksDB + 本地恢复要固定 TM resource-id + 持久化 working-dir
K8s 安全
- REST/UI 暴露不要裸奔公网
- RBAC、Secrets、插件路径一次性打通
Hive/AI 扩展
- HiveModule 优先级与 native agg 开关要评估一致性
- OpenAI 推理要把重试/忽略/截断策略定好,指标打好