flink operator v1.10对接华为云对象存储OBS

1 概述

flink operator及其flink集群,默认不直接支持华为云OBS,需要在这些java程序的插件目录放一个jar包,以及修改flink配置后,才能支持集成华为云OBS。
相关链接参考:

https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html

2 环境准备

2.1 华为云kubernetes集群

准备一个kubernetes集群,如下图所示:
在这里插入图片描述

2.2 flink operator helm包下载地址

https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz

2.3 cert-manager yaml文件下载地址

https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml

2.4 准备flink应用示例

https://github.com/apache/flink/tree/master/flink-examples

将flink官方示例的代码编译成jar包,再上传到对象存储OBS,如下图所示:
在这里插入图片描述
这些jar包存放在华为云OBS对象存储上,flink operator和可以通过OBS协议拉取jar包,最终提交给flink集群,并且flink集群的jobmanager、flink taskmanager也能读写OBS对象存储。

3 部署

3.1 安装cert-manager

此组件是flink operator webhook的一个依赖,因此先安装它。

cd /tmp
wget https://github.com/jetstack/cert-manager/releases/download/v1.17.1/cert-manager.yaml
kubectl apply -f cert-manager.yaml

在这里插入图片描述

3.2 安装helm二进制工具

cd /tmp
wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
tar xf helm-v3.16.2-linux-amd64.tar.gz
cd linux-amd64
/bin/cp -f helm /usr/bin/
helm env

3.3 部署flink operator

下载fink operator的helm包,解压文件,最后通过helm命令将它部署在flink namespace中。

cd /tmp
wget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
tar xf flink-kubernetes-operator-1.10.0-helm.tgz

修改flink-kubernetes-operator/values.yaml文件,在文件的defaultConfiguration.flink-conf.yaml字段下新增如下内容:

defaultConfiguration:flink-conf.yaml: |+fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystemfs.obs.access.key: *********你的ak*********fs.obs.secret.key: *********你的sk*********fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com     # 这是对象存储端点,依据实际情况填写

部署k8s资源,命令如下:

helm upgrade --install flink-operator -n flink --create-namespace \
--set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
--set image.tag=1.10.0 \
./flink-kubernetes-operator/

我将flink-obs的jar包放入到镜像swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45中,此镜像是公共镜像,大家可随意拉取使用。

接着,更新operator deployment(需要使用initContainer和obs-plugin的volume的挂载),直接kubectl apply如下内容即可:

apiVersion: apps/v1
kind: Deployment
metadata:annotations:meta.helm.sh/release-name: flink-operatormeta.helm.sh/release-namespace: flinkgeneration: 4labels:app.kubernetes.io/managed-by: Helmapp.kubernetes.io/name: flink-kubernetes-operatorapp.kubernetes.io/version: 1.10.0helm.sh/chart: flink-kubernetes-operator-1.10.0name: flink-kubernetes-operatornamespace: flink
spec:replicas: 1selector:matchLabels:app.kubernetes.io/name: flink-kubernetes-operatorstrategy:type: Recreatetemplate:metadata:annotations:kubectl.kubernetes.io/default-container: flink-kubernetes-operatorcreationTimestamp: nulllabels:app.kubernetes.io/name: flink-kubernetes-operatorspec:initContainers:- image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45name: sidecarcommand: ["sh"]args: ["-c","mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"]volumeMounts:- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoopcontainers:- command:- /docker-entrypoint.sh- operatorenv:- name: OPERATOR_NAMESPACEvalueFrom:fieldRef:apiVersion: v1fieldPath: metadata.namespace- name: HOST_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.hostIP- name: POD_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.podIP- name: POD_NAMEvalueFrom:fieldRef:apiVersion: v1fieldPath: metadata.name- name: OPERATOR_NAMEvalue: flink-kubernetes-operator- name: FLINK_CONF_DIRvalue: /opt/flink/conf- name: FLINK_PLUGINS_DIRvalue: /opt/flink/plugins- name: LOG_CONFIGvalue: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties- name: JVM_ARGSimage: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0imagePullPolicy: IfNotPresentlivenessProbe:failureThreshold: 3httpGet:path: /port: health-portscheme: HTTPinitialDelaySeconds: 30periodSeconds: 10successThreshold: 1timeoutSeconds: 1name: flink-kubernetes-operatorports:- containerPort: 8085name: health-portprotocol: TCPresources: {}securityContext: {}startupProbe:failureThreshold: 30httpGet:path: /port: health-portscheme: HTTPperiodSeconds: 10successThreshold: 1timeoutSeconds: 1terminationMessagePath: /dev/termination-logterminationMessagePolicy: FilevolumeMounts:- mountPath: /opt/flink/confname: flink-operator-config-volume- mountPath: /opt/flink/artifactsname: flink-artifacts-volume- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoop- command:- /docker-entrypoint.sh- webhookenv:- name: WEBHOOK_KEYSTORE_PASSWORDvalueFrom:secretKeyRef:key: passwordname: flink-operator-webhook-secret- name: WEBHOOK_KEYSTORE_FILEvalue: /certs/keystore.p12- name: WEBHOOK_KEYSTORE_TYPEvalue: pkcs12- name: WEBHOOK_SERVER_PORTvalue: "9443"- name: LOG_CONFIGvalue: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties- name: JVM_ARGS- name: FLINK_CONF_DIRvalue: /opt/flink/conf- name: FLINK_PLUGINS_DIRvalue: /opt/flink/plugins- name: OPERATOR_NAMESPACEvalueFrom:fieldRef:apiVersion: v1fieldPath: metadata.namespaceimage: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0imagePullPolicy: IfNotPresentname: flink-webhookresources: {}securityContext: {}terminationMessagePath: /dev/termination-logterminationMessagePolicy: FilevolumeMounts:- mountPath: /certsname: keystorereadOnly: true- mountPath: /opt/flink/confname: flink-operator-config-volumednsPolicy: ClusterFirstrestartPolicy: AlwaysschedulerName: default-schedulersecurityContext:runAsGroup: 9999runAsUser: 9999serviceAccount: flink-operatorserviceAccountName: flink-operatorterminationGracePeriodSeconds: 30volumes:- configMap:defaultMode: 420items:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-operator.propertiespath: log4j-operator.properties- key: log4j-console.propertiespath: log4j-console.propertiesname: flink-operator-configname: flink-operator-config-volume- emptyDir: {}name: flink-artifacts-volume- name: keystoresecret:defaultMode: 420items:- key: keystore.p12path: keystore.p12secretName: webhook-server-cert- name: obs-pluginemptyDir: {}

3.4 部署flink session cluster

kubectl apply以下资源即可部署一个flink session集群,文件内容如下:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:name: flink-session-clusternamespace: flink
spec:image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19flinkVersion: v1_19flinkConfiguration:fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystemfs.obs.access.key: *********你的ak*********fs.obs.secret.key: *********你的sk*********fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com   # 这是对象存储端点,依据实际情况填写jobManager:resource:memory: "2048m"cpu: 2taskManager:resource:memory: "2048m"cpu: 2serviceAccount: flinkpodTemplate:spec:volumes:- name: obs-pluginemptyDir: {}containers:# Do not change the main container name- name: flink-main-containervolumeMounts:- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoopinitContainers:- image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45name: sidecarcommand: ["sh"]args: ["-c","mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"]volumeMounts:- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoop

在这里插入图片描述

4 提交flink作业

kubectl apply以下资源即可:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:name: basic-session-job-examplenamespace: flink
spec:deploymentName: flink-session-clusterjob:jarURI: obs://你的桶/StateMachineExample.jar    # jar包的位置,按实际情况填写parallelism: 1

在这里插入图片描述
可见flink作业是running状态,说明jar包被flink operator从华为云对象存储OBS拉取下来并提交到flink集群中。
继续查看flink operator日志,可以看见obs相关的信息:
在这里插入图片描述

小结

本文介绍flink operator及其管理的flink集群是如何对接到华为云对象存储OBS,对接完成后,不仅可以将作业的jar包存储在对象存储,也可以将flink作业的状态、输入输出等存储在对象存储。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/71834.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

免费PDF工具

Smallpdf.com - A Free Solution to all your PDF Problems Smallpdf - the platform that makes it super easy to convert and edit all your PDF files. Solving all your PDF problems in one place - and yes, free. https://smallpdf.com/#rappSmallpdf.com-解决您所有PD…

去中心化技术P2P框架

中心化网络与去中心化网络 1. 中心化网络 在传统的中心化网络中,所有客户端都通过一个中心服务器进行通信。这种网络拓扑结构通常是一个星型结构,其中服务器作为中心节点,每个客户端只能与服务器通信。如果客户端之间需要通信,必须…

muduo源码阅读:linux timefd定时器

⭐timerfd timerfd 是Linux一个定时器接口,它基于文件描述符工作,并通过该文件描述符的可读事件进行超时通知。可以方便地与select、poll和epoll等I/O多路复用机制集成,从而在没有处理事件时阻塞程序执行,实现高效的零轮询编程模…

Pinia 3.0 正式发布:全面拥抱 Vue 3 生态,升级指南与实战教程

一、重大版本更新解析 2024年2月11日,Vue 官方推荐的状态管理库 Pinia 迎来 3.0 正式版发布,本次更新标志着其全面转向 Vue 3 技术生态。以下是开发者需要重点关注的升级要点: 1.1 核心变更说明 特性3.0 版本要求兼容性说明Vue 支持Vue 3.…

【图像处理 --- Sobel 边缘检测的详解】

Sobel 边缘检测的详解 目录 Sobel 边缘检测的详解1. 梯度计算2. 梯度大小3. 梯度方向4. 非极大值抑制5. 双阈值处理6. 在 MATLAB 中实现 Sobel 边缘检测7.运行结果展示8.关键参数解释9.实验与验证 Sobel 边缘检测是一种经典的图像处理算法,用于检测图像中的边缘。它…

LeetCode 热题100 15. 三数之和

LeetCode 热题100 | 15. 三数之和 大家好,今天我们来解决一道经典的算法题——三数之和。这道题在 LeetCode 上被标记为中等难度,要求我们从一个整数数组中找到所有不重复的三元组,使得三元组的和为 0。下面我将详细讲解解题思路&#xff0c…

基因组组装中的术语1——from HGP

Initial sequencing and analysis of the human genome | Nature 1,分层鸟枪法测序hierarchical shotgun sequencing

安全开发-环境选择

文章目录 个人心得虚拟机选择ubuntu 22.04python环境选择conda下载使用: 个人心得 在做开发时配置一个专门的环境可以使我们在开发中的效率显著提升,可以避免掉很多环境冲突的报错。尤其是python各种版本冲突,还有做渗透工具不要选择windows…

数字体验驱动用户参与增效路径

内容概要 在数字化转型深化的当下,数字内容体验已成为企业与用户建立深度连接的核心切入点。通过个性化推荐引擎与智能数据分析系统的协同运作,企业能够实时捕捉用户行为轨迹,构建精准的用户行为深度洞察模型。这一模型不仅支撑内容分发的动…

Python 字符串(str)全方位剖析:从基础入门、方法详解到跨语言对比与知识拓展

Python 字符串(str)全方位剖析:从基础入门、方法详解到跨语言对比与知识拓展 本文将深入探讨 Python 中字符串(str)的相关知识,涵盖字符串的定义、创建、基本操作、格式化等内容。同时,会将 Py…

使用C++实现简单的TCP服务器和客户端

使用C实现简单的TCP服务器和客户端 介绍准备工作1. TCP服务器实现代码结构解释 2. TCP客户端实现代码结构解释 3. 测试1.编译:2.运行 结语 介绍 本文将通过一个简单的例子,介绍如何使用C实现一个基本的TCP服务器和客户端。这个例子展示了如何创建服务器…

Java Web开发实战与项目——Spring Boot与Spring Cloud微服务项目实战

企业级应用中,微服务架构已经成为一种常见的开发模式。Spring Boot与Spring Cloud提供了丰富的工具和组件,帮助开发者快速构建、管理和扩展微服务应用。本文将通过一个实际的微服务项目,展示如何使用Spring Boot与Spring Cloud构建微服务架构…

VMware建立linux虚拟机

本文适用于初学者,帮助初学者学习如何创建虚拟机,了解在创建过程中各个选项的含义。 环境如下: CentOS版本: CentOS 7.9(2009) 软件: VMware Workstation 17 Pro 17.5.0 build-22583795 1.配…

Linux8-互斥锁、信号量

一、前情回顾 void perror(const char *s);功能:参数: 二、资源竞争 1.多线程访问临界资源时存在资源竞争(存在资源竞争、造成数据错乱) 临界资源:多个线程可以同时操作的资源空间(全局变量、共享内存&a…

LD_PRELOAD 绕过 disable_function 学习

借助这位师傅的文章来学习通过LD_PRELOAD来绕过disable_function的原理 【PHP绕过】LD_PRELOAD bypass disable_functions_phpid绕过-CSDN博客 感谢这位师傅的贡献 介绍 静态链接: (1)举个情景来帮助理解: 假设你要搬家&#x…

【无人集群系列---无人机集群编队算法】

【无人集群系列---无人机集群编队算法】 一、核心目标二、主流编队控制方法1. 领航-跟随法(Leader-Follower)2. 虚拟结构法(Virtual Structure)3. 行为法(Behavior-Based)4. 人工势场法(Artific…

Oracle Fusion Middleware更改weblogic密码

前言 当用户忘记weblogic密码时,且无法登录到web界面中,需要使用服务器命令更改密码 更改方式 1、备份 首先进入 weblogic 安装目录,备份三个文件:boot.properties,DefaultAuthenticatorInit.ldift,Def…

MongoDB 复制(副本集)

MongoDB 复制(副本集) 引言 MongoDB是一个高性能、可扩展、易于使用的文档存储系统。它以JSON-like的文档存储结构,支持灵活的数据模型。在分布式系统中,为了提高数据可用性和系统稳定性,常常需要实现数据的备份和冗余。MongoDB提供了副本集…

【Erdas实验教程】009:非监督分类及分类后评价

文章目录 一、分类过程二、分类评价ERDAS 的 ISODATA 算法是基于最小光谱距离来进行的非监督分类,聚类过程始于任意聚类平均值或一个已有分类模板的平均值;聚类每重复一次,聚类的平均值就更新一次,新聚类的均值再用于下次聚类循环。这个过程不断重复,直到最大的循环次数已…

一周学会Flask3 Python Web开发-Jinja2模板访问对象

锋哥原创的Flask3 Python Web开发 Flask3视频教程: 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 如果渲染模板传的是对象,如果如何来访问呢? 我们看下下面示例: 定义一个Student类 cla…