kafka整合flume与DStream转换

一、Kafka整合flume

cd /opt/software/flume/conf/

vi flume-kafka.conf

a1.sources=r1

a1.sinks=k1

a1.channels=c1

a1.sources.r1.type=spooldirt

a1.sources.r1.spoolDir=/root/flume-kafka

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic=testTopice

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

a1.sinks.k1.kafka.flumeBatchSize=20

a1.sinks.k1.kafka.producer.acks=1

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

cd /root/

mkdir flume-kafka

ll

drwxr-xr-x   2 root root 4096 11月  8 22:02 agent3

-rw-------.  1 root root  955 9月   6 16:41 anaconda-ks.cfg

-rw-r--r--   1 root root    1 10月 25 18:30 exec-logger.conf

drwxr-xr-x   2 root root   27 11月 15 18:00 flume-hive

drwxr-xr-x   2 root root    6 12月  3 03:59 flume-kafka

-rw-r--r--   1 root root   63 11月  8 23:01 flume-position.json

drwxr-xr-x  22 root root 4096 12月  3 03:59 kafkadata

drwxr-xr-x   3 root root   21 10月 11 18:32 opt

drwxr-xr-x   3 root root 4096 11月  8 18:17 testDir

drwxr-xr-x   2 root root   38 11月  8 19:01 testdir2

-rw-r--r--   1 root root  108 11月 15 17:09 test.log

drwxr-xr-x   2 root root 4096 11月  8 18:49 testSink

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 2

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning

cd /root/flume-kafka/

echo "hello" >>test3.txt

echo "hello flume" >>test2.txt

cd /opt/software/flume/conf/

vi kafka-flume.conf

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning

Hello

 hello kafka

hello flume

flume-ng agent -c conf/ -f conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

二、kafka架构深入

分区策略:轮询(RoundRobin)、按 Key 哈希(Hash)、自定义分区。

数据可靠性:通过 ACK 机制(0、1、-1)和 ISR(同步副本集合)保证,acks=-1时需等待 Leader 和 Follower 全部落盘。

事务与幂等性:0.11 版本引入幂等性(enable.idompotence=true),结合 At Least Once 实现 Exactly Once 语义。

三、Spark-Streaming核心编程

1.DStream 转换

DStream 是 Spark-Streaming 处理实时数据的基本单位,可以理解为 “实时数据流”。

转换操作就是对这个数据流进行加工处理,比如过滤、拆分、统计等,就像工厂流水线对原材料进行加工一样。

操作分为两类:

无状态转换:只处理当前批次的数据,不关心历史数据(比如统计当前 3 秒内的单词数)。

有状态转换:会记住历史数据(比如统计从程序启动到现在的总单词数),文档里没详细讲,重点在无状态部分。

2.无状态转换的常见操作

无状态转换就像 “即处理即丢弃”,每次只处理当前批次的数据,不保留之前的结果。

常见函数举例

3.Transform转换

Transform是一个 “万能转换” 函数,可以对每个批次的 RDD(DStream 内部由多个 RDD 组成)执行任意自定义操作,甚至可以使用 Spark 原生的 RDD 函数(即使 DStream 没有直接提供)

4.Join转换

join用于合并两个数据流中相同键的数据,就像拼拼图一样,只有键匹配的部分才能拼在一起。

适用于合并两个来源的单词数据

最后运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/903536.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络通讯【QTcpServer、QTcpSocket、QAbstractSocket】

目录 QTcpServer class简单描述成员函数和信号 QTcpSocket Class详细描述成员函数和信号 QAbstractSocket Class详细描述成员函数和信号成员函数说明文档 QT实现服务器和客户端通讯服务器端:通讯流程原代码 客户端通信流程原代码 QTcpServer class header: #includ…

大模型在肾癌诊疗全流程中的应用研究报告

目录 一、引言 1.1 研究背景与意义 1.2 研究目的与方法 1.3 国内外研究现状 二、大模型预测肾癌术前情况 2.1 基于影像组学的肾癌良恶性及分级预测 2.1.1 MRI 影像组学模型预测肾透明细胞癌分级 2.1.2 CT 影像深度学习模型鉴别肾肿物良恶性及侵袭性 2.2 大模型对手术风…

网络原理 - 11(HTTP/HTTPS - 2 - 请求)

目录 HTTP 请求(Request) 认识 URL URL 基本格式 关于 URL encode 认识方法(method) 1. GET 方法 2. POST 方法 认识请求“报头”(header) Host Content-Length Content-Type User-Agent&…

实现MySQL高可用性:从原理到实践

目录 一、概述 1.什么是MySQL高可用 2.方案组成 3.优势 二、资源清单 三、案例实施 1.修改主机名 2.安装MySQL数据库(Master1、Master2) 3.配置mysql双主复制 4.安装haproxy(keepalived1、keepalived2) 5.安装keepaliv…

CSS学习笔记8——表格

一、表格 1-1、创建表格 在Word文档中,如果要创建表格,只需插入表格,然后设定相应的行数和列数即可。然而在HTML网页中,所有的元素都是通过标签定义的,要想创建表格,就需要使用与表格相关的标签。使用标签…

爬虫学习笔记(一)

目的 通过编写程序爬取互联网上的优质资源 爬虫必须要使用python吗 非也~ 编程语言知识工具,抓取到数据才是目的,而大多数爬虫采用python语言编写的原因是python的语法比较简单,python写爬虫比较简单!好用!而且pyt…

大厂面试:MySQL篇

前言 本章内容来自B站黑马程序员java大厂面试题和小林coding 博主学习笔记,如果有不对的地方,海涵。 如果这篇文章对你有帮助,可以点点关注,点点赞,谢谢你! 1.MySQL优化 1.1 定位慢查询 定位 一个SQL…

C++_数据结构_详解红黑树

✨✨ 欢迎大家来到小伞的大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C学习 小伞的主页:xiaosan_blog 制作不易!点个赞吧!!谢谢喵!&…

DNA复制过程3D动画教学工具

DNA复制过程3D动画教学工具 访问工具页面: DNA复制动画演示 工具介绍 我开发了一个交互式的DNA复制过程3D动画演示工具,用于分子生物学教学。这个工具直观展示了: DNA双螺旋结构的解旋过程碱基互补配对原理半保留复制机制完整的复制周期动画 主要特点…

使用阿里云 CDN 保护网站真实 IP:完整配置指南

使用阿里云 CDN 保护网站真实 IP:完整配置指南 一、宝塔面板准备工作1. 确认网站部署状态2. 宝塔中检查网站配置 二、配置阿里云 CDN1. 添加域名到 CDN2. 配置 DNS 解析3. 配置成功确认 三、宝塔面板安全加固(隐藏 IP 的关键步骤)1. 禁止通过…

PHP经验笔记

isset — 检测变量是否设置,并且不是NULL; 若变量存在且值不为NULL,则返回 TURE 若变量存在且其值为NULL或变量不存在,则返回 FALSE 结论 1. 当变量为空字符串、数值0和布尔值false时,isset全部返回true 2. 当变量不存在和变量存在且值为NULL…

Linux——安装NVM

1. 安装命令 官方地址:https://github.com/nvm-sh/nvm curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.3/install.sh | bash2. 安装完成后执行命令 source ~/.bashrc3. 验证 nvm -v

CentOS 7 磁盘阵列搭建与管理全攻略

CentOS 7 磁盘阵列搭建与管理全攻略 在数据存储需求日益增长的今天,磁盘阵列(RAID)凭借其卓越的性能、数据安全性和可靠性,成为企业级服务器和数据中心的核心存储解决方案。CentOS 7 作为一款稳定且功能强大的 Linux 操作系统&am…

C++每日训练 Day 18:构建响应式表单与数据验证(初学者友好)

📘 本篇目标:在前几日协程与事件驱动机制基础上,构建一个响应式表单系统,实现用户输入的异步验证与反馈。通过协程挂起/恢复机制,简化异步逻辑,提升代码可读性。 🔁 回顾 Day 17:响应…

Vue初步总结-摘自 黑马程序员

本文摘自 bilibili 前端最新Vue2Vue3基础入门到实战项目全套教程,自学前端vue就选黑马程序员,一套全通关! 更多详情可参考: https://www.yuque.com/u26161316/pic6n4/heyv8nv8ubfk3fhe?singleDoc# 《Vue》

【基于Qt的QQ音乐播放器开发实战:从0到1打造全功能音乐播放应用】

🌹 作者: 云小逸 🤟 个人主页: 云小逸的主页 🤟 motto: 要敢于一个人默默的面对自己,强大自己才是核心。不要等到什么都没有了,才下定决心去做。种一颗树,最好的时间是十年前,其次就是现在&…

线程池(二):深入剖析synchronized关键字的底层原理

线程池(二):深入剖析synchronized关键字的底层原理 线程池(二):深入剖析synchronized关键字的底层原理一、基本使用1.1 修饰实例方法1.2 修饰静态方法1.3 修饰代码块 二、Monitor2.1 Monitor的概念2.2 Moni…

Linux CentOS 7 安装Apache 部署html页面

*、使用yum包管理器安装Apache。运行以下命令: sudo yum install httpd *、启动Apache服务 sudo systemctl start httpd *、设置Apache服务开机自启 # 启用开机自启动 sudo systemctl enable httpd# 禁用开机自启动 sudo systemctl disable httpd *、验证Apac…

前端设置三行文本省略号,失效为什么?

实际效果:第三行出现省略号,但是第四行依旧显示了部分文字 这个问题通常是由于 CSS 多行文本截断(-webkit-line-clamp)的计算方式或布局冲突导致的。以下是完整解决方案,确保三行文本截断正确显示省略号,并…

git学习之git常用命令

1. 初始化仓库 git init初始化一个新的 Git 仓库。 2. 克隆远程仓库 git clone <repository-url>从远程服务器克隆一个已有仓库到本地。 3. 配置用户名和邮箱 git config --global user.name "Your Name" git config --global user.email "youexampl…