加强网站互动交流平台建设自查app开发难吗
news/
2025/10/3 8:59:34/
文章来源:
加强网站互动交流平台建设自查,app开发难吗,网站一次性建设,湖州网站网站建设PySpark的编程#xff0c;主要氛围三大步骤#xff1a;1#xff09;数据输入、2#xff09;数据处理计算、3#xff09;数据输出 1#xff09;数据输入:通过SparkContext对象#xff0c;晚上数据输入 2#xff09;数据处理计算:输入数据后得到RDD对象#xff0c;对RDD…PySpark的编程主要氛围三大步骤1数据输入、2数据处理计算、3数据输出 1数据输入:通过SparkContext对象晚上数据输入 2数据处理计算:输入数据后得到RDD对象对RDD对象进行迭代计算 3数据输出:最终通过RDD对象的成员方法完成数据输出工作
安装pyspark pip install pyspark pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark # 清华大学镜像 想要使用PySpark库完成数据处理首先需要构建一个执行环境入口对象 PySpark的执行环境入口对象是:类SparkContext的类对象 构建PySpark执行环境入口对象
# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark_app) # 链式调用的写法# 基于SparkConf类对象创建parkContext类对象
sc SparkContext(confconf)# 打印PySpark的允许版本
print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
RDD: 弹性分布式数据集
1. python数据容器 转RDD对象 通过SpaarkContext对象的parallelize成员方法,将python数据容器转换为PySpark的RDD对象 2. 读取文件数据 转RDD对象 通过SpaarkContext入口对象textFile()方法来读取文件来构建出RDD对象
通过PySpark代码加载数据即数据输入:
from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 通过parallelize方法将python数据容器加载到spark内成为RDD对象
rdd1 sc.parallelize([1, 2, 3, 4, 5])
rdd2 sc.parallelize((1, 2, 3, 4, 5))
rdd3 sc.parallelize(abcdefg)
rdd4 sc.parallelize({1, 2, 3, 4, 5})
rdd5 sc.parallelize({key1: value1, key2: value})# 如果要查看RDD里面有什么内容需要用collect()方法
print(rdd1.collect()) # [1, 2, 3, 4, 5]
print(rdd2.collect()) # [1, 2, 3, 4, 5]
print(rdd3.collect()) # [a, b, c, d, e, f, g]
print(rdd4.collect()) # [1, 2, 3, 4, 5]
print(rdd5.collect()) # [key1, key2]# 通过textFile方法读取文件数据加载到spark内成为RDD对象
rdd6 sc.textFile(./test.txt)
print(rdd6.collect()) # [123456, 123456, 123456]sc.stop()数据计算
PySpark的数据计算,都是基于RDD对象来进行的那么如何进行呢 依赖RDD对象内置丰富的 成员方法算子
map方法:
对RDD内的元素逐个处理并返回一个新的RDD接受一个处理函数可用lambda匿名函数快速编写
from pyspark import SparkConf, SparkContext# 添加python解释器路径
# import os
# os.environ[PYSPARK_PYTHON] python.exe # python解释器路径# 创建SparkConf类对象
conf SparkConf().setMaster(local[*]).setAppName(test_spark)# 基于SparkConf类对象创建sparkContext类对象
sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([1, 2, 3, 4, 5])# 通过map方法将全部数据都乘以10
# def func(data):
# return data * 10# rdd2 rdd.map(func) # (T) - U : 表示func函数必须有一个参数和一个返回值
# print(rdd2.collect()) # [10, 20, 30, 40, 50]# 匿名函数
# rdd2 rdd.map(lambda x: x * 10)
# print(rdd2.collect())# 链式调用
rdd3 rdd.map(lambda x: x * 10).map(lambda x: x 5)
print(rdd3.collect()) # [15, 25, 35, 45, 55]sc.stop()
flatmap算子:
对rdd执行map操作,然后进行解除嵌套操作
from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)# 准备一个RDD
rdd sc.parallelize([itheima itcast 666, itheima itheima itcast, python itheima])# 需求将RDD数据里面的一个个单词提取出来
rdd2 rdd.flatMap(lambda x: x.split( ))
print(rdd2.collect()) # [itheima, itcast, 666, itheima, itheima, itcast, python, itheima]
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/925679.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!