宿主机运行pyspark任务读取docker hadoop容器上的数据

熬了四个大夜才搞明白,最晚一天熬到早上十点/(ㄒoㄒ)/~~,最后发现只要加个参数就解决了。。。抱头痛哭

问题描述:

Hadoop集群部署在docker容器中,宿主机执行pyspark程序读取hive表

问题一:当master('local[*]')时,docker容器返回给driver端datanode节点的内网ip地址,修改hosts只能将域名转发到ip地址,不能将ip地址转发给ip地址。

问题二:当master('spark://localhost:7077'),因为容器做了端口映射,这里使用的时localhost。driver端为宿主机,spark会把driver端的hostname传到spark集群worker节点上,spark work容器无法识别宿主机hostname

解决方法:

在宿主机配置好hosts,格式为:127.0.0.1 容器hostname(eg:datanode)

问题一:SparkSession加参数config("dfs.client.use.datanode.hostname", "true")//客户端(如 Spark Driver)通过主机名访问 DataNode。

问题二:SparkSession加参数config("spark.driver.host", "192.168.1.5") //宿主机ip地址

就是这么简单。。。哭死(;´༎ຶД༎ຶ`) 

当时试了好多种办法,nginx反向代理、DNSmasq自定义DNS、NPS内网穿透、Macvlan网络模式,SNAT、最后甚至还装了k8s集群外加k8s监控界面,真的哭死。看看现在时间吧,已经4:03了。。。😭😭😭

最后附上完整代码:

import string
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':# =============================== spark local模式 ===============================# 想要读写数据必须要配置windows下的Hadoop环境(不一定,未验证)# 想要使用Hive元数据必须添加enableHiveSupport()这行代码# 无论spark集群配置文件中有没有配置spark元数据信息,都要在代码工程中配置元数据信息,因为本地读取不到集群中的环境变量,创建hive-site.xml文件或代码中定义config都行# 如果不指定spark.sql.warehouse.dir信息则默认为: file:/C:/Users/yelor/spark-warehouse# 如果不知道hive.metastore.uris的值则找不到hive元数据,但不会报错,只是无法使用hive元数据# spark.sql.warehouse.dir和hive.metastore.uris的值可以在代码工程中配置hive-site.xml文件来指定# 容器外需要注意ip地址互通问题,需要配置hosts# 如果Hadoop集群部署在docker容器中,dfs.client.use.datanode.hostname=true在本地local模式下必须要加,不然spark会使用datanode的内网ip来通信import os# 这里可以选择本地win系统的PySpark环境执行pySpark代码,也可以使用虚拟机中PySpark环境,通过os可以配置。# os.environ['SPARK_HOME'] = r'D:\software2\spark-3.1.2-bin-hadoop3.2' # 暂时不写也行PYSPARK_PYTHON = r"D:\AnacondaCache\envs\mspro\python" #python.exe或者简写为python都行# 当存在多个python版本环境时(如使用conda),不指定python环境会报错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON# 配置指定提交作业至HDFS的用户 不然上传文件到 HDFS 的时候会报权限错误 参考文章:https://zhuanlan.zhihu.com/p/538265736# os.environ["HADOOP_USER_NAME"] = "yelor" # 暂时不写也行# 在local模式下,如果想使用hive元数据,以下参数是必须要配置的:spark.sql.warehouse.dir、hive.metastore.urisspark = SparkSession.builder.\appName('udf_define').\master('local[*]').\config('spark.sql.shuffle.partitions', 2).\config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://localhost:9083').\config("spark.executor.memory", "1g").\config("spark.driver.memory", "1g").\config("dfs.client.use.datanode.hostname", "true").\config("spark.driver.host", "192.168.1.5").\enableHiveSupport(). \getOrCreate() # =============================== spark master集群模式 ===============================# 如果要连接spark集群,需要保证pyspark包的版本与集群的spark版本一致# 查看spark版本:spark-submit --version# 就算是使用集群执行作业,也必须要配置hive-site.xml文件中的信息,因为还是读取driver端的环境变量# 必须添加enableHiveSupport()这行代码# spark.driver.host保证代码传到spark容器中时以指定的ip地址为driver地址,不然会使用本机的hostname# import os# os.environ['PYSPARK_PYTHON']=r"D:\\AnacondaCache\\envs\\mspro\\python.exe"# spark = SparkSession.builder.\#     appName('udf_define').\#     master('spark://localhost:7077').\#     config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\#     config('hive.metastore.uris', 'thrift://localhost:9083').\#     config("spark.executor.memory", "512m").\#     config("spark.driver.memory", "512m").\#     config("spark.driver.host", "192.168.1.5").\#     enableHiveSupport(). \#     getOrCreate() # 如果spark配置文件中没有配置spark元数据信息,就不能使用enableHiveSupport().\ 直接在代码中配置元数据信息也能脸上hive元数据sc = spark.sparkContext# 设置日志级别为 DEBUG# sc.setLogLevel("DEBUG")# 查看表的存储位置warehouse_dir = spark.conf.get("spark.sql.warehouse.dir")print(f"Spark SQL warehouse directory: {warehouse_dir}")# 指定要使用的数据库database_name = "tb"spark.sql(f"USE {database_name}")# 执行 SQL 查询# query = "show databases"# query = "SELECT * FROM orders"# query = "SELECT * FROM students"# df = spark.sql(query)# # 显示查询结果# df.show()# 生成 10000 条模拟数据data = []for _ in range(1000):# 生成随机的姓名和年龄name = ''.join(random.choices(string.ascii_letters, k=5))age = random.randint(18, 60)data.append((name, age))# 定义 DataFrame 的列名columns = ["name", "age"]# 创建 DataFramedf = spark.createDataFrame(data, columns)# 创建临时视图# df.createOrReplaceTempView("test_table")try:# 创建持久化表(可选)df.write.saveAsTable("testaa_table", mode="overwrite")# 验证数据插入spark.sql("SELECT * FROM testaa_table LIMIT 5").show()# 加入循环,保持 SparkSession 一直运行,方便看 Spark UIwhile True:try:import timetime.sleep(1)except KeyboardInterrupt:breakexcept Exception as e:print(f"An error occurred: {e}")finally:spark.stop()# 停止 SparkSession# spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/73011.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《平凡的世界》:在泥土中寻找星辰的勇气

“平凡不是平庸的代名词,而是千万人用脊梁扛起时代的勋章。”——路遥的《平凡的世界》用百万字书写了黄土地上孙少安、孙少平两兄弟的挣扎与觉醒,撕开了“奋斗逆袭”的浪漫滤镜,告诉你:真正的英雄主义,是在认清了生活…

【SpringMVC】深入解析使用 Postman 和浏览器模拟将单个与多个参数传递到后端和后端接收过程

SpringMVC—请求(Request) 访问不同的路径,就是发送不同的请求;在发送请求时,可能会带一些参数,所以学习Spring的请求,主要是学习如何传递参数到后端以及后端如何接收; 我们主要是使用 浏览器 和 Postman …

04 | 初始化 fastgo 项目仓库

提示: 所有体系课见专栏:Go 项目开发极速入门实战课;欢迎加入我的训练营:云原生AI实战营,一个助力 Go 开发者在 AI 时代建立技术竞争力的实战营;本节课最终源码位于 fastgo 项目的 feature/s01 分支&#x…

Docker 安装成功后,安装 Dify 中文版本的步骤

Docker 安装成功后,安装 Dify 中文版本的步骤如下1: 克隆 Dify 代码仓库:在终端中执行以下命令,将 Dify 源代码克隆至本地环境。 bash git clone https://github.com/langgenius/dify.git进入 Dify 的 docker 目录: b…

RPC服务调用深度解析:从原理到Java实践

一、RPC的核心原理与架构设计 1.1 RPC的本质 RPC(Remote Procedure Call)是一种分布式系统间通信协议,允许程序像调用本地方法一样调用远程服务。其核心目标是通过位置透明性和协议标准化隐藏网络通信细节。RPC的调用流程可抽象为以下步骤&…

电脑的写字板如何使用?

打开写字板: 直接按一下键盘上的win R 键,然后输入:write , 再按一下回车 , 即可打开写字板 可以在里面写文字 和 插入图片等… , 如下所示: 保存写字板内容: 当我们写好了之后,…

医疗AI测试实战:如何确保人工智能安全赋能医疗行业?

一、医疗AI测试的重要性 人工智能(AI)正广泛应用于医疗行业,如疾病诊断、医学影像分析、药物研发、手术机器人和智能健康管理等领域。医疗AI技术的应用不仅提高了诊断效率,还能降低误诊率,改善患者治疗效果。然而&…

AI日报 - 2025年3月12日

AI日报 - 2025年3月12日 🌟 今日概览(60秒速览) ▎🤖 AGI突破 | Anthropic CEO预测AI将主导代码编写 🔬 自训练技术显著提升LLM思维清晰度 ▎💼 商业动向 | OpenAI与CoreWeave达成119亿美元基建协议 &…

跳表数据结构

跳表(Skip List)是一种支持高效插入、删除和查找的链表结构,用于加速查找操作,特别适用于有序数据集合。它在Redis、LevelDB等系统中被用于**有序集合(Sorted Set)**的实现。 1. 跳表的结构 跳表的核心思…

系统会把原先的对话状态堆栈从 [“assistant“] 更新为 [“assistant“, “update_flight“]这个更新的处理过程

这个更新主要是在 State 定义中通过 Annotated 来自动处理的。在 State 类型中,我们对 dialog_state 字段绑定了 update_dialog_stack 函数,如下所示: class State(TypedDict):messages: Annotated[list[AnyMessage], add_messages]user_inf…

HTTP发送POST请求的两种方式

1、json String json HttpRequest.post(getUrl(method, "v1", url, userId, appKey)).header("Content-type", "application/json") // 设置请求头为 JSON 格式.body(JSONUtil.toJsonStr(params)) // 请求体为 JSON 字符串.execute().body(); …

Windows 万兴恢复专家 Wondershare Recoverit-v13.5.7.9-[电脑数据恢复工具]

Windows 万兴恢复专家Wondershare_Recoverit 链接:https://pan.xunlei.com/s/VOL3z608vzAj_IYTvH-F1q7kA1?pwdiu89# 1. 打开Setup.exe进行安装,安装完不要打开软件,记住安装目录 2. 将"Crack"文件夹内的所有文件复制到安装目录 …

Blender UV纹理贴图,导出FBX到Unity

加载ps好的模型贴图。右下角选择《材质》基础色里面选择《图像纹理》,选择你的图片。 选择上面UV选项卡。左上角选择UV编辑器。选中物体,TAB进入编辑模式。即可调整映射的图像范围。 其中渲染设置可以在左侧下边脱出。 导出带纹理FBX模型 路径选择复…

华为hcia——Datacom实验指南——以太网帧和IPV4数据包格式(一)

实验开始 第一步配置环境 第二步配置客户端 如图所示,我们把客户端的ip配置成192.168.1.10,网关设为192.168.1.1 第三步配置交换机1 system-view sysname LSW1 vlan batch 10 interface ethernet0/0/1 port link-type access port default vlan 10 qu…

解锁 Ryu API:从 Python 接口到 REST 设计全解析

Ryu 4.34 版本的 API 功能分类、核心接口说明及示例代码,结合其 Python 应用开发接口和 REST API 的设计特点进行综合解析: 一、Python 应用开发 API Ryu 的核心能力通过 Python 类库实现,开发者需继承 RyuApp 类并注册事件处理函数。 1. 应…

如何在需求分析阶段考虑未来扩展性

在需求分析阶段考虑未来扩展性的关键在于 前瞻规划、灵活架构、标准设计。其中,前瞻规划尤为重要,因为通过全面分析业务发展趋势与技术演进,能够在初期设计阶段预留足够扩展空间,降低后期改造成本,为企业长期发展奠定坚…

Docker搭建Redis哨兵模式【一主两从三哨兵】

Docker搭建Redis哨兵模式 系统: CentOS 7 Dockder 版本: VMware虚拟机 网络适配器 网络连接 桥接模式:直接连接物理网络查看IP命令 ip addr一、哨兵模式概述 1. 官方文档与关联博客 官方文档:https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel关联博…

关于统计建模大赛的选题

文章目录 0.大赛主题1.量化分析和风险管理2.金融市场预测与统计建模3.投资与机器学习相关4.大数据和医疗5.智能制造相关的6.教育行业 0.大赛主题 统计创新应用数据引领未来:这个主题其实很宽泛,没有什么明确的这个要求,所以只要是和我们的统…

Docker 学习笔记:从入门到部署,实战演练全流程!

📌 开篇:为什么要学 Docker? 还在为环境不一致、部署麻烦、依赖冲突头疼吗?Docker 让一切变得简单!作为现代开发和运维的神器,Docker 让我们可以用 一句命令 解决 “在我电脑上能跑” 的问题。今天&#x…

ThinkPhp 5 安装阿里云内容安全(绿化)

composer require alibabacloud/green-20220302 首先要把php5(不支持php7)的执行文件设置到PATH环境变量 此外还要先执行composer update php5.5和php5.6的区别 5.5认为 <? 开头的也是php文件&#xff0c;包括 <?php 5.6认为 <? 开头的不是php文件&#xff0c;只…