Flume详解——介绍、部署与使用

1. Flume 简介

Apache Flume 是一个专门用于高效地 收集、聚合、传输 大量日志数据的 分布式、可靠 的系统。它特别擅长将数据从各种数据源(如日志文件、消息队列等)传输到 HDFSHBaseKafka 等大数据存储系统。

特点

  • 可扩展:支持大规模数据传输,灵活扩展
  • 容错性:支持数据恢复和失败重试,确保数据不丢失
  • 多种数据源:支持日志文件、网络数据、HTTP请求、消息队列等多种来源
  • 流式处理:数据边收集边传输,支持实时传输

2. Flume 架构

Flume 的核心架构由三大组件构成,理解它们对掌握 Flume 的原理至关重要:

2.1 Source(来源)

负责从数据源获取数据,比如:

  • taildir(监听日志文件)
  • exec(执行命令读取数据)
  • kafka(从 Kafka 消费数据)
  • netcat(监听端口接收数据)

2.2 Channel(通道)

作为 缓冲区,临时存储数据,支持两种常见类型:

  • Memory Channel(内存通道):速度快,但重启可能丢数据
  • File Channel(文件通道):写入磁盘,保证数据持久性

2.3 Sink(下游输出)

负责将数据写入目标位置,支持:

  • HDFS(写入 Hadoop 分布式文件系统)
  • HBase(写入 HBase 数据库)
  • Kafka(推送到 Kafka)
  • ElasticSearch(支持实时检索)

补充组件

  • Sink Processor:管理多个 Sink,支持负载均衡、故障转移
  • Interceptor:在数据进入 Channel 前拦截处理,比如格式转换、过滤数据等

3. Flume 数据流动原理

数据在 Flume 中是按事件 (Event) 传输的,基本流程如下:

1️⃣ Source 从外部采集数据,将每条数据封装为一个 Event
2️⃣ Event 进入 Channel 暂存
3️⃣ Sink 从 Channel 拉取数据,写入目标系统

👉 示例流程
Web日志 -> Source(taildir) -> Channel(Memory Channel) -> Sink(HDFS)


4. Flume 部署模式

Flume 支持灵活的部署方式,主要有三种:

  • 单机模式:Source、Channel、Sink 都在同一节点,简单但不适合大规模数据
  • 多机流模式:多个 Flume 节点串联,Source 采集数据,Sink 输出到下一个 Flume 节点的 Source,逐层转发
  • 多 Agent 模式:多个 Flume Agent 独立采集数据,汇总到统一 Sink

5. 本地部署(单机)

下载地址:http://archive.apache.org/dist/flume/

tar -zxf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server/
mv apache-flume-1.9.0-bin/ flume--将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
cd flume/lib
rm guava-11.0.2.jar

6.Flume 入门案例

6.1 监控端口数据官方案例

案例需求: 使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

实现步骤:

yum install -y nc--判断 44444 端口是否被占用
netstat -nlp | grep 44444cd /export/server/flume/
mkdir job
cd job/vim net-flume-logger.conf
--添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 开启 flume 监听端口

第一种写法:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/net-flume-logger.conf -
Dflume.root.logger=INFO,console

第二种写法:

bin/flume-ng agent -c conf/ -n a1 -f job/net-flume-logger.conf -Dflume.root.logger=INFO,console

参数说明:

--conf/-c:表示配置文件存储在 conf/目录

--name/-n:表示给 agent 起名为 a1

--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。

-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。

此时服务端已经开启,新建一个会话作为客户端,使用 netcat 工具向本机的 44444 端口发送内容

nc localhost 44444
hello

在 Flume 监听页面观察接收数据情况

6.2 实时监控单个追加文件

案例需求:实时监控 Hive 日志,并上传到 HDFS 中

实现步骤:

确认 Hadoop 和 Hive 环境已经配置,没有配置的可以参考这两篇文章

本地部署HDFS集群https://blog.csdn.net/m0_73641796/article/details/145998092?spm=1001.2014.3001.5501


本地部署Hive集群https://blog.csdn.net/m0_73641796/article/details/146078614?spm=1001.2014.3001.5501

创建 flume-file-hdfs.conf 文件 

vim flume-file-hdfs.conf
--添加如下内容# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

确保 hadoop 用户有写入权限

mkdir -p /export/server/flume/logs
chown -R hadoop:hadoop /export/server/flume/logs
chmod -R 755 /export/server/flume/logs

 运行 Flume(用Hadoop用户运行,因为要操作HDFS) 

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

新建会话,开启 Hadoop 和 Hive 并操作 Hive 产生日志

su hadoopstart-dfs.sh
start-yarn.shcd /export/server/hive/
nohup bin/hive --service metastore  >> logs/metastore.log 2>&1 &
bin/hive

在 HDFS 上查看文件

6.3 实时监控目录下多个新文件

案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

实现步骤:

创建配置文件

vim flume-dir-hdfs.conf
--添加如下内容a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /export/server/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# 忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
# 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 30
# 设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

创建目录并启动

--创建目录
mkdir upload
chown hadoop:hadoop upload/--启动flume(用hadoop用户)
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

新建会话,向 upload 文件夹中添加文件

cd /export/server/flume/upload/
echo "hello world" > 1.txt

查看 HDFS 上的数据

6.4 实时监控目录下的多个追加文件

案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

实现步骤:

创建配置文件

vim flume-taildir-hdfs.conf
--添加如下内容a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /export/server/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /export/server/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /export/server/flume/files2/.*log.*# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

创建目录并启动

--创建目录
mkdir files files2
chown hadoop:hadoop files
chown hadoop:hadoop files2--启动flume(用hadoop用户)
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

向 upload 文件夹中添加文件

cd files
echo hello >> file1.txt

查看 HDFS 上的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/898146.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

破解企业内部盗版软件管理难题的技术方案

引言:盗版软件——企业数字化转型的“隐形地雷” 据BSA《全球软件调查报告》显示,37%的企业存在员工私自安装盗版软件的行为,由此引发的法律诉讼、数据泄露及罚款风险年均增长28%。LMT基于“预防-检测-治理”三位一体技术框架,为…

Spring源码解析

第一讲 容器接口 BeanFactory和ApplicationContext接口的具体继承关系: ApplicationContext 间接继承了BeanFactory BeanFactory是父接口ApplicationContext是子接口,里面一些功能调用了BeanFactory BeanFactory的功能 表面上只有 getBean&#xff0…

Django Rest Framework 创建纯净版Django项目部署DRF

描述创建纯净版的Django项目和 Django Rest Framework 环境的部署 一、创建Django项目 1. 环境说明 操作系统 Windows11python版本 3.9.13Django版本 V4.2.202. 操作步骤(在Pycharm中操作) 创建Python项目drfStudy、虚拟环境 ​虚拟环境中安装 jdangopip install django==4.…

图解AUTOSAR_CP_NetworkManagementInterface

AUTOSAR 网络管理接口(Nm)详解 AUTOSAR 网络管理接口规范与实现指南 目录 1. 概述 1.1. 网络管理接口的作用1.2. 网络管理接口的特点 2. 网络管理接口架构 2.1. 架构概览2.2. 模块交互关系 3. 网络管理状态机 3.1. 状态定义3.2. 状态转换 4. 协调算法 4.1. 协调关闭流程4.2. 同…

java学习总结(八):Spring boot

一、SpringBoot简介 传统Spring开发缺点: 1、导入依赖繁琐 2、项目配置繁琐 Spring Boot是全新框架(更像是一个工具, 脚手架),是Spring提供的一个子项目, 用于快速构建Spring应用程序。 随着Spring 3.0的发布,Spring 团…

vue-router实现

实现一个简化版的 vue-router 可以帮助我们更好地理解 Vue 路由是如何工作的。Vue Router 主要的功能是基于浏览器的 URL 来管理组件的显示,能够根据 URL 变化切换不同的视图。下面是一个简化版的实现,用于帮助你理解基本的路由机制。 创建一个简单的 V…

Redis 服务器:核心功能与优化实践

Redis 服务器:核心功能与优化实践 引言 Redis(Remote Dictionary Server)是一款高性能的键值对存储系统,广泛用于缓存、消息队列、实时排行榜等场景。本文将深入探讨Redis服务器的核心功能,并提供一些优化实践的技巧,以帮助读者更好地理解和运用Redis。 Redis的核心功…

openEuler24.03 LTS下安装MySQL8

前提条件 拥有openEuler24.03 LTS环境,可参考:Vmware下安装openEuler24.03 LTS 步骤 卸载原有mysql及mariadb sudo systemctl stop mysql mysqld 2>/dev/null sudo rpm -qa | grep -i mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps 2>/dev/…

GLOW-TTS

我首先需要理解用户的指令,用户希望我翻译文章的3.1节“Training and Inference Procedures”部分。为了完成这个任务,我需要仔细阅读文章的3.1节,理解其中的技术细节和概念,然后将这些内容准确地翻译成中文。 在阅读3.1节时&…

【算法思想】高精度

引入 首先了解&#xff1a; 1. int 范围为10^9 2. long long 范围数量级为10^18 如果超过该数量级&#xff0c;该怎么办&#xff1f; ——这就是高精度、大数的算法问题 加法 输入两个整数a,b,输出他们的和&#xff08;<10的500次方&#xff09; 核心是加法的核心——》每…

【失败了】LazyGraphRAG利用本地ollama提供Embedding model服务和火山引擎的deepseek API构建本地知识库

LazyGraphRAG测试结果如下 数据&#xff1a; curl https://www.gutenberg.org/cache/epub/24022/pg24022.txt -o ./ragtest/input/book.txt 失败了 气死我也&#xff01;&#xff01;&#xff01;对deepseek-V3也不是很友好啊&#xff0c;我没钱prompt 微调啊&#xff0c;晕死…

ccfcsp3402矩阵重塑(其二)

//矩阵重塑&#xff08;其二&#xff09; #include<iostream> using namespace std; int main(){int n,m,t;cin>>n>>m>>t;int c[10000][10000];int s0,sum0;int d[10000],k[100000];for(int i0;i<n;i){for(int j0;j<m;j){cin>>c[i][j];d[s…

算法-除自身以外数组的乘积

力扣题目&#xff1a;238. 除自身以外数组的乘积 - 力扣&#xff08;LeetCode&#xff09; 题目描述&#xff1a; 给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums…

Unity Shader - UI Sprite Shader之简单抠图效果

Sprite抠图效果&#xff1a; 前言 在PhotoShop中我们经常会用到抠图操作&#xff0c;现在就用Shader实现一个简单的抠图效果。 实现原理&#xff1a; 使用当前像素颜色与需要抠掉的颜色相减作比较&#xff0c;然后与一个指定的阈值比较以决定是否将其显示出来&#xff1b; U…

【Mac】安装 Parallels Desktop、Windows、Rocky Linux

一、安装PD 理论上&#xff0c;PD只支持试用15天&#xff01;当然&#xff0c;你懂的。 第一步&#xff0c;在 Parallels Desktop for Mac 官网 下载 Install Parallels Desktop.dmg第二步&#xff0c;双击 Install Parallels Desktop.dmg 第三步&#xff0c;双击安装Paralle…

学习单片机需要多长时间才能进行简单的项目开发?

之前有老铁问我&#xff0c;学单片机到底要多久&#xff0c;才能进行简单的项目开发&#xff1f;是三个月速成&#xff0c;还是三年磨一剑&#xff1f; 今天咱们就来聊聊这个话题&#xff0c;我不是什么高高在上的专家&#xff0c;就是个踩过无数坑、烧过几块板子的“技术老友”…

pyqt 上传文件或者文件夹打包压缩文件并添加密码并将密码和目标文件信息保存在json文件

一、完整代码实现 import sys import os import json import pyzipper from datetime import datetime from PyQt5.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout,QPushButton, QLineEdit, QLabel, QFileDialog,QMessageBox, QProgressBar) from PyQt5.…

centos操作系统上传和下载百度网盘内容

探序基因 整理 进入百度网盘官网百度网盘 客户端下载 下载linux的rpm格式的安装包 在linux命令行中输入&#xff1a;rpm -ivh baidunetdisk_4.17.7_x86_64.rpm 出现报错&#xff1a; 错误&#xff1a;依赖检测失败&#xff1a; libXScrnSaver 被 baidunetdisk-4.17.7-1.x8…

LeetCode134☞加油站

关联LeetCode题号134 本题特点 贪心局部最优解-部分差值 如果小于0&#xff08;消耗大于油站油量&#xff09; 就从下一个加油站开始&#xff0c;因为如果中间有小于0的情况 当前站就不可能是始发站&#xff0c;整体最优解-整体差值 如果小于0 &#xff0c;那么就是不能有始发…

基于 Verilog 的时序设计:从理论到实践的深度探索

在数字电路设计领域,时序设计是一个至关重要的环节,它涉及到组合逻辑电路与时序逻辑电路的设计差异、时钟信号的运用以及触发器的工作原理等多个方面。本文将围绕基于 Verilog 的时序设计实验展开,详细阐述实验过程、代码实现以及结果分析,帮助读者深入理解时序设计的核心概…