PyFlink Connectors 如何在 Python 作业里正确使用 Kafka/JSON 等连接器(JAR 依赖、DDL 建表、pipeline.jars、内置 Source/Sink、

1. PyFlink 为什么要手动指定 Connector/Format JAR?

因为:

  • Flink 核心运行时在 JVM 上
  • connector(如 kafka)和 format(如 json)都是 JVM 侧实现
  • Python 代码只是驱动 Table/SQL 的规划与提交

所以你需要通过pipeline.jars指定依赖(多个 jar 用;分隔):

table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

实战建议:

  • connector jar 和 format jar 都要带上(例如 Kafka + JSON)
  • 路径用file:///这种绝对 URI,避免分布式环境找不到文件
  • 生产上更推荐把 jar 放到统一位置(Flink lib 或制品仓)并在提交时声明依赖,pipeline.jars适合快速验证与 demo

2. 在 PyFlink Table API 中,推荐用 DDL 定义 Source/Sink

PyFlink 的 Table API 使用 connector 最推荐的方式是:DDL + execute_sql()
理由很简单:DDL 更直观、更可复制、也最接近线上 SQL Gateway/SQL Client 的使用方式。

2.1 Kafka Source/Sink + JSON Format(最小可用示例)

source_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table").wait()

关键点拆解:

  • execute_sql()注册表(source/sink)
  • sql_query()产出一个 Table
  • execute_insert()触发写入(并提交作业)
  • .wait()在本地/mini cluster 场景常用,用于等待作业执行(远程集群通常不建议一直 wait)

3. 完整可运行的 Python 结构(把 jar、DDL、DML 串起来)

你给的完整示例结构非常标准,我建议你在博客里也用这种方式组织代码:

frompyflink.tableimportTableEnvironment,EnvironmentSettingsdeflog_processing():env_settings=EnvironmentSettings.in_streaming_mode()t_env=TableEnvironment.create(env_settings)# 1) 指定 connector & format jarst_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")# 2) DDL: source/sinksource_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)# 3) DML: query + insertt_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table")\.wait()if__name__=='__main__':log_processing()

4. PyFlink 里“内置”的 Sources/Sinks:不用额外 jar 也能跑

除了 Kafka 这类外部 connector,Flink 也提供了一些“开箱即用”的数据源/数据汇,特别适合本地调试与单测。

4.1 from/to Pandas(非常适合快速验证)

frompyflink.table.expressionsimportcolimportpandasaspdimportnumpyasnp pdf=pd.DataFrame(np.random.rand(1000,2))table=t_env.from_pandas(pdf,["a","b"]).filter(col('a')>0.5)pdf2=table.to_pandas()

注意:to_pandas()会把结果收集到客户端内存,生产慎用,建议先limit()

4.2 from_elements():用 Python 集合直接造表

frompyflink.tableimportDataTypes# 自动推断table_env.from_elements([(1,'Hi'),(2,'Hello')])# 指定字段名table_env.from_elements([(1,'Hi'),(2,'Hello')],['a','b'])# 指定 schema(更稳)table_env.from_elements([(1,'Hi'),(2,'Hello')],DataTypes.ROW([DataTypes.FIELD("a",DataTypes.INT()),DataTypes.FIELD("b",DataTypes.STRING())]))

这类内置 source 对写教程、做 POC、复现 bug 特别省事。

5. 自定义 Sources & Sinks:Python 不能直接写,需 Java/Scala 实现

文档明确说明了现阶段的边界:

  • 自定义 source/sink 需要 Java/Scala 实现
  • Python 侧可以通过实现 TableFactory(也是 Java/Scala)让它能被 DDL 发现并使用

也就是说:你可以用 PyFlink 写作业逻辑,但 connector 生态仍然是 JVM 的。

如果你后面要写“自定义 connector”系列博客,可以按这个路线写:

  • 先用 Java 写 DynamicTableSourceFactory / DynamicTableSinkFactory(SPI 注册)
  • 再在 PyFlink 里通过 DDL'connector'='xxx'直接使用

6. 常见踩坑清单(PyFlink Connector 场景高频问题)

  • 只加了 connector jar,没加 format jar:DDL 里用了'format'='json',但没带 json format 的 jar,会在运行期报找不到 format factory
  • pipeline.jars 路径不可达:本地 file 路径对集群 TaskManager 不可见,必须用集群可访问路径或随 job 提交
  • 用 DDL 建表但没触发执行:Table/SQL 是惰性执行,必须execute_insert()execute_sql(INSERT ...)才会提交作业
  • wait() 用错场景:本地调试很方便;远程集群提交通常希望异步返回,避免客户端阻塞

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1134386.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI+FFMPEG:用自然语言生成视频处理脚本

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 开发一个AI辅助的FFMPEG命令生成器,用户可以通过自然语言描述视频处理需求(如将视频压缩到10MB以内、提取前30秒并添加水印),系统自…

系统提示找不到d3dx9_43.dll文件问题 免费下载方法分享

在使用电脑系统时经常会出现丢失找不到某些文件的情况,由于很多常用软件都是采用 Microsoft Visual Studio 编写的,所以这类软件的运行需要依赖微软Visual C运行库,比如像 QQ、迅雷、Adobe 软件等等,如果没有安装VC运行库或者安装…

教学实践:如何在计算机课程中使用Llama Factory开展大模型实验

教学实践:如何在计算机课程中使用Llama Factory开展大模型实验 大模型技术正在改变计算机教育的面貌,但对于大学讲师来说,如何让学生在设备性能参差不齐的情况下统一参与实践环节是个难题。本文将介绍如何利用Llama Factory这一开源工具&…

用Llama Factory实现多模态微调:图文结合的新可能

用Llama Factory实现多模态微调:图文结合的新可能 作为一名内容创作者,你是否遇到过这样的困境:现有的AI工具要么只能生成文字,要么只能处理图片,而无法真正理解图文之间的关联?这正是我最近面临的挑战。幸…

模型压缩:使用Llama Factory将大模型瘦身90%的实用技巧

模型压缩:使用Llama Factory将大模型瘦身90%的实用技巧 作为一名移动端开发者,你是否遇到过这样的困境:好不容易训练出一个15GB的大模型,却发现它根本无法在移动设备上运行?别担心,今天我就来分享一个实测…

零基础玩转GD32:EMBEDDED BUILDER入门指南

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个最简单的GD32开发板入门项目:实现板载LED的呼吸灯效果,并通过串口接收命令改变呼吸频率。要求生成完整的工程文件,包括系统时钟配置、G…

AI如何加速AARCH64架构下的开发流程

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个AI辅助工具,帮助开发者在AARCH64架构下优化代码性能。工具应能分析现有代码,识别性能瓶颈,并提供针对AARCH64架构的优化建议。支持C/C和…

Llama Factory全自动:设置好参数就让模型夜间自动训练完成

Llama Factory全自动:设置好参数就让模型夜间自动训练完成 为什么需要夜间自动训练? 作为一名开发者,白天的时间往往被会议、代码评审和其他工作占据。但模型训练又需要大量计算资源,特别是使用大语言模型时。Llama Factory 提供了…

多情感语音合成PK:Sambert-Hifigan支持喜怒哀乐语调调节实测

多情感语音合成PK:Sambert-Hifigan支持喜怒哀乐语调调节实测 引言:中文多情感语音合成的现实需求 在智能客服、有声阅读、虚拟主播等应用场景中,传统语音合成(TTS)系统往往只能输出“机械式”的平缓语调,缺…

零基础入门:10分钟用VueDraggable创建可拖拽列表

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个最简单的VueDraggable入门示例,要求:1. 包含5个可拖拽的彩色卡片 2. 每个卡片显示不同emoji图标 3. 拖拽时卡片半透明效果 4. 底部显示当前排序结果…

儿童教育产品集成案例:识字APP接入TTS实现发音指导

儿童教育产品集成案例:识字APP接入TTS实现发音指导 📌 背景与挑战:儿童识字场景中的语音需求 在儿童教育类应用中,准确、自然且富有情感的语音反馈是提升学习体验的关键。传统的机械式朗读音效难以吸引低龄用户注意力,…

二次开发:基于Llama Factory源码定制专属模型训练平台

二次开发:基于Llama Factory源码定制专属模型训练平台 为什么选择Llama Factory进行二次开发 Llama Factory作为开源的大模型训练与微调框架,已经成为许多科技公司构建内部AI平台的首选基础。它集成了从预训练到指令微调、强化学习等完整流程&#xff0c…

NanoPi R5S OpenWrt固件终极优化:实测千兆网络性能爆发指南

NanoPi R5S OpenWrt固件终极优化:实测千兆网络性能爆发指南 【免费下载链接】nanopi-openwrt Openwrt for Nanopi R1S R2S R4S R5S 香橙派 R1 Plus 固件编译 纯净版与大杂烩 项目地址: https://gitcode.com/GitHub_Trending/nan/nanopi-openwrt 网络瓶颈诊断…

AList终极指南:3步打造你的智能文件管理中心

AList终极指南:3步打造你的智能文件管理中心 【免费下载链接】alist 项目地址: https://gitcode.com/gh_mirrors/alis/alist 你是否曾经为管理多个云盘账户而烦恼?在阿里云盘、百度网盘、Google Drive等不同服务之间切换,不仅效率低下…

Android开发新手必看:ADB Daemon错误完全指南

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 创建一个交互式学习应用,帮助新手理解并解决ADB相关问题。功能包括:1) ADB基础知识讲解;2) 常见错误模拟环境;3) 分步骤解决方案演示…

OCR技术对比:CRNN在不同场景下的表现

OCR技术对比:CRNN在不同场景下的表现 📖 项目背景与OCR技术演进 光学字符识别(Optical Character Recognition, OCR)是人工智能领域中一项基础而关键的技术,广泛应用于文档数字化、票据识别、车牌检测、自然场景文字…

用APOLLO快速构建微服务配置原型系统

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 开发一个微服务配置管理原型系统,功能包括:1. 服务注册发现配置;2. 动态路由规则管理;3. 熔断降级策略配置;4. 灰度发布…

如何用AI快速生成MC.JS1.8.8的插件代码?

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 请基于MC.JS1.8.8开发一个Minecraft插件,功能包括:1) 玩家加入服务器时发送欢迎消息;2) 击杀怪物后获得随机奖励;3) 自定义/hello命…

Sambert-Hifigan语音合成实战:3步部署中文多情感TTS服务

Sambert-Hifigan语音合成实战:3步部署中文多情感TTS服务 引言:让机器“有感情”地说话——中文多情感TTS的现实需求 在智能客服、有声阅读、虚拟主播等应用场景中,传统的语音合成(Text-to-Speech, TTS)系统往往输出机械…

AI有声书制作全流程:Sambert-Hifigan实现长文本自动分段合成

AI有声书制作全流程:Sambert-Hifigan实现长文本自动分段合成 📌 引言:中文多情感语音合成的现实需求 随着数字内容消费的持续增长,有声书、播客、智能朗读等音频服务正成为信息获取的重要方式。传统人工配音成本高、周期长&#x…