PysparkNote006---rdd的flatMap

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.types import Row# 创建SparkSession对象,调用.builder类
# .appName("testapp")方法给应用程序一个名字;.getOrCreate()方法创建或着获取一个已经创建的SparkSession
spark = SparkSession.builder.appName("pysaprk").getOrCreate()
from pyspark.sql import functions as F
import pandas as pd
df_pd = pd.DataFrame({'id': ['A', 'B'], 'index': ['1', '2'], 'index1': ['2', '4']})
df_pd
idindexindex1
0A12
1B24
df = spark.createDataFrame(df_pd)
df.show()
+---+-----+------+
| id|index|index1|
+---+-----+------+
|  A|    1|     2|
|  B|    2|     4|
+---+-----+------+
df.printSchema()
root|-- id: string (nullable = true)|-- index: string (nullable = true)|-- index1: string (nullable = true)

map

def ff(row):# print([row['id'], row.index, row.index1])print([row['id'], row['index'], row['index1']])return [[row['id'], row['index'], row['index1']]]
map_df = spark.createDataFrame(df.rdd.map(lambda x: ff(x)))
map_df.show()
+---------+
|       _1|
+---------+
|[A, 1, 2]|
|[B, 2, 4]|
+---------+
map_df.printSchema()
root|-- _1: array (nullable = true)|    |-- element: string (containsNull = true)

map()是将函数用于RDD中的每个元素,将返回值构成新的RDD
对于map来说,dataframe的一行是元素,
flatmap()是将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD

df.rdd.map(lambda x: ff(x)).toDF().show()
+---------+
|       _1|
+---------+
|[A, 1, 2]|
|[B, 2, 4]|
+---------+

flatMap

schema = StructType([StructField(cn, StringType()) for cn in['id', 'index', 'index1']])
flatMap_df = spark.createDataFrame(df.rdd.flatMap(lambda x: ff(x)))
flatMap_df.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  A|  1|  2|
|  B|  2|  4|
+---+---+---+
flatMap_df.printSchema()
root|-- _1: string (nullable = true)|-- _2: long (nullable = true)|-- _3: long (nullable = true)
df.rdd.flatMap(lambda x: ff(x)).toDF().show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  A|  1|  2|
|  B|  2|  4|
+---+---+---+

写的比较抽象,可以看下面第二个blog,有图讲的清晰些
在有些场景下,比如对df做操作,需要保留某些列,并且做些列之间复杂计算时,需要把最终结果再还原成dataframe时,比较好用

Ref

[1] https://blog.csdn.net/ten_sory/article/details/80897648
[2] https://cloud.tencent.com/developer/article/1912787

                                2023-07-26 阴 于南京市江宁区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/8497.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

APISIX 安全评估

背景 有大佬已经对 [apisix攻击面](https://ricterz.me/posts/2021-07-05-apache-apisix-attack- surface-research.txt)做过总结。 本文记录一下自己之前的评估过程。 分析过程 评估哪些模块? 首先我需要知道要评估啥,就像搞渗透时,我得…

Springboot项目打包war配置详解

Springboot项目打包war配置详解 1. 排除内置tomcat依赖2. 添加servlet依赖3. 修改打包方式4. 修改主启动类5. 完整pom.xml6. 效果图 1. 排除内置tomcat依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter…

xmind latex【记录备忘】

xmind latex 换行 换行必须要有\begin{align}和\end{align}&#xff0c;此时再在里面用\才能换行&#xff0c;如果只写112\224是不能换行的

[Ubuntu 22.04] 安装docker,并设置镜像加速

for pkg in docker.io docker-doc docker-compose podman-docker containerd runc; do sudo apt-get remove $pkg; doneapt install -y curl vim wget gnupg dpkg apt-transport-https lsb-release ca-certificates# 添加Docker的GPG公钥和apt源 #curl -sSL https://download.d…

pyenv教程--管理python版本

pyenv&#xff1a;简单的Python版本管理器&#xff0c;可以实现各个Python版本之间的自由切换切换。 具体讲解见如下链接。 pyenv使用讲解

RabbitMQ优先级队列的使用

RabbitMQ优先级队列的使用 生产者 public class PriorityQueue {public static void Send(){string path AppDomain.CurrentDomain.BaseDirectory;string tag path.Split(/, \\).Last(s > !string.IsNullOrEmpty(s));Console.WriteLine($"这里是 {tag} 启动了。。&…

Node2Vec spark版本采样生成序列

Node2Vec spark版本采样生成序列 前言 最近对node2vec比较感兴趣&#xff0c;再有源码的加持&#xff0c;想在生产环境复现一把&#xff0c;在复现过程中&#xff0c;发现几处bug&#xff08;有向图的生成&#xff0c;边的起点和终点的拼接符号&#xff09;&#xff0c;本文予…

理论计算初学者实用软件,PWmat Windows版本

PWmat 是一款功能强大的平面波密度泛函软件&#xff0c;拥有上千用户&#xff0c;已被广泛用于材料研发之中。但对于大多数学生来说&#xff0c;由于经费的限制&#xff0c;他们无缘于PWmat 应用。为了满足广大同学要求&#xff0c;龙讯旷腾现隆重推出免费版的PWmat Microsoft …

七、Kafka源码分析之网络通信

1、生产者网络设计 架构设计图 2、生产者消息缓存机制 1、RecordAccumulator 将消息缓存到RecordAccumulator收集器中, 最后判断是否要发送。这个加入消息收集器&#xff0c;首先得从 Deque 里找到自己的目标分区&#xff0c;如果没有就新建一个批量消息 Deque 加进入 2、消…

【Windows】不要让你的win键落灰!掌握常用的组合快捷键,使用电脑更高效了

Windows 操作系统提供了丰富的键盘快捷键&#xff0c;能够大幅提高工作效率和操作便利性。在此介绍一些与 Win 键相关的常用快捷键&#xff0c;帮助你更好地利用 Windows 系统。想要在使用电脑时更高效吗&#xff1f;掌握常用的组合快捷键&#xff0c;让你的 Win 键从此不再落灰…

【Vue】水印组件

前言&#xff1a; 最近在工作中接收到了一个给页面添加水印的需求&#xff0c;在网上看到了各种各样的写法&#xff0c;但是感觉写的都比较啰嗦或者复杂&#xff0c;就想着自己写个组件&#xff0c;可以在以后得工作中经常用到&#xff0c;目前是使用Vue技术写的&#xff0c;如…

sql中on条件和where条件查询结果一样嘛?

如果使用 join不会有影响。 但是 在使用left join时&#xff0c;on和where条件的区别如下&#xff1a; on条件是在生成临时表时使用的条件&#xff0c;它不管on中的条件是否为真&#xff0c;都会返回左边表中的记录。 where条件是在临时表生成好后&#xff0c;再对临时表进行…

Java 两台服务器间使用FTP进行文件传输

背景&#xff1a;需要把服务器A中的文件拷贝至服务器B中&#xff0c;要求使用FTP进行传输&#xff0c;当文件传输未完成时文件是tmp格式的&#xff0c;传输完毕后显示为原格式&#xff08;此处是grib2&#xff09;。 package org.example;import org.apache.commons.io.FileUt…

Security+备考我想分想这几点

考试初衷 本人是一名信息安全从业者&#xff0c;听过很多信息安全方面的认证&#xff0c;如CISP、CISSP、CISA&#xff0c;但是没听过Security认证&#xff0c;偶然的机会&#xff0c;我的同事给我介绍了谷安&#xff0c;从这里我才了解到还有Security认证这么一个信息安全认证…

微服务——http客户端Feign

目录 Restemplate方式调用存在的问题 Feign的介绍 基于Feign远程调用 Feign自定义配置 修改日志方式一(基于配置文件) 修改日志方式二(基于java代码) Feign的性能优化 连接池使用方法 Feign_最佳实践分析 方式一: 方式二 实现Feign最佳实践(方式二) 两种解决方案 Re…

PostgreSql 事务

一、事务的 ACID 特性 在日常操作中&#xff0c;对于一组相关操作&#xff0c;通常需要其全部成功或全部失败。在关系型数据库中&#xff0c;将这组相关操作称为事务。事务具有的四个特性简称为 ACID。 原子性&#xff08;Atomicity&#xff09;&#xff1a;保证事务中的操作要…

通过v-for生成的input无法连续输入

部分代码&#xff1a;通过v-for循环生成el-form-item&#xff0c;生成多个描述输入框 更改之前的代码&#xff08;key绑定的是item&#xff09;&#xff1a; <el-form-item class"forminput" v-for"(item,index) in formdata.description" :key"…

centos下安装jdk

环境:centos7/openjdk-8u40-b25 openJDK页面 java二进制包下载页面 华为jdk镜像 1.下载安装包后上传到服务器上&#xff0c;运行命令解压到/opt/目录下 tar cxvf server-jre-8u271-linux-x64.tar.gz -C /opt/2.配置环境变量 vi /etc/profile source /etc/profile添加下面的…

IFNULL()COALESCE()

在 MySQL 中&#xff0c;IFNULL() 函数是可用的&#xff0c;但是请注意它不能直接用于聚合函数的结果。要在聚合函数结果可能为 NULL 的情况下返回特定值&#xff0c;应该使用 COALESCE() 函数而不是 IFNULL() 函数。 以下是代码示例&#xff1a; COALESCE(SUM(pc.CONTRACT_T…

【C语言】文件操作

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前正在回炉重造C语言&#xff08;2023暑假&#xff09; ✈️专栏&#xff1a;【C语言航路】 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你…