kafka与flume的整合、spark-streaming

kafka与flume的整合

 前期配置完毕,开启集群

需求1:

利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台(三个node01中运行)

1.在kafka中建立topic

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3

2.启动flume(第一个nide01)

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1

3.启动kafka消费者(第二个node01)

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning

4.测试(第三个node01)

进入到指定路径下(cd /root/flume-kafka/),输入测试数据

返回到kafka消费者,可以看到数据产生

需求2:

Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上

1.启动kafka生产者

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic

2.启动flume

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

3.在生产者中写入数据

4.在flume中采集到数据

spark-streaming

DStream转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

idea中的

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
// 导入 RDD 类型
import org.apache.spark.rdd.RDDobject Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")val ssc = new StreamingContext(sparkConf, Seconds(3))val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})wordAndCountDStream.print()ssc.start()ssc.awaitTermination()}
}

虚拟机中的

join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同

idea中的

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.rdd.RDDobject join {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 joinval sparkConf = new SparkConf().setMaster("local[*]").setAppName("join")// 创建 StreamingContext 对象,设置批处理间隔为 3 秒val ssc = new StreamingContext(sparkConf, Seconds(3))// 从 node01 的 9999 端口接收文本流val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)// 从 node02 的 8888 端口接收文本流val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("node02", 8888)// 将 lineDStream1 中的每行文本拆分为单词,并映射为 (单词, 1) 的键值对val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))// 将 lineDStream2 中的每行文本拆分为单词,并映射为 (单词, "a") 的键值对val wordToADstream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))// 对两个 DStream 进行 join 操作,结果为 (单词, (1, "a")) 的键值对val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADstream)// 打印 join 操作后的结果joinDStream.print()// 启动 StreamingContextssc.start()// 等待计算终止ssc.awaitTermination()}
}

虚拟机中的

node01

        nc -lk 9999

node02

        nc -lk 8888

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/903493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis高级进阶

1.redis主从复制 redis主从复制1 2.redis哨兵模式 哔哩哔哩视频 redis哨兵模式1 redis哨兵模式2 redis哨兵模式3 3.redis分片集群 redis分片集群1 redis分片集群2 redis分片集群3

uniapp: 低功耗蓝牙(BLE)的使用

在微信小程序中实现蓝牙对接蓝牙秤的重量功能,主要依赖微信小程序提供的低功耗蓝牙(BLE)API。以下是一个清晰的步骤指南,帮助你完成从连接蓝牙秤到获取重量数据的开发流程。需要注意的是,具体实现可能因蓝牙秤的协议和…

3D架构图软件 iCraft Editor 正式发布 @icraft/player-react 前端组件, 轻松嵌入3D架构图到您的项目

安装 pnpm install icraft/player-react --saveimport { ICraftPlayer } from "icraft/player-react";export default function MyScene() {return <ICraftPlayer srcyour-scene.iplayer />; }icraft/player-react 为开发者提供了一站式的3D数字孪生可视化解决…

云数据中心整体规划方案PPT(113页)

1. 引言 概述&#xff1a;云数据中心整体规划方案旨在构建弹性、高效的云计算基础设施&#xff0c;通过软件定义数据中心&#xff08;SDDC&#xff09;实现资源虚拟化与管理自动化。 2. 技术趋势与背景 技术革新&#xff1a;随着云计算、虚拟化及自动化技术的发展&#xff0c…

(六)机器学习---聚类与K-means

到本篇文章&#xff0c;我们先对前几篇所学习的算法进行一个回顾&#xff1a; 而本篇文章我们将会介绍聚类以及K-means算法。 分类问题回归问题聚类问题各种复杂问题决策树√线性回归√K-means√神经网络√逻辑回归√岭回归密度聚类深度学习√集成学习√Lasso回归谱聚类条件随机…

在html中如何创建vue自定义组件(以自定义文件上传组件为例,vue2+elementUI)

1、先上代码&#xff1a;vueUpload.js var dom <div class"upload-file"><el-upload :action"uploadFileUrl" :before-upload"handleBeforeUpload" :file-list"fileList" :limit"limit":on-error"handleUpl…

计算机基础:二进制基础14,二进制加法

专栏导航 本节文章分别属于《Win32 学习笔记》和《MFC 学习笔记》两个专栏&#xff0c;故划分为两个专栏导航。读者可以自行选择前往哪个专栏。 &#xff08;一&#xff09;WIn32 专栏导航 上一篇&#xff1a;计算机基础&#xff1a;二进制基础13&#xff0c;十六进制与二进…

可视化图解算法: 判断是不是二叉搜索树(验证二叉搜索树)

1. 题目 描述 给定一个二叉树根节点&#xff0c;请你判断这棵树是不是二叉搜索树。 二叉搜索树满足每个节点的左子树上的所有节点的值均严格小于当前节点的值&#xff1b;并且右子树上的所有节点的值均严格大于当前节点的值。 数据范围&#xff1a;节点数量满足 1≤n≤10^4…

Markdown转WPS office工具pandoc实践笔记

随着DeepSeek、文心一言、讯飞星火等AI工具快速发展&#xff0c;其输出网页内容拷贝到WPS Office过程中&#xff0c;文档编排规整的格式很难快速复制。 注&#xff1a;WPS Office不支持Markdown格式&#xff0c;无法识别式样。 在这里推荐个免费开源工具Pandoc&#xff0c;实现…

python的turtle库实现四叶草

实现代码&#xff1a; import turtle turtle.pencolor(‘green’) turtle.fillcolor(‘green’) turtle.begin_fill() turtle.circle(100,90) turtle.left(90) turtle.circle(100,90) turtle.right(180) turtle.circle(100, 90) turtle.left(90) turtle.circle(100,90) tu…

北重数控滑台加工厂家:汽车零部件试验铁地板-安全性能的测试方法

汽车零部件的安全性能测试是非常重要的&#xff0c;其中铁地板测试是其中的一种常见测试方法之一。铁地板测试主要用于评估汽车零部件在发生碰撞或事故时的安全性能&#xff0c;以确保零部件在各种情况下都能提供有效的保护和安全性能。 铁地板测试通常包括以下步骤和方法&…

Linux0.11系统调用:预备知识

系统调用 预备知识 目标&#xff1a;了解系统调用的流程&#xff0c;在Linux 0.11上添加两个系统调用&#xff0c;并编写两个简单的应用程序测试它们。 对应章节&#xff1a;同济大学赵炯博士的《Linux内核0.11完全注释&#xff08;修正版V3.0&#xff09;》的第5.5节 下面就针…

如何防止 ES 被 Linux OOM Killer 杀掉

当 Linux 系统内存不足时&#xff0c;内核会找出一个进程 kill 掉它释放内存&#xff0c;旨在保障整个系统不至于崩溃。如果 ES 按照最佳实践去实施部署&#xff0c;会保留一半的内存&#xff0c;不至于发生此类事情。但事情总有例外&#xff0c;有的朋友可能 ES 和其他的程序部…

swagger2升级至openapi3的利器--swagger2openapi

背景&#xff1a; 因为项目需要升级JDK&#xff0c;涉及到swagger2升级至openapi3的情况。由于swagger 2和openapi 3的语法差距太大&#xff0c;需要对yaml进行升级。无奈单个yaml文件的内容太大&#xff0c;高至4万多行&#xff0c;手动进行语法的转换肯定是不可能了&#xff…

在yolo中Ultralytics是什么意思呢?超越分析的智能

在YOLO&#xff08;You Only Look Once&#xff09;目标检测框架中&#xff0c;Ultralytics 是一家专注于计算机视觉和机器学习技术的公司&#xff0c;同时也是YOLO系列模型&#xff08;如YOLOv5、YOLOv8等&#xff09;的官方开发和维护团队。以下是关键点解析&#xff1a; 1. …

【阿里云大模型高级工程师ACP习题集】2.7 通过微调增强模型能力 (上篇)(⭐️⭐️⭐️ 重点章节!!!)

习题集: 【单选题】在大模型微调中,与提示工程和RAG相比,微调的独特优势在于( ) A. 无需外部工具即可提升模型表现 B. 能让模型学习特定领域知识,提升底层能力 C. 可以更高效地检索知识 D. 能直接提升模型的知识边界,无需训练 【多选题】以下关于机器学习和传统编程的说…

CuML + Cudf (RAPIDS) 加速python数据分析脚本

如果有人在用Nvidia RAPIDS加速pandas和sklearn等库&#xff0c;请看我这个小示例&#xff0c;可以节省你大量时间。 1. 创建环境 请使用uv&#xff0c;而非conda/mamba。 # install uv if not yetcurl -LsSf https://astral.sh/uv/install.sh | shuv init data_gpucd data_g…

2-SAT之完美塔防

小N最近喜欢玩一款塔防游戏。 题目描述 这款游戏的棋盘是一个 nm 的网格&#xff0c;每个格子上会有以下类型物件&#xff1a; A 型炮台&#xff1a;会向上下两个方向同时发射激光&#xff0c;符号为 |;B 型炮台&#xff1a;会向左右两个方向同时发射激光&#xff0c;符号为…

【android bluetooth 案例分析 03】【PTS 测试 】【PBAP/PCE/SSM/BV-02-C】

1. 测试介绍 PBAP/PCE/SSM/BV-02-C [PCE Closes a PBAP Session] 1. Test Purpose Verify that the PCE can terminate a PBAP session. 2. Initial Condition IUT: The IUT is engaged in a PBAP session with the Lower Tester.Lower Tester: The Lower Tester is engag…

ArcGIS:开启洪水灾害普查、评估与制图新征程

技术点目录 一、洪水普查技术规范解读二、ArcGIS介绍及数据管理三、空间数据的转换与处理四、洪水淹没专题地图制作五、矢量数据的采集与处理六、栅格数据的下载与处理七、ArcGIS水文分析八、ArcGIS洪水分析九、ArcGIS淹没分析了解更多 ———————————————————…