Spark-Streaming

找出所有有效数据,要求电话号码为11位,但只要列中没有空值就算有效数据。
按地址分类,输出条数最多的前20个地址及其数据。
代码讲解:

导包和声明对象,设置Spark配置对象和SparkContext对象。
使用Spark SQL语言进行数据处理,包括创建数据库、数据表,导入数据文件,进行数据转换。
筛选有效数据并存储到新表中。
按地址分组并统计出现次数,排序并输出前20个地址。

代码如下

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject Demo {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")val spark = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse").config(sparkConf).getOrCreate()spark.sql(sqlText = "create database spark_sql_2")spark.sql(sqlText = "use spark_sql_2")//创建存放原始数据的表spark.sql("""|create table user_login_info(data string|row format delimited|""".stripMargin)spark.sql(sqlText = "load data local inpath 'Spark-SQL/input/user_login_info.json' into table user_login_info")//利用get_json_object将数据做转换spark.sql("""|create table user_login_info_1|as|select get_json_object(data,'$.uid') as uid,|get_json_object(data,'$.phone') as phone,|get_json_object(data,'$.addr') as addr from user_login_info|""".stripMargin)spark.sql(sqlText = "select count(*) count from user_login_info_1").show()//获取有效数据spark.sql("""|create table user_login_info_2|as|select * from user_login_info_1|where uid != ' ' and phone != ' ' and addr != ' '|""".stripMargin)spark.sql(sqlText = "select count(*) count from user_login_info_2").show()//获取前20个地址spark.sql("""|create table hot_addr|as|select addr,count(addr) count from user_login_info_2|group by addr order by count desc limit 20|""".stripMargin)spark.sql(sqlText = "select * from hot_addr").show()spark.stop()}}


Spark Streaming介绍
Spark Streaming概述:

用于流式计算,处理实时数据流。
支持多种数据输入源(如Kafka、Flume、Twitter、TCP套接字等)和输出存储位置(如HDFS、数据库等)。
Spark Streaming特点:

易用性:支持Java、Python、Scala等编程语言,编写实时计算程序如同编写批处理程序。
容错性:无需额外代码和配置即可恢复丢失的数据,确保实时计算的可靠性。
整合性:可以在Spark上运行,允许重复使用相关代码进行批处理,实现交互式查询操作。
Spark Streaming架构:

驱动程序(StreamingContext)处理数据并传给SparkContext。
工作节点接收和处理数据,执行任务并备份数据到其他节点。
背压机制协调数据接收能力和资源处理能力,避免数据堆积和资源浪费。
Spark Streaming实操
词频统计案例:

使用ipad工具向999端口发送数据,Spark Streaming读取端口数据并统计单词出现次数。
代码配置包括设置关键对象、接收TCP套接字数据、扁平化处理、累加相同键值对、分组统计词频。
启动和运行:

启动netpad发送数据,Spark Streaming每隔三秒收集和处理数据。
代码中没有显式关闭状态,流式计算默认持续运行,确保数据处理不间断。
DStream创建
DStream创建方式:

RDD队列:通过SSC创建RDD队列,将RDD推送到队列中作为DStream处理。
自定义数据源:下节课详细讲解。
RDD队列案例:

循环创建多个RDD并推送到队列中,使用Spark Streaming处理RDD队列进行词频统计。
代码包括配置对象、创建可变队列、转换RDD为DStream、累加和分组统计词频。
代码如下

import org.apache.spark.SparkConfobject WordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))val lineStreams = ssc.socketTextStream("node01",9999)val wordStreams = lineStreams.flatMap(_.split(" "))val wordAndOneStreams = wordStreams.map((_,1))val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)wordAndCountStreams.print()ssc.start()ssc.awaitTermination()}}


结果展示:


展示了词频统计的结果,验证了Spark Streaming的正确性和有效性。

自定义数据源的实现

需要导入新的函数并继承现有的函数。
创建数据源时需选择class而不是object。
在class中定义on start和on stop方法,并在这些方法中实现具体的功能。
类的定义和初始化

类的定义包括数据类型的设定,如端口号和TCP名称。
使用extends关键字继承父类的方法。
数据存储类型设定为内存中保存。
数据接收和处理

在on start方法中创建新线程并调用接收数据的方法。
连接到指定的主机和端口号,创建输入流并转换为字符流。
逐行读取数据并写入到spark stream中,进行词频统计。
数据扁平化和词频统计

使用block map进行数据扁平化处理。
将原始数据转换为键值对形式,并根据相同键进行分组和累加。
输出词频统计结果。
程序终止条件

设定手动终止和程序异常时的终止条件。
在满足终止条件时输出结果并终止程序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/902575.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sentinel源码—9.限流算法的实现对比一

大纲 1.漏桶算法的实现对比 (1)普通思路的漏桶算法实现 (2)节省线程的漏桶算法实现 (3)Sentinel中的漏桶算法实现 (4)Sentinel中的漏桶算法与普通漏桶算法的区别 (5)Sentinel中的漏桶算法存在的问题 2.令牌桶算法的实现对比 (1)普通思路的令牌桶算法实现 (2)节省线程的…

Redis 详解:安装、数据类型、事务、配置、持久化、订阅/发布、主从复制、哨兵机制、缓存

目录 Redis 安装与数据类型 安装指南 Windows Linux 性能测试 基本知识 数据类型 String List(双向列表) Set(集合) Hash(哈希) Zset(有序集合) 高级功能 地理位置&am…

Docker配置带证书的远程访问监听

一、生成证书和密钥 1、准备证书目录和生成CA证书 # 创建证书目录 mkdir -p /etc/docker/tls cd /etc/docker/tls # 生成CA密钥和证书 openssl req -x509 -newkey rsa:4096 -keyout ca-key.pem \ -out ca-cert.pem -days 365 -nodes -subj "/CNDocker CA" 2、为…

MCP接入方式介绍

上一篇文章,我们介绍了MCP是什么以及MCP的使用。 MCP是什么,MCP的使用 接下来,我们来详细介绍一下MCP的接入 先看官网的架构图 上图的MCP 服务 A、MCP 服务 B、MCP 服务 C是可以运行在你的本地计算机(本地服务器方式&#xff…

关于Agent的简单构建和分享

前言:Agent 具备自主性、环境感知能力和决策执行能力,能够根据环境的变化自动调整行为,以实现特定的目标。 一、Agent 的原理 Agent(智能体)被提出时,具有四大能力 感知、分析、决策和执行。是一种能够在特定环境中自主行动、感…

Gitlab runner 安装和注册

Gitlab Runner GitLab Runner是一个用于运行GitLab CI/CD流水线作业的软件包,由GitLab官方开发,完全开源。你可以在很多主流的系统环境或平台上安装它,如Linux、macOS、Windows和Kubernetes。如果你熟悉Jenkins 的话,你可以把它…

精益数据分析(18/126):权衡数据运用,精准把握创业方向

精益数据分析(18/126):权衡数据运用,精准把握创业方向 大家好!一直以来,我都希望能和大家在创业与数据分析的领域共同探索、共同进步。今天,我们继续深入研读《精益数据分析》,探讨…

Git技术详解:从核心原理到实际应用

Git技术详解:从核心原理到实际应用 一、Git的本质与核心价值 Git是由Linux之父Linus Torvalds在2005年开发的分布式版本控制系统,其核心功能是通过记录文件变更历史,帮助开发者实现以下目标: 版本回溯:随时恢复到项…

Java从入门到“放弃”(精通)之旅——String类⑩

Java从入门到“放弃”(精通)之旅🚀——String类⑩ 前言 在Java编程中,String类是最常用也是最重要的类之一。无论是日常开发还是面试,对String类的深入理解都是必不可少的。 1. String类的重要性 在C语言中&#xf…

抓取淘宝数据RPA--影刀

最近用了一下RPA软件,挑了影刀,发现很无脑也很简单,其语法大概是JAVA和PYTHON的混合体,如果懂爬虫的话,学这个软件就快的很,看了一下官方的教程,对于有基础的人来说很有点枯燥,但又不…

docker部署seafile修改默认端口并安装配置onlyoffice实现在线编辑

背景 有很多场景会用到类似seafile功能的需求,比如: 在内网中传输和共享文件个人部署私人网盘文档协同在线编辑写笔记… 这些功能seafile均有实现,并且社区版提供的功能基本可以满足个人或者小型团队的日常需求 问题 由于主机的80和443端…

计算机视觉cv2入门之视频处理

在我们进行计算机视觉任务时,经常会对视频中的图像进行操作,这里我来给大家分享一下,cv2对视频文件的操作方法。这里我们主要介绍cv2.VideoCapture函数的基本使用方法。 cv2.VideoCapture函数 当我们在使用cv2.VideoCapture函数时&#xff…

Linux之彻底掌握防火墙-----安全管理详解

—— 小 峰 编 程 目录: 一、防火墙作用 二、防火墙分类 1、逻辑上划分:大体分为 主机防火墙 和 网络防火墙 2、物理上划分: 硬件防火墙 和 软件防火墙 三、硬件防火墙 四、软件防火墙 五、iptables 1、iptables的介绍 2、netfilter/…

python项目实战-后端个人博客系统

本文分享一个基于 Flask 框架开发的个人博客系统后端项目,涵盖用户注册登录、文章发布、分类管理、评论功能等核心模块。适合初学者学习和中小型博客系统开发。 一、项目结构 blog │ app.py │ forms.py │ models.py │ ├───instance │ blog.d…

Unity 接入阿里的全模态大模型Qwen2.5-Omni

1 参考 根据B站up主阴沉的怪咖 开源的项目的基础上修改接入 AI二次元老婆开源项目地址(unity-AI-Chat-Toolkit): Github地址:https://github.com/zhangliwei7758/unity-AI-Chat-Toolkit Gitee地址:https://gitee.com/DammonSpace/unity-ai-chat-too…

第十五届蓝桥杯 2024 C/C++组 合法密码

目录 题目: 题目描述: 题目链接: 思路: substr函数: 思路详解: 代码: 代码详解; 题目: 题目描述: 题目链接: P10906 [蓝桥杯 2024 国 B] 合法密码 -…

NoSQL 简单讲解

目录 1. NoSQL 的背景与意义 1.1 数据库的演变 1.2 NoSQL 的兴起 2. NoSQL 数据库的分类 2.1 键值存储(Key-Value Stores) 2.2 文档数据库(Document Stores) 2.3 列族存储(Column-Family Stores) 2.…

122.在 Vue3 中使用 OpenLayers 实现图层层级控制(zIndex)显示与设置详解

📅 作者:彭麒 📫 邮箱:1062470959@qq.com 📌 声明:本文源码归吉檀迦俐所有,欢迎学习借鉴,如用于商业项目请注明出处 🙌 🔧 技术栈:Vue 3 + Composition API + OpenLayers 6+ + Element Plus + Tailwind CSS 🧠 一、什么是 zIndex(图层层级)? 在地图开发中…

车载测试用例开发-如何平衡用例覆盖度和测试效率的方法论

1 摘要 在进行车载测试用例编写时,会遇到多个条件导致用例排列组合爆炸的情况,但是为了产品测试质量,我们又不得不保证用例设计的需求覆盖度,这样又会使得测试周期非常长。我们如何平衡效率和测试质量?本文进行了一些…

AI——神经网络以及TensorFlow使用

文章目录 一、TensorFlow安装二、张量、变量及其操作1、张量Tensor2、变量 三、tf.keras介绍1、使用tf.keras构建我们的模型2、激活函数1、sigmoid/logistics函数2、tanh函数3、RELU函数4、LeakReLu5、SoftMax6、如何选择激活函数 3、参数初始化1、bias偏置初始化2、weight权重…