PyFlink使用教程,Flink,Python,Java

环境准备

环境要求

Java 11
Python 3.7, 3.8, 3.9 or 3.10

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/

打开 Anaconda3 Prompt

> java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)> python --version
Python 3.8.8> conda env list
# conda environments:
#
base                  *  d:\ProgramData\Anaconda3
tensorflow2.4            d:\ProgramData\Anaconda3\envs\tensorflow2.4> conda create -n PyFlink-1.17.1 python==3.8.8> conda activate PyFlink-1.17.1> python -m pip install apache-flink==1.17.1> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:
#
# Name                    Version                   Build  Channel
apache-beam               2.43.0                   pypi_0    pypi
apache-flink              1.17.1                   pypi_0    pypi
apache-flink-libraries    1.17.1                   pypi_0    pypi
avro-python3              1.9.2.1                  pypi_0    pypi
ca-certificates           2023.12.12           haa95532_0    defaults
certifi                   2023.11.17               pypi_0    pypi
charset-normalizer        3.3.2                    pypi_0    pypi
cloudpickle               2.2.0                    pypi_0    pypi
crcmod                    1.7                      pypi_0    pypi
dill                      0.3.1.1                  pypi_0    pypi
docopt                    0.6.2                    pypi_0    pypi
fastavro                  1.4.7                    pypi_0    pypi
fasteners                 0.19                     pypi_0    pypi
grpcio                    1.60.0                   pypi_0    pypi
hdfs                      2.7.3                    pypi_0    pypi
httplib2                  0.20.4                   pypi_0    pypi
idna                      3.6                      pypi_0    pypi
numpy                     1.21.6                   pypi_0    pypi
objsize                   0.5.2                    pypi_0    pypi
openssl                   1.1.1w               h2bbff1b_0    defaults
orjson                    3.9.12                   pypi_0    pypi
pandas                    1.3.5                    pypi_0    pypi
pip                       23.3.1           py38haa95532_0    defaults
proto-plus                1.23.0                   pypi_0    pypi
protobuf                  3.20.3                   pypi_0    pypi
py4j                      0.10.9.7                 pypi_0    pypi
pyarrow                   8.0.0                    pypi_0    pypi
pydot                     1.4.2                    pypi_0    pypi
pymongo                   3.13.0                   pypi_0    pypi
pyparsing                 3.1.1                    pypi_0    pypi
python                    3.8.0                hff0d562_2    defaults
python-dateutil           2.8.2                    pypi_0    pypi
pytz                      2023.3.post1             pypi_0    pypi
regex                     2023.12.25               pypi_0    pypi
requests                  2.31.0                   pypi_0    pypi
setuptools                68.2.2           py38haa95532_0    defaults
six                       1.16.0                   pypi_0    pypi
sqlite                    3.41.2               h2bbff1b_0    defaults
typing-extensions         4.9.0                    pypi_0    pypi
urllib3                   2.1.0                    pypi_0    pypi
vc                        14.2                 h21ff451_1    defaults
vs2015_runtime            14.27.29016          h5e58377_2    defaults
wheel                     0.41.2           py38haa95532_0    defaults
zstandard                 0.22.0                   pypi_0    pypi

下载的包存储在Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages

PyFlink 案例

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

打开 VSCode 切换到 PyFlink-1.17.1 环境,按照 教程 写一个 Table API 的示例

learn_pyflink/tableAPIJob.py

import argparse
import logging
import sysfrom pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtfword_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())# write all the data to one filet_env.get_config().set("parallelism.default", "1")# define the sourceif input_path is not None:t_env.create_temporary_table('source',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).build()).option('path', input_path).format('csv').build())tab = t_env.from_path('source')else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")tab = t_env.from_elements(map(lambda i: (i,), word_count_data),DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))# define the sinkif output_path is not None:t_env.create_temporary_table('sink',TableDescriptor.for_connector('filesystem').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).option('path', output_path).format(FormatDescriptor.for_format('canal-json').build()).build())else:print("Printing result to stdout. Use --output to specify output path.")t_env.create_temporary_table('sink',TableDescriptor.for_connector('print').schema(Schema.new_builder().column('word', DataTypes.STRING()).column('count', DataTypes.BIGINT()).build()).build())@udtf(result_types=[DataTypes.STRING()])def split(line: Row):for s in line[0].split():yield Row(s)# compute word counttab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink') \.wait()# remove .wait if submitting to a remote cluster, refer to# https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster# for more detailsif __name__ == '__main__':logging.basicConfig(stream=sys.stdout,level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

要在PyFlink-1.17.1环境下运行

> python tableAPIJob.pyUsing Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]
.
.
.

提交 PyFlink 作业到 Flink

参考:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs

我的 Flink 是安装在 WSL 上面的,因此也要准备环境。

下载 java-11-linux:https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz

> tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk# 生成 jre
> bin/jlink --module-path jmods --add-modules java.desktop --output jre> vi /etc/profileexport JAVA_HOME=/usr/lib/jdk/jdk-11.0.22
export JRE_HOME=${JAVA_HOME}/jre    
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
export PATH=${JAVA_HOME}/bin:$PATH> source /etc/profile > ls -l /usr/bin/python*lrwxrwxrwx 1 root root       9 Mar 26  2019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root      16 Mar 26  2019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root    1018 Mar  4  2018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root    3661 Mar  4  2018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root    1342 May  2  2016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root     398 Nov 22  2018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7
lrwxrwxrwx 1 root root      33 Apr  3  2019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root      34 Apr  3  2019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root      10 Mar 26  2019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root      17 Mar 26  2019 /usr/bin/python3m-config -> python3.7m-config> python --versionCommand 'python' not found, but can be installed with:apt install python3         # version 3.7.3-1, or
apt install python          # version 2.7.16-1
apt install python-minimal  # version 2.7.16-1You also have python3 installed, you can run 'python3' instead.> python3 --version
Python 3.7.3# 已经安装了 python-3.7.3,创建一个软连接即可
> ln -s /usr/bin/python3.7 /usr/local/bin/python> python --version
Python 3.7.3# 设置镜像源,否则会非常慢
> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple# 启动 Flink-1.17.1
> bin/start-cluster.sh

除此之外,在 WSL 上还需要安装有此python脚本依赖的库,也就是 apache-flink 库。因为 Flink 需要调用 python 命令来解析 pytion 脚本,这里面涉及到 python 和 java 之间的通讯。这一块还只是在 Flink 客户端上面(bin/flink run ...),而 Flink 的 TaskManager 在运行此任务的时候还需要调用 python 解释器,因为上面代码中有UDF函数,这个函数在Java中是不存在的,关于 Flink 支持 Python 任务的内部原理后面再写一篇。

> python -m pip install apache-flink==1.17.1
> pip list

然后将代码中的.wait()调用删掉

tab.flat_map(split).alias('word') \.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert('sink')

提交任务

> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.pyExecuting word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0

来到 webUI 查看任务

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656205.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx 1.25配置QUIC和HTTP3

Nginx 1.25配置QUIC和HTTP/3 Nginx在编译时需要配置相应的SSL库,以确保能够支持HTTP3.0和HTTP2.0等基于HTTPS的协议。这些加密算法主要由OpenSSL提供。另外,BoringSSL是谷歌创建的OpenSSL分支,专门用于支持TLS 1.3的UDP协议的0-RTT数据传输加…

2024 年人工智能(AI)会有哪些新趋势和新突破?无际Ai分享

随着科技的不断发展,人工智能领域正在以惊人的速度迈向前所未有的高度。在 2024 年,我们可以期待看到多个领域的重大突破,这将为人工智能技术带来新的应用和可能性。 以下是一些可能出现的新领域和突破性进展: 强化学习应用拓展&…

python中yield的用法

yield(生成器) v. 产生(收益、效益等),产生(结果);出产(天然产品,农产品,工业产品);屈服,让步;放弃,让出&…

MySQL前百分之N问题--percent_rank()函数

PERCENT_RANK()函数 PERCENT_RANK()函数用于将每行按照(rank - 1) / (rows - 1)进行计算,用以求MySQL中前百分之N问题。其中,rank为RANK()函数产生的序号,rows为当前窗口的记录总行数 PERCENT_RANK()函数返回介于 0 和 1 之间的小数值 selectstudent_…

MicrosoftEdge浏览器打开网页出现“此网站被人举报不安全”问题时解决办法

1:有时候不知怎么回事用电脑自带的微软浏览器进行搜索会出现以下的问题 这可能是由于我们的浏览器安全审查过于严格引起的 Windows10正式版系统下,使用Edge浏览器浏览网页时候,发现整个页面突然变成了红色,显示“已有人举报此网站…

【高效开发工具系列】markdown转HTML

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

打击者H5小游戏

欢迎来到程序小院 打击者 玩法&#xff1a;点击飞机上下左右移动躲过子弹射击&#xff0c;打掉上方敌人飞机&#xff0c; 遇到药包会增加能量&#xff0c;弹药包会升级武器&#xff0c;快去射击吧^^。开始游戏https://www.ormcc.com/play/gameStart/262 html <div id"…

java8 Duration类学习

Duration类 官网地址 基于时间的时间量&#xff0c;例如“34.5秒”。 此类以秒和纳秒为单位对时间的量或量进行建模。它可以使用其他基于持续时间的单位访问&#xff0c;如分钟和小时。此外&#xff0c;可以使用DAYS单位&#xff0c;并将其视为完全等于24小时&#xff0c;从…

C语言第十三弹---VS使用调试技巧

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 VS调试技巧 1、什么是bug 2、什么是调试&#xff08;debug&#xff09;&#xff1f; 3、Debug和Release​编辑​ 4、VS调试快捷键 4.1、环境准备 4.2、调试…

python--文件读取与写入

"""1、文件的读取(1)读取文件&#xff1a;open1、file&#xff1a;指定文件的路径路径可以分成相对路径和绝对路径2、mode&#xff1a;指定文件的读取的的模式&#xff1a;1、r open for reading (default) 表示的是读模式2、w open for writing, t…

计算机毕业设计 | SpringBoot+vue学生成绩管理系统教务管理系统

1&#xff0c;项目背景 随着我国高等教育的发展&#xff0c;数字化校园将成为一种必然的趋势&#xff0c;国内高校迫切需要提高教育工作的质量与效率&#xff0c;学生成绩管理工作是高校信息管理工作的重要组成部分&#xff0c;与国外高校不同&#xff0c;他们一般具有较大规模…

快来建服组队一起捕捉帕鲁

2024年初最火的steam游戏《幻兽帕鲁》&#xff0c;大家都已经玩上了吧&#xff1f; 如何跟朋友组队一起在广阔的世界中捕捉神奇的生物“帕鲁”&#xff0c;快来金山云解锁吧~ 第一步&#xff1a;创建游戏服务器 部署一台幻兽帕鲁云服务器&#xff1a;在控制台上选择离您更近…

在Windows11的WSL上运行Llama2-7b-chat 下

上一篇博客讲了我跑Llama的demo的心路历程&#xff08;上一篇博客传送门&#xff09;&#xff0c;这篇我们主要是讲下怎么配置。 快速开始 使用Linux、Linux、Linux&#xff0c;重要的事情说三遍&#xff0c;如果你和我一样懒得安装双系统&#xff0c;那么在Windows下安装一个…

【Pytorch 第三讲】如何使用pre-trained weights 来训练自己的模型

# 理由# 有时在训练自己的模型时&#xff0c;如果从头开始训练自己的模型&#xff0c; 不但费时费力&#xff0c; 有时可能训练了很久&#xff0c; 好不容易收敛&#xff0c; 发现结果不是太好。 如果能够基于被人 已经训练好的权重&#xff0c; 初始化自己的模型。那么在训练…

二百二十一、HiveSQL报错:return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask

一、目的 在运行HiveSQL时&#xff0c;执行报错 tatement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask 二、在yarn上查看任务报错 The required MAP capability is more than the supported max container capability in t…

synchronized 和 ReentrantLock 的区别

synchronized 和 ReentrantLock 都是 Java 中的同步机制&#xff0c;用于确保在多线程环境下&#xff0c;同一时刻只有一个线程能够访问特定的代码块、方法或对象。它们的区别在于实现方式和性能上。 synchronized、ReentrantLock synchronized示例 ReentrantLock可重入公平性示…

使用流服务器m7s对接gb28181

优:sip品牌兼容性比较好,大华,海康都稳定可以,srs的5.0 sip品牌兼容性大华没反应,akstream-sip 大华也有问题,wvp也还可以 缺:目前最新的4.7.4版本,,sip协议用udp正常,TCP不正常(估计不支持),移动、事件,预警不支持 一、下载对应的m7s的执行文件 官网:快速起步…

Android开发中自定义View实现RecyclerView下划线

本篇文章主要讲解的是有关RecyclerView下划线的使用&#xff0c;主要有几个方法&#xff0c;具体如下&#xff1a; 第一种方式&#xff1a;网格分割线 public class GridDivider extends RecyclerView.ItemDecoration { private Drawable mDividerDarwable; private i…

深入理解c语言printf

printf(格式控制&#xff0c;输出列表); printf的使用方法大全 #include<stdio.h> int main(){double a123.111;int m-156;char csA;char *str"%d,%d";printf("%d",100,1);return 0; }

TextCNN的复现

TextCNN的复现–pytorch的实现 对于TextCNN的讲解&#xff0c;可以参考这篇文章 Convolutional Neural Networks for Sentence Classification - 知乎 (zhihu.com) 接下来主要是对代码内容的详解&#xff0c;完整代码将在文章末尾给出。 使用的数据集为电影评论数据集&…