spark-RDD期中 - 指南

news/2025/11/29 11:25:02/文章来源:https://www.cnblogs.com/tlnshuju/p/19285626

spark-RDD期中 - 指南

2025-11-29 11:22  tlnshuju  阅读(0)  评论(0)    收藏  举报

一 sc对象创建处理

from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local[*]").setAppName("My APP")
sc=SparkContext(conf=conf)

二 RDD对象创建

rdd1=sc.textFile("*.csv")
rdd1.disctinct().map(lambda x:x.split(",")).filter(lambda x:x[0]!="id")

三 转化算子

(一):value算子

from pyspark import SparkContext
sc = SparkContext("local", "Example")
# 1. filter
data = sc.parallelize([1, 2, 3, 4, 5])
filtered_data = data.filter(lambda x: x % 2 == 0)
filtered_data.collect()  # 输出: [2, 4]
# 2. map
mapped_data = data.map(lambda x: x * 2)
mapped_data.collect()  # 输出: [2, 4, 6, 8, 10]
# 3. flatMap
data = sc.parallelize(["hello world", "hi"])
flattened_data = data.flatMap(lambda x: x.split(" "))
flattened_data.collect()  # 输出: ['hello', 'world', 'hi']
# 4. sortBy
data = sc.parallelize([1, 2, 3, 4, 5])
sorted_data = data.sortBy(lambda x: x, ascending=False)
sorted_data.collect()  # 输出: [5, 4, 3, 2, 1]
# 5. groupBy
data = sc.parallelize([1, 2, 3, 4, 5])
grouped_data = data.groupBy(lambda x: x % 2)
grouped_data.collect()  # 输出: [(0, ), (1, )]
# 6. distinct
data = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
distinct_data = data.distinct()
distinct_data.collect()  # 输出: [1, 2, 3, 4, 5]
# 7. union
data1 = sc.parallelize([1, 2, 3])
data2 = sc.parallelize([4, 5, 6])
union_data = data1.union(data2)
union_data.collect()  # 输出: [1, 2, 3, 4, 5, 6]
# 8. intersection
data1 = sc.parallelize([1, 2, 3, 4])
data2 = sc.parallelize([3, 4, 5, 6])
intersection_data = data1.intersection(data2)
intersection_data.collect()  # 输出: [3, 4]
# 9. subtract
data1 = sc.parallelize([1, 2, 3, 4])
data2 = sc.parallelize([3, 4, 5, 6])
subtract_data = data1.subtract(data2)
subtract_data.collect()  # 输出: [1, 2]
# 10. zip
data1 = sc.parallelize([1, 2, 3])
data2 = sc.parallelize(['a', 'b', 'c'])
zipped_data = data1.zip(data2)
zipped_data.collect()  # 输出: [(1, 'a'), (2, 'b'), (3, 'c')]
# 11. glom
data = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 个分区
glom_data = data.glom()
glom_data.collect()  # 输出: [[1, 2, 3], [4, 5]]
# 12. repartition
data = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 个分区
repartitioned_data = data.repartition(3)  # 重新分区为 3 个分区
repartitioned_data.getNumPartitions()  # 输出: 3
# 13. mapPartitions
def process_partition(iterator):
    return [x * 2 for x in iterator]
data = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 个分区
mapped_partitions_data = data.mapPartitions(process_partition)
mapped_partitions_data.collect()  # 输出: [2, 4, 6, 8, 10]
  • filter:过滤操作,用于从原始数据集中筛选出满足特定条件的元素,返回一个新的数据集,只包含符合条件的元素。

  • map:映射操作,对数据集中的每个元素应用一个指定的函数,将每个元素转换为新的值,返回一个新的数据集。

  • flatMap:扁平化映射操作,对数据集中的每个元素应用一个函数,该函数返回一个可迭代对象(如列表),然后将这些可迭代对象中的所有元素合并成一个扁平化的数据集。

  • sortBy:排序操作,根据指定的函数对数据集中的元素进行排序,可以指定升序或降序。

  • groupBy:分组操作,根据指定的函数对数据集中的元素进行分组,将具有相同键的元素归为一组,返回一个键值对的数据集,其中键是分组的依据,值是属于该组的元素集合。

  • distinct:去重操作,返回一个新的数据集,其中包含原始数据集中的唯一元素,去除重复的元素。

  • union:并集操作,将两个数据集合并为一个新的数据集,包含两个数据集中的所有元素。

  • intersection:交集操作,返回两个数据集的交集,即同时存在于两个数据集中的元素。

  • subtract:差集操作,返回第一个数据集中不在第二个数据集中的元素。

  • zip:配对操作,将两个数据集的元素按顺序配对,返回一个新的数据集,其中每个元素是一个元组,包含来自两个数据集的对应元素。

  • glom:分区聚合操作,将每个分区的数据转换为一个列表,返回一个新的数据集,其中每个元素是一个列表,表示一个分区的数据。

  • repartition:重新分区操作,将数据集重新分区为指定数量的分区,可以用于调整数据的分布和并行度。

  • mapPartitions:分区映射操作,对每个分区的数据应用一个函数,返回一个新的数据集,其中每个分区的数据是经过函数处理后的结果。

(二)Key_Value算子

from pyspark import SparkContext
sc = SparkContext("local", "KeyValueExample")
# 创建一个 Key-Value 类型的 RDD
data = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5), (5, 6), (3, 8)])
# 1. partitionBy
partitioned_data = data.partitionBy(2)  # 根据分区器分区
partitioned_data.getNumPartitions()  # 输出: 2
# 2. reduceByKey
reduced_data = data.reduceByKey(lambda a, b: a + b)
reduced_data.collect()  # 输出: [(1, 2), (3, 18), (4, 5), (5, 6)]
# 3. keys
keys_data = data.keys()
keys_data.collect()  # 输出: [1, 3, 3, 4, 5, 3]
# 4. values
values_data = data.values()
values_data.collect()  # 输出: [2, 4, 6, 5, 6, 8]
# 5. sortByKey
sorted_data = data.sortByKey()
sorted_data.collect()  # 输出: [(1, 2), (3, 4), (3, 6), (3, 8), (4, 5), (5, 6)]
# 6. groupByKey
grouped_data = data.groupByKey()
grouped_data.collect()  # 输出: [(1, ), (3, ), (4, ), (5, )]
# 转换为更易读的格式
grouped_data.mapValues(list).collect()  # 输出: [(1, [2]), (3, [4, 6, 8]), (4, [5]), (5, [6])]
# 7. mapValues
mapped_values_data = data.mapValues(lambda x: x * 2)
mapped_values_data.collect()  # 输出: [(1, 4), (3, 8), (3, 12), (4, 10), (5, 12), (3, 16)]
# 8. flatMapValues
flattened_values_data = data.flatMapValues(lambda x: [x, x * 2])
flattened_values_data.collect()  # 输出: [(1, 2), (1, 4), (3, 4), (3, 8), (3, 6), (3, 12), (4, 5), (4, 10), (5, 6), (5, 12), (3, 8), (3, 16)]
# 9. join
data1 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
data2 = sc.parallelize([(3, 8), (4, 5)])
joined_data = data1.join(data2)
joined_data.collect()  # 输出: [(3, (4, 8)), (3, (6, 8))]
# 10. leftOuterJoin
left_joined_data = data1.leftOuterJoin(data2)
left_joined_data.collect()  # 输出: [(1, (2, None)), (3, (4, 8)), (3, (6, 8))]
# 11. rightOuterJoin
right_joined_data = data1.rightOuterJoin(data2)
right_joined_data.collect()  # 输出: [(3, (4, 8)), (4, (None, 5)), (3, (6, 8))]
  • partitionBy:分区操作,根据指定的分区器对 Key-Value 类型的数据集进行分区,将具有相同键的元素分配到同一个分区中,通常用于优化后续的聚合操作。

  • reduceByKey:聚合操作,对 Key-Value 类型的数据集中的每个键对应的值进行聚合,使用指定的函数将同一个键的所有值合并为一个值,返回一个新的 Key-Value 数据集。

  • keys:提取键操作,从 Key-Value 类型的数据集中提取所有的键,返回一个新的数据集,只包含键。

  • values:提取值操作,从 Key-Value 类型的数据集中提取所有的值,返回一个新的数据集,只包含值。

  • sortByKey:按键排序操作,根据 Key-Value 类型数据集中的键对数据进行排序,可以指定升序或降序。

  • groupByKey:按键分组操作,将 Key-Value 类型的数据集中的所有值按照键进行分组,返回一个新的 Key-Value 数据集,其中每个键对应一个值的集合。

  • mapValues:映射值操作,对 Key-Value 类型的数据集中的每个值应用一个函数,返回一个新的 Key-Value 数据集,键保持不变,值被转换为新的值。

  • flatMapValues:扁平化映射值操作,对 Key-Value 类型的数据集中的每个值应用一个函数,该函数返回一个可迭代对象(如列表),然后将这些可迭代对象中的所有元素合并成一个扁平化的 Key-Value 数据集。

  • join:连接操作,对两个 Key-Value 类型的数据集进行内连接,返回一个新的 Key-Value 数据集,其中每个键对应的值是两个数据集中该键对应的值的组合。

  • leftOuterJoin:左外连接操作,对两个 Key-Value 类型的数据集进行左外连接,返回一个新的 Key-Value 数据集,其中第一个数据集中的每个键都包含在结果中,如果第二个数据集中没有对应的键,则值为 None

  • rightOuterJoin:右外连接操作,对两个 Key-Value 类型的数据集进行右外连接,返回一个新的 Key-Value 数据集,其中第二个数据集中的每个键都包含在结果中,如果第一个数据集中没有对应的键,则值为 None

四 行动算子

from pyspark import SparkContext
sc = SparkContext("local", "ActionExample")
# 创建一个 RDD
data = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5), (5, 6), (3, 8)])
# 1. collect
collected_data = data.collect()
print(collected_data)  # 输出: [(1, 2), (3, 4), (3, 6), (4, 5), (5, 6), (3, 8)]
# 2. collectAsMap
collected_map = data.collectAsMap()
print(collected_map)  # 输出: {1: 2, 3: 8, 4: 5, 5: 6}
# 3. reduce
reduced_data = data.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(reduced_data)  # 输出: (16, 31)
# 4. take
taken_data = data.take(3)
print(taken_data)  # 输出: [(1, 2), (3, 4), (3, 6)]
# 5. top
top_data = data.top(3, key=lambda x: x[1])
print(top_data)  # 输出: [(3, 8), (5, 6), (4, 5)]
# 6. first
first_element = data.first()
print(first_element)  # 输出: (1, 2)
# 7. count
count_data = data.count()
print(count_data)  # 输出: 6
# 8. countByKey
count_by_key = data.countByKey()
print(count_by_key)  # 输出: defaultdict(, {1: 1, 3: 3, 4: 1, 5: 1})
# 9. saveAsTextFile
data.saveAsTextFile("output")
# 10. aggregate
# 定义初始值
zero_value = (0, 0)
# 分区内聚合函数
seq_op = lambda acc, value: (acc[0] + value[0], acc[1] + value[1])
# 分区间聚合函数
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
aggregated_data = data.aggregate(zero_value, seq_op, comb_op)
print(aggregated_data)  # 输出: (16, 31)
  • collect:将 RDD 中的所有元素收集到驱动程序中,返回一个列表。

  • collectAsMap:将 Key-Value 类型的 RDD 收集到驱动程序中,返回一个字典。

  • reduce:对 RDD 中的所有元素应用一个函数,将所有元素合并为一个值。

  • take:返回 RDD 中的前 n 个元素。

  • top:返回 RDD 中的前 n 个元素,根据指定的排序函数。

  • first:返回 RDD 中的第一个元素。

  • count:返回 RDD 中的元素数量。

  • countByKey:对 Key-Value 类型的 RDD,返回每个键对应的元素数量。

  • saveAsTextFile:将 RDD 保存为文本文件。

  • aggregate:对 RDD 中的元素进行聚合操作,可以指定初始值、分区内的聚合函数和分区间的聚合函数。

我特喜欢hjp(超小声)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/980586.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年十大非标定制不锈钢水管公司排行榜,304不锈钢水管

在不锈钢水管行业快速发展的当下,工程公司对优质管材的需求愈发精准——既需304/316L等合规材质保障水质安全,又依赖环压式等可靠连接技术避免施工隐患,更离不开非标定制能力适配复杂场景。为帮企业避开材质以次充好…

2025年优质的应届生云计算就业就业首选榜

2025年优质的应届生云计算就业榜开篇:云计算行业背景与市场趋势近年来,随着数字化转型的加速推进,云计算已成为全球信息技术产业的核心驱动力。根据国际数据公司(IDC)报告显示,2024年全球公有云服务市场规模已突破…

2025年床垫品牌加盟服务推荐:靠谱品牌助你少走弯路!

想入局床垫行业?选对品牌加盟服务,等于成功了一半!本文结合市场调研与真实经销商反馈,精选3家在加盟支持、产品实力、盈利保障上表现突出的床垫品牌,为你提供专业的床垫品牌加盟建议。 推荐1:美萨家具 推荐指数:…

2025年评价高的广东阿里巴巴运营/阿里巴巴运营用户好评榜

2025年评价高的广东阿里巴巴运营/阿里巴巴运营用户好评榜开篇:行业背景与市场趋势随着数字经济时代的全面到来,电子商务已成为企业拓展市场、提升品牌影响力的重要渠道。作为全球的B2B平台,阿里巴巴国际站和1688平台…

2025年评价高的茶饮喝茶/茶饮喝茶茶叶礼盒最新TOP推荐

2025年评价高的茶饮/茶叶礼盒TOP推荐 行业背景与市场趋势 近年来,随着健康生活理念的普及和消费升级,茶饮市场持续扩容。2025年,中国茶饮行业规模预计突破5000亿元,其中新式茶饮和精品茶叶礼盒成为增长主力。年…

2025年一字形通风气楼定制厂家权威推荐榜单:排烟通风气楼/三角形通风气楼‌/18j621型通风气楼‌源头厂家精选

在工业建筑领域,一字形通风气楼正以精确的气流组织和高效的排烟性能,成为现代厂房通风系统的核心要素。 在工业建筑绿色化、智能化发展的背景下,一字形通风气楼凭借其结构简洁、通风效率高、适配性强的特点,在厂房…

2025年质量好的酱卤制品食品添加剂/奥尔良食品添加剂最新TOP排名厂家

2025年质量好的酱卤制品食品添加剂/奥尔良食品添加剂TOP排名厂家行业背景与市场趋势随着中国食品工业的快速发展,酱卤制品和奥尔良风味食品在餐饮和零售市场占据了重要地位。据中国食品工业协会数据显示,2024年我国酱…

2024年真空袋厂家联系电话完整汇总:全国重点企业联系方式及采购效率提升指南

本文基于2024年行业公开数据和权威第三方报告,结合推荐对象参考内容,从专业能力、服务稳定性、资源网络等维度筛选5个推荐对象,旨在为真空袋厂家联系电话领域提供可靠联系方案。根据中国包装联合会数据显示,2023年…

2025年11月能碳管理平台推荐榜单与选择指南

随着全球气候变化议题日益受到重视,企业对于碳排放管理的需求逐渐从被动合规转向主动治理。能碳管理平台作为实现双碳目标的重要工具,其选择成为许多企业管理者关注的焦点。根据国家相关部门发布的行业标准文件及第三…

成都高端网站建设公司推荐

一、按企业类型精准选服务商,不花冤枉钱中大型企业 / 集团上市公司 核心需求:适配长期数字化战略,打造高端品牌形象 + 打通增长链路选型标准:优先选综合实力型服务商,重点看 3 点 —— 跨行业高端定制经验、全链路…

2025年广东14岁孩子很难管教学校权威推荐榜单:孩子不爱学习很难管教/孩子三年级后很难管教‌/叛逆孩子很难管教‌学校精选

在广东,面对14岁孩子不爱学习、难以管教的普遍困境,专业教育机构通过科学干预方法正帮助数千家庭重建亲子关系。 14岁青少年正处于心理发展的"暴风雨期",据广东省青少年心理健康普查数据显示,这一年龄段…

2025年知名的南宁公司注册热门权威榜

2025年知名的南宁公司注册热门权威榜开篇:南宁公司注册行业背景与市场趋势随着中国-东盟自由贸易区的深入发展和"一带一路"倡议的持续推进,南宁作为广西壮族自治区的首府和面向东盟的国际大通道枢纽城市,…

【相邻数对】暴力遍历

暴力,没什么好说的: #include<iostream> #include<vector> using namespace std; int n;int main(){cin >> n;vector<int> num(n+3);for(int i = 0; i < n; i++){cin >> num[i];}i…

完整教程:【JUnit实战3_22】 第十三章:用 JUnit 5 做持续集成(下):Jenkins + JUnit 5 + Git 持续集成本地实战演练完整复盘

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年超高压电力塔实力厂家权威推荐榜单:光伏电力塔/风电电力塔/分支电力塔源头厂家精选

在新型电力系统加速构建的背景下,超高压电网建设迎来新一轮发展高峰。数据显示,2025年中国超高压电网投资规模预计突破1800亿元,其中输电铁塔需求占比达35%,角钢塔、钢管塔、特高压交流塔成为市场主流产品。为帮助…

2025年专业的短视频运营最新方案推荐榜

2025年专业的短视频运营方案推荐榜开篇:短视频运营行业背景与市场趋势随着5G技术的全面普及和移动互联网的深入发展,短视频行业在2025年迎来了新一轮的增长高峰。据数据显示,全球短视频用户规模已突破35亿,中国短视…

2025年哈尔滨24小时寄宿监护机构权威推荐榜单:家校共育/升学规划指导‌/私立高中‌源头机构精选

在冰城哈尔滨,随着教育需求的多元化发展,24小时寄宿监护机构正成为越来越多家庭的选择。 随着教育理念的升级和家长对子女教育投入的加大,哈尔滨的教育服务市场近年来呈现出蓬勃发展的态势。 据市场数据显示,2024年…

2025年中国不锈钢水管十大品牌推荐:秦西盟不锈钢水管适合哪

本榜单依托全维度市场调研与真实行业口碑,深度筛选出十家标杆企业,为工程公司、家装用户及市政项目选型提供客观依据,助力精准匹配适配的不锈钢水管供应商。 TOP1 推荐:陕西秦西盟实业有限公司(秦西盟) 推荐指数…

敏感肌也能安心美白!极光甘草千人实测达成95.5%满意度,2025美白产品必选谷雨

联合全国8家三甲医院皮肤科、历时6个月追踪研究,《2025中国人美白护肤趋势报告》显示:93.6%的消费者因肤质不匹配而美白无效,而精准护肤的成功率提升3.2倍。 明明用心护肤,却总不见效果?《2025中国人美白护肤趋势…

11.20 禁用安全连接方式(SSL) 获取执行SQL的对象

在端口后加?useSSL=false connection conn=DriverManager.getconnection(url,username,password); 普通的执行SQL对象 Statement createStatement() 预编译sql的执行sql对象:防止sql注入 PreparedStatement(sql) 执…