基于spark的Hive2Pg数据同步组件

一、背景

        Hive中的数据需要同步到pg供在线使用,通常sqoop具有数据同步的功能,但是sqoop具有一定的问题,比如对数据的切分碰到数据字段存在异常的情况下,数据字段的空值率高、数据字段重复太多,影响sqoop的分区策略,特别是hash分区,调用hash函数容易使得cpu高产生报警同时sqoop的mapreduce任务对数据表的分割以及数据文件也会有一定的不均衡性。为了弥补这些问题,开发了基于spark的数据同步组件,利用spark处理大数据的强大能力及分布式并行性上的优势,通过执行sparksql将数据写入到pg数据库,但是在sparksql 中,保存数据到数据,只有 Append , Overwrite , ErrorIfExists, Ignore 四种模式,不满足特殊场景的需求,尝试利用spark save 源码改进, 批量保存数据,存在则更新不存在则插入。

二、关键设计方案

  1. 方案一

        利用DataFrame框架里自带的df.write.mode(“append”).jdbc(url,pg_table,prop)方法,尝试将df里的每一行row是org.apache.spark.sql.Row类型,结合schema类型转换成DataFrame,方法如下:

import org.apache.spark.sql.types._

  val map = Map("col1" -> 5, "col2" -> 6, "col3" -> 10)

  val (keys, values) = map.toList.sortBy(_._1).unzip

  val rows = spark.sparkContext.parallelize(Seq(Row(values: _*)))

  val schema = StructType(keys.map(

    k => StructField(k, IntegerType, nullable = false)))

  val df = spark.createDataFrame(rows, schema)

  df.show()

经过分析:转DataFrame的过程有重要的两步:首先是通过spark.sparkContext.parallelize将Row类型转成RDD,其次获取schema后利用spark.createDataFrame把RDD和schema变为DataFrame。然而为了取到DataFrame的每一行Row,需要调用DataFrame的foreach方法。

Spark的DataFrame的foreach的执行原理:

        Spark DataFrame 的 foreach() 方法将 DataFrame 的每一行作为 Row 对象进行循环,并将给定函数应用于该行。foreach() 的一些限制:Spark中的foreach()方法是在工作节点而不是Driver程序中调用的。这意味着,如果我们在函数内执行print()操作,将无法在会话或笔记本中看到打印结果,因为结果打印在工作节点中。行是只读的,因此您无法更新行的值。鉴于这些限制,foreach() 方法主要用于将每行的一些信息记录到本地计算机或外部数据库foreach方法无法改变原始的DataFrame数据,仅用于迭代处理每个分区的数据。

        foreach方法的处理是并行的,可以提高处理效率,但需要注意处理的顺序可能不同于原始数据的顺序。常规性能调优四:广播大变量默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复 本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的 时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作 线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变 量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。广播变量在每个 Executor 保存一个副本,此Executor 的所有 task 共用此广播变量,这让变 量产生的副本数量大大减少。在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变 量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如 果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的 复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的 BlockManager 中获取变量。

        一般spark.SparkContext是存在Driver进程里的,工作节点获取不到,每个jvm只能有一个SparkContext,再创建新的SparkContext之前需要先stop()当前活动的SparkContext。

综上分析,方案一不能实现,即DataFrame里不能创建DataFrame

2、方案二

        利用PrepareStatement执行插入更新的sql语句,将DataFrame里的每一行Row的字段数值解析出来封装成sql,每batch个执行一次并提交,碰到有重复的key执行update操作,新的数据执行insert操作。该方案的实现过程借鉴了spark的dataframe的save方法。tableSchema.fields.map(x => x.name).mkString(",")利用map和mkString方法进行字符串操作,同时利用spark的makeSetter方法实现PrepareStatement语句的填充。

三、碰到问题及解决

采用方案二在开发的过程碰到的问题:

  1. Spark error: value foreach is not a member of Object

当调用这样调用的时候:

升级2.12之后,DataFrameforeachPartition 里面不能处理 RowIterator

解决方法:(1

2)就是使用foreach替代foreachPartition

2、Caused by: java.io.NotSerializableException: org.postgreslq.jdbc.PgPrepareStatement

原因: prep是一个PrepareStatement对象,这个对象无法序列化,在标1的地方执行,而传入map中的对象是需要分布式传送到各个节点上,传送前先序列化,到达相应机器上后再反序列化,PrepareStatement是个Java类,如果一个java类想(反)序列化,必须实现Serialize接口,PrepareStatement并没有实现这个接口,对象prep在driver端,collect后的数据也在driver端,就不需prep序列化传到各个节点了。但这样其实会有collect的性能问题。

解决方案:使用mappartition在每一个分区内维持一个pgsql连接进行插入

3、在insert on conflict的语句上报Postgre SQL ERROR:there is no unique or exclusion constraint matching the ON CONFLICT specification。

        执行insert into test values('a','b') on conflict(a,b) do update set c='1';由于建表时没有建关于a,bCONSTRAINT,于是就会报错,为表添加CONSTRAINT

4、java jar 后面传参数,参数中含有空格的处理方法将含有空格的参数加上双引号。

四、总结

        该组件实现了对于数据规整规范的情况下,直接调用DataFrame的write.mode()方法批量写入,对于特殊的情况hive表里对于pg表的主键字段有重复的情况,进行了重新的封装,通过执行s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set name1=? ,name2=?"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/600203.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件工程:数据流图相关知识和多实例分析

目录 一、数据流图相关知识 1. 基本介绍 2. 常用符号 3. 附加符号 二、数据流图实例分析 1. 活期存取款业务处理系统 2. 工资计算系统 3. 商业自动化系统 4. 学校人事管理系统 5. 教材征订系统 6. 高考录取统分子系统 7. 订货系统 8. 培训中心管理系统 9. 考务处…

​已解决java.lang.ArrayIndexOutOfBoundsException异常的正确解决方法,亲测有效!!!​

已解决java.lang.ArrayIndexOutOfBoundsException异常的正确解决方法,亲测有效!!! 目录 报错问题 解决思路 解决方法 总结 Q1 - 报错问题 java.long.ArrayIndexOutOfBoundsException 是Java中的一个运行时异常​&#xff0c…

强化学习5——动态规划初探

动态规划具体指的是在某些复杂问题中,将问题转化为若干个子问题,并在求解每个子问题的过程中保存已经求解的结果,以便后续使用。实际上动态规划更像是一种通用的思路,而不是具体某个算法。 在强化学习中,被用于求解值函…

【网络工程师】交换机的配置

一、交换机5大基本工作模式 配置网络设备,需要使用console线,在PC上需要使用软件 “超级终端” 1、用户模式:switch> 可以查看交换机的额基本简单信息,且不能做任何修改配置! 2、特权模式:switch## …

在使用Composer管理的项目中安装和使用

在使用Composer管理的项目中安装 如果项目框架本身就是使用Composer来管理包的话,直接在项目根目录执行Composer安装命令后,即可在项目控制器中调用QueryList来使用,这种框架有: Laravel、ThinkPHP5等。 在项目根目录执行compo…

SQL日期列更新操作详解

在实际的数据库管理过程中,有时我们需要对数据库中的日期列进行更新。这篇博客将详细介绍一条 SQL 语句,该语句用于更新 referral_up_order 表中的多个日期列,并将它们的日期部分更改为 2023-10-24,同时保留原始时间部分。 1、背…

LeGO-LOAM 几个特有函数的分析(2)

接上回LeGO-LOAM 几个特有函数的分析(1) 二、广度优先遍历 广度优先遍历(Breadth-First Search, BFS)是一种用于遍历或搜索树或图的算法。这种算法从树的根(或图的某一指定节点)开始,然后探索…

Linux 常见服务配置

笔记所以内容很多,建议选择性看看 SSH 对应服务 sshd 注意:配置文件 配制文件修改需要重启或重载sshd服务才能生效 systemctl sshd reload # 重载 sshd 配置文件 systemctl sshd restart # 重启 sshd 服务客户端配置文件 man ssh_config 可以…

数据库高可用mha

MHA搭建的步骤 一.配置主从复制 1.初始化环境 #在四台服务器上初始化环境 systemctl stop firewalld systemctl disable firewalld setenforce 0 2.修改 Master、Slave1、Slave2 节点的主机名 #在Master上 hostnamectl set-hostname mysql1 su#在Slave1 hostnamectl set-h…

泛型-限定存储数据类型

泛型 泛型的本质&#xff1a;参数类型化 概述&#xff1a;将类型由原来的具体的类型参数化&#xff0c;然后在 使用/调用 时传入具体的类型 格式&#xff1a; <类型> 指定一种类型的格式&#xff0c;这里的类型可以看成是 方法中的形参&#xff08;如果不理解可去看下形…

Flink窗口与WaterMark

本文目录 窗口的生命周期Window Assigners窗口函数&#xff08;Window Functions&#xff09;TriggersEvictorsAllowed Lateness 窗口 窗口&#xff08;Window&#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中&#xff0c;再对每个“桶”加以处理。…

域名流量被劫持怎么办?如何避免域名流量劫持?

随着互联网不断发展&#xff0c;流量成为线上世界的巨大财富。然而一种叫做域名流量劫持的网络攻击&#xff0c;将会在不经授权的情况下控制或重定向一个域名的DNS记录&#xff0c;导致用户在访问一个网站时&#xff0c;被引导到另一个不相关的网站&#xff0c;从而劫持走原网站…

企微私域流量引流:从策略到实践的全面解析

随着互联网的发展&#xff0c;流量变得越来越贵&#xff0c;如何将流量转化为企业的价值成为了重要的议题。在这个背景下&#xff0c;企微私域流量成为了企业关注的焦点。本文将从策略和实践两个角度&#xff0c;全面解析企微私域流量的引流方法。 一、策略篇 1. 明确目标&…

maven pom.xml 加载本地.jar库文件方法

一般我们使用的jar包,都是从maven仓库中加载的, 那如果是从本地该如何加载呢? 本文介绍maven加载本地jar的方法 在 pom.xml 的 dependencies 节点内增加以下配置即可 <dependency> <groupId>cn.tekin</groupId> <artifactId>mylib</artifactI…

leetcode:908. 最小差值 I

一、题目 二、函数原型 int smallestRangeI(int* nums, int numsSize, int k) 三、思路 本题题目有些绕口&#xff0c;但是无伤大雅。本质就是可以对数组中的每个元素进行加/减 k 的操作&#xff0c;然后求数组中的最大、最小元素的最小差值。 分为几种情况&#xff1a; …

怎么查询网络出口IP

怎么查询自己的网络的出口IP 背景 一般跟第三方服务进行接口数据交互的时候&#xff0c;对方都会让我们提供调用接口的网络的出口IP&#xff0c;对方会把该IP地址加到对方的白名单中。这样我们才能有权限进行接口的访问。 解决办法 下面介绍三种常用的查询网络出口IP的办法…

C语言编译器(C语言编程软件)完全攻略(第二十九部分:Linux GCC简明教程(使用GCC编写C语言程序))

介绍常用C语言编译器的安装、配置和使用。 二十九、Linux GCC简明教程&#xff08;使用GCC编写C语言程序&#xff09; 市面上常见的 Linux 都是发行版本&#xff0c;典型的 Linux 发行版包含了 Linux 内核、桌面环境&#xff08;例如 GNOME、KDE、Unity 等&#xff09;和各种…

[Verilog语言入门教程] 乘法器详解 与 设计/仿真

依公知及经验整理,原创保护,禁止转载。 专栏 《Verilog》 <<<< 返回总目录 <<<< 乘法器可以分为以下5种类型: 顺序乘法器(Sequential Multiplier):顺序乘法器是最简单的乘法器类型,采用逐位相乘的方法实现。这种乘法器适用于小规模的乘法运算…

go构建项目与打包

环境搭建 使用的组件及版本 operator-sdk v1.22.0go 1.20.0 linux/amd64git 1.8.3.1k8s 1.18.5docker 20.10.5 前期配置 安装Git yum install git安装docker yum install docker-ce安装go 官网下载 tar -C /usr/local/ -xvf go1.20.linux-amd64.tar.gz 环境配置 // 将go配置…

【赠书第16期】码上行动:用ChatGPT学会Python编程

文章目录 前言 1 ChatGPT简介 2 Python编程简介 3 使用ChatGPT学习Python编程 4 如何使用ChatGPT学习Python编程 5 推荐图书 6 粉丝福利 前言 随着人工智能技术的不断发展&#xff0c;聊天机器人已经成为我们日常生活和工作中不可或缺的一部分。其中&#xff0c;ChatGP…