做项目的编程网站wordpress的编辑器插件

web/2025/9/30 12:10:30/文章来源:
做项目的编程网站,wordpress的编辑器插件,哈尔滨精致网站建设,东莞网站建设 汇卓第 1 章 Flume 概述 1.1 Flume 定义 Flume 是 Cloudera 提供的一个高可用的#xff0c;高可靠的#xff0c;分布式的 海量日志采集、聚合和传 输的系统 。 Flume 基于流式架构#xff0c;灵活简单。 为什么选用 Flume Python 爬虫数据 Java 后台日志数据 服… 第 1 章 Flume 概述 1.1 Flume 定义 Flume 是 Cloudera 提供的一个高可用的高可靠的分布式的 海量日志采集、聚合和传 输的系统 。 Flume 基于流式架构灵活简单。 为什么选用 Flume Python 爬虫数据 Java 后台日志数据 服务器本地磁盘 文件夹 HDFS Flume Flume 最主要的作用就是实时读取服务器本地磁盘的数据将数据写入到 HDFS 。 Kafka 网络端口数据 1.2 Flume 基础架构 1.2.1 Agent Agent 是一个 JVM 进程它以事件的形式将数据从源头送至目的。 Agent 主要有 3 个部分组成 Source 、 Channel 、 Sink 。 1.2.2 Source Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种 格式的日志数据包括 avro 、thrift、 exec 、jms、 spooling directory 、 netcat 、 taildir 、 sequence generator、syslog、http、legacy。 1.2.3 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储 或索引系统、或者被发送到另一个 Flume Agent。 Sink 组件目的地包括 hdfs 、 logger 、 avro 、thrift、ipc、 file 、 HBase 、solr、自定 义。 1.2.4 Channel Channel 是位于 Source 和 Sink 之间的缓冲区。因此Channel 允许 Source 和 Sink 运 作在不同的速率上。Channel 是线程安全的可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。 Flume 自带两种 Channel Memory Channel 和 File Channel 。 Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适 用。如果需要关心数据丢失那么 Memory Channel 就不应该使用因为程序死亡、机器宕 机或者重启都会导致数据丢失。 File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数 据。 1.2.5 Event 传输单元Flume 数据传输的基本单元以 Event 的形式将数据从源头送至目的地。 Event 由 Header 和 Body 两部分组成Header 用来存放该 event 的一些属性为 K-V 结构 Body 用来存放该条数据形式为字节数组。 第 2 章 Flume 入门 2.1 Flume 安装部署 2.1.1 安装地址 1Flume 官网地址http://flume.apache.org/ 2文档查看地址http://flume.apache.org/FlumeUserGuide.html 3下载地址http://archive.apache.org/dist/flume/ 2.1.2 安装部署 1将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下 2解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下 [atguiguhadoop102 software]$ tar -zxf /opt/software/apache flume-1.9.0-bin.tar.gz -C /opt/module/ 3修改 apache-flume-1.9.0-bin 的名称为 flume [atguiguhadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume 4将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3 [atguiguhadoop102 lib]$ rm /opt/module/flume/lib/guava- 11.0.2.jar 2.2 Flume 入门案例 2.2.1 监控端口数据官方案例 1案例需求 使用 Flume 监听一个端口收集该端口数据并打印到控制台。 2需求分析  ————————————————————————————— 监听数据端口案例分析 2 Flume 监控本机的 44444 端口。 通过 Flume 的 source 端读取数据。 1 通过 netcat 工具向本机的 44444 端 口发送数据 3 Flume 将获取的数据通过 Sink 端写出到控制台 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet.conf -Dflume.root.loggerINFO,console nc localhost 44444 44444 端口 控制台 hello hello 测试命令 3实现步骤 1安装 netcat 工具 [atguiguhadoop102 software]$ sudo yum install -y nc 2判断 44444 端口是否被占用 [atguiguhadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444 3创建 Flume Agent 配置文件 flume-netcat-logger.conf 4在 flume 目录下创建 job 文件夹并进入 job 文件夹。 [atguiguhadoop102 flume]$ mkdir job [atguiguhadoop102 flume]$ cd job/ 5在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。 [atguiguhadoop102 job]$ vim flume-netcat-logger.conf 6在 flume-netcat-logger.conf 文件中添加如下内容。 添加内容如下 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 # Describe the sink a1.sinks.k1.type logger # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 注配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html 配置文件解析 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 # Describe the sink a1.sinks.k1.type logger # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 a1: 表示 agent 的名称 r1: 表示 a1 的 Source 的名称 k1: 表示 a1 的 Sink 的名称 c1: 表示 a1 的 Channel 的名称 表示 a1 的输入源类型为 netcat 端口类型 表示 a1 的监听的主机 表示 a1 的监听的端口号 表示 a1 的输出目的地是控制台 logger 类型 表示 a1 的 channel 类型是 memory 内存型 表示 a1 的 channel 总容量 1000 个 event 表示 a1 的 channel 传输时收集到了 100 条 event 以后再去提交事务 表示将 r1 和 c1 连接起来 表示将 k1 和 c1 连接起来 7先开启 flume 监听端口 第一种写法 [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf - Dflume.root.loggerINFO,console 第二种写法 [atguiguhadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.loggerINFO,console 参数说明 --conf/-c表示配置文件存储在 conf/目录 --name/-n表示给 agent 起名为 a1 --conf-file/-fflume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。 -Dflume.root.loggerINFO,console -D 表示 flume 运行时动态修改 flume.root.logger 参数属性值并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。 8使用 netcat 工具向本机的 44444 端口发送内容 [atguiguhadoop102 ~]$ nc localhost 44444 hello atguigu 9在 Flume 监听页面观察接收数据情况 2.2.2 实时监控单个追加文件 1案例需求实时监控 Hive 日志并上传到 HDFS 中 2需求分析 实时读取本地文件到 HDFS 案例 Hive 日志文件 /opt/module/hive/lo gs/ hive.log Flume 监控文件 HDFS Hive 实时更新日志 Exec Source HDFS Sink Memory Channel 1 创建符合条件的 flume 配置文件 2 执行配置文件开启监控 3 开启 Hive 生成日志 4 查看 HDFS 上数据 3实现步骤 1Flume 要想将数据输出到 HDFS依赖 Hadoop 相关 jar 包 检查/etc/profile.d/my_env.sh 文件确认 Hadoop 和 Java 环境变量配置正确 JAVA_HOME/opt/module/jdk1.8.0_212 HADOOP_HOME/opt/module/ha/hadoop-3.1.3 PATH$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin export PATH JAVA_HOME HADOOP_HOME 2创建 flume-file-hdfs.conf 文件 创建文件 [atguiguhadoop102 job]$ vim flume-file-hdfs.conf 注要想读取 Linux 系统中的文件就得按照 Linux 命令的规则执行命令。由于 Hive 日志在 Linux 系统中所以读取文件的类型选择exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。 添加如下内容 # Name the components on this agent a2.sources r2 a2.sinks k2 a2.channels c2 # Describe/configure the source a2.sources.r2.type exec a2.sources.r2.command tail -F /opt/module/hive/logs/hive.log # Describe the sink a2.sinks.k2.type hdfs a2.sinks.k2.hdfs.path hdfs:// hadoop102:9820 /flume/%Y%m%d/%H # 上传文件的前缀 a2.sinks.k2.hdfs.filePrefix logs- # 是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round true # 多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue 1 # 重新定义时间单位 a2.sinks.k2.hdfs.roundUnit hour # 是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp true # 积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize 100 # 设置文件类型可支持压缩 a2.sinks.k2.hdfs.fileType DataStream # 多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval 60 # 设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize 134217700 # 文件的滚动与 Event 数量无关 a2.sinks.k2.hdfs.rollCount 0 # Use a channel which buffers events in memory a2.channels.c2.type memory a2.channels.c2.capacity 1000 a2.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r2.channels c2 a2.sinks.k2.channel c2 注意 对于所有与时间相关的转义序列Event Header 中必须存在以 “timestamp”的 key除非 hdfs.useLocalTimeStamp 设置为 true此方法会使用 TimestampInterceptor 自 动添加 timestamp。 a3.sinks.k3.hdfs.useLocalTimeStamp true  ————————————————————————————— 实时读取本地文件到 HDFS 案例 # Name the components on this agent a2.sources r2 a2.sinks k2 a2.channels c2 # Describe/configure the source a2.sources.r2.type exec a2.sources.r2.command tail -F /opt/module/hive/logs/hive.log a2.sources.r2.shell /bin/bash -c # Describe the sink a2.sinks.k2.type hdfs a2.sinks.k2.hdfs.path hdfs://hadoop102:9000/flume/%Y%m%d/%H a2.sinks.k2.hdfs.filePrefix logs a2.sinks.k2.hdfs.round true a2.sinks.k2.hdfs.roundValue 1 a2.sinks.k2.hdfs.roundUnit hour a2.sinks.k2.hdfs.useLocalTimeStamp true a2.sinks.k2.hdfs.batchSize 1000 a2.sinks.k2.hdfs.fileType DataStream a2.sinks.k2.hdfs.rollInterval 60 a2.sinks.k2.hdfs.rollSize 134217700 a2.sinks.k2.hdfs.rollCount 0 # Use a channel which buffers events in memory a2.channels.c2.type memory a2.channels.c2.capacity 1000 a2.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r2.channels c2 a2.sinks.k2.channel c2 #上传文件的前缀 #是否按照时间滚动文件夹 #多少时间单位创建一个新的文件夹 #重新定义时间单位 #是否使用本地时间戳 #积攒多少个 Event 才 flush 到 HDFS 一次 #设置文件类型可支持压缩 #多久生成一个新的文件 #设置每个文件的滚动大小 #文件的滚动与 Event 数量无关 #定义source #定义sink #定义channel #定义source类型为exec可执行命令的 #执行shell脚本的绝对路径 3运行 Flume [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf 4开启 Hadoop 和 Hive 并操作 Hive 产生日志 [atguiguhadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh [atguiguhadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh [atguiguhadoop102 hive]$ bin/hive hive (default) 5在 HDFS 上查看文件。 2.2.3 实时监控目录下多个新文件 1案例需求使用 Flume 监听整个目录的文件并上传至 HDFS 2需求分析 ————————————————————————————— 实时读取目录文件到 HDFS 案例 被监控的目录 /opt/module/flume/ upload Flume 监控目录 HDFS 待上传的文件 Spooldir Source HDFS Sink Memory Channel 1 创建符合条件的 flume 配置文件 2 执行配置文件开启监控 3 向 upload 目 录中添加文件 4 查看 HDFS 上数据 5 查看 /opt/module/flume/upload 目录中上传的文件是否已经标记 为 .COMPLETED 结尾 .tmp 后缀 结尾文件没有上传。 3 3实现步骤 1创建配置文件 flume-dir-hdfs.conf 创建一个文件 [atguiguhadoop102 job]$ vim flume-dir-hdfs.conf 添加如下内容 a3.sources r3 a3.sinks k3 a3.channels c3 # Describe/configure the source a3.sources.r3.type spooldir a3.sources.r3.spoolDir /opt/module/flume/upload a3.sources.r3.fileSuffix .COMPLETED a3.sources.r3.fileHeader true # 忽略所有以 .tmp 结尾的文件不上传 a3.sources.r3.ignorePattern ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type hdfs a3.sinks.k3.hdfs.path hdfs:// hadoop102:9820 /flume/upload/%Y%m%d/%H # 上传文件的前缀 a3.sinks.k3.hdfs.filePrefix upload- # 是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round true # 多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue 1 # 重新定义时间单位 a3.sinks.k3.hdfs.roundUnit hour # 是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp true # 积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize 100  ————————————————————————————— # 设置文件类型可支持压缩 a3.sinks.k3.hdfs.fileType DataStream # 多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval 60 # 设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize 134217700 # 文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory a3.channels.c3.type memory a3.channels.c3.capacity 1000 a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r3.channels c3 a3.sinks.k3.channel c3 实时读取目录文件到 HDFS 案例 a3.sources r3 a3.sinks k3 a3.channels c3 # Describe/configure the source a3.sources.r3.type spooldir a3.sources.r3.spoolDir /opt/module/flume/upload a3.sources.r3.fileSuffix .COMPLETED a3.sources.r3.fileHeader true a3.sources.r3.ignorePattern ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type hdfs a3.sinks.k3.hdfs.path hdfs://hadoop102:9000/flume/upload/%Y%m%d/% H a3.sinks.k3.hdfs.filePrefix upload a3.sinks.k3.hdfs.round true a3.sinks.k3.hdfs.roundValue 1 a3.sinks.k3.hdfs.roundUnit hour a3.sinks.k3.hdfs.useLocalTimeStamp true a3.sinks.k3.hdfs.batchSize 100 a3.sinks.k3.hdfs.fileType DataStream a3.sinks.k3.hdfs.rollInterval 60 a3.sinks.k3.hdfs.rollSize 134217700 a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory a3.channels.c3.type memory a3.channels.c3.capacity 1000 a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r3.channels c3 a3.sinks.k3.channel c3 #定义source #定义sink #定义channel #定义source类型为目录 #定义监控目录 #定义文件上传完后缀 #是否有文件头 #忽略所有以.tmp结尾的文件不上传 #sink类型为hdfs #文件上传到hdfs的路径 #上传文件到hdfs的前缀 #是否按时间滚动文件 #多少时间单位创建一个新的文件夹 #重新定义时间单位 #是否使用本地时间戳 #积攒多少个Event才flush到HDFS一次 #设置文件类型可支持压缩 #多久生成新文件 #多大生成新文件 #多少event生成新文件 2启动监控文件夹命令 [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf 说明 在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文 件上传完成的文件会以.COMPLETED 结尾被监控文件夹每 500 毫秒扫描一次文件变动。 3向 upload 文件夹中添加文件 在/opt/module/flume 目录下创建 upload 文件夹 [atguiguhadoop102 flume]$ mkdir upload 向 upload 文件夹中添加文件 [atguiguhadoop102 upload]$ touch atguigu.txt [atguiguhadoop102 upload]$ touch atguigu.tmp [atguiguhadoop102 upload]$ touch atguigu.log 4查看 HDFS 上的数据  ————————————————————————————— 2.2.4 实时监控目录下的多个追加文件 Exec source 适用于监控一个实时追加的文件不能实现断点续传Spooldir Source 适合用于同步新文件但不适合对实时追加日志的文件进行监听并同步而 Taildir Source 适合用于监听多个实时追加的文件并且能够实现断点续传。 1案例需求:使用 Flume 监听整个目录的实时追加文件并上传至 HDFS 2需求分析: 实时读取目录文件到 HDFS 案例 被监控的目录 /opt/module/flume/ files Flume 监控目录 HDFS Taildir Source HDFS Sink Memory Channel 1 创建符合条件的 flume 配置文件 2 执行配置文件开启监控 4 查看 HDFS 上数据 3. 向监控文件追加内容 echo hello files/file1.txt echo hello files/file2.txt 3 3实现步骤 1创建配置文件 flume-taildir-hdfs.conf 创建一个文件 [atguiguhadoop102 job]$ vim flume-taildir-hdfs.conf 添加如下内容 a3.sources r3 a3.sinks k3 a3.channels c3 # Describe/configure the source a3.sources.r3.type TAILDIR a3.sources.r3.positionFile /opt/module/flume/tail_dir.json a3.sources.r3.filegroups f1 f2 a3.sources.r3.filegroups.f1 /opt/module/flume/files/.*file.* a3.sources.r3.filegroups.f2 /opt/module/flume/files2/.*log.* # Describe the sink a3.sinks.k3.type hdfs a3.sinks.k3.hdfs.path hdfs:// hadoop102:9820 /flume/upload2/%Y%m%d/%H # 上传文件的前缀 a3.sinks.k3.hdfs.filePrefix upload-  ————————————————————————————— # 是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round true # 多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue 1 # 重新定义时间单位 a3.sinks.k3.hdfs.roundUnit hour # 是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp true # 积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize 100 # 设置文件类型可支持压缩 a3.sinks.k3.hdfs.fileType DataStream # 多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval 60 # 设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize 134217700 # 文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory a3.channels.c3.type memory a3.channels.c3.capacity 1000 a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r3.channels c3 a3.sinks.k3.channel c3 实时读取目录文件到 HDFS 案例 a3.sources r3 a3.sinks k3 a3.channels c3 # Describe/configure the source a2.sources.r2.type TAILDIR a2.sources.r2.positionFile /opt/module/flume/tail_dir.json a2.sources.r2.filegroups f1 a2.sources.r2.filegroups.f1 /opt/module/flume/files/file* # Describe the sink a3.sinks.k3.type hdfs a3.sinks.k3.hdfs.path hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H a3.sinks.k3.hdfs.filePrefix upload a3.sinks.k3.hdfs.round true a3.sinks.k3.hdfs.roundValue 1 a3.sinks.k3.hdfs.roundUnit hour a3.sinks.k3.hdfs.useLocalTimeStamp true a3.sinks.k3.hdfs.batchSize 100 a3.sinks.k3.hdfs.fileType DataStream a3.sinks.k3.hdfs.rollInterval 60 a3.sinks.k3.hdfs.rollSize 134217700 a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory a3.channels.c3.type memory a3.channels.c3.capacity 1000 a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r3.channels c3 a3.sinks.k3.channel c3 #定义source #定义sink #定义channel #定义source类型 #定义监控目录文件 #指定position_file位置 #sink类型为hdfs #文件上传到hdfs的路径 #上传文件到hdfs的前缀 #是否按时间滚动文件 #多少时间单位创建一个新的文件夹 #重新定义时间单位 #是否使用本地时间戳 #积攒多少个Event才flush到HDFS一次 #设置文件类型可支持压缩 #多久生成新文件 #多大生成新文件 #多少event生成新文件 2启动监控文件夹命令 [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf 3向 files 文件夹中追加内容 在/opt/module/flume 目录下创建 files 文件夹 [atguiguhadoop102 flume]$ mkdir files  ————————————————————————————— 向 upload 文件夹中添加文件 [atguiguhadoop102 files]$ echo hello file1.txt [atguiguhadoop102 files]$ echo atguigu file2.txt 4查看 HDFS 上的数据 Taildir 说明 Taildir Source 维护了一个 json 格式的 position File其会定期的往 position File 中更新每个文件读取到的最新的位置因此能够实现断点续传。 Position File 的格式如下 {inode:2496272,pos:12,file:/opt/module/flume/files/file1.t xt} {inode:2496275,pos:12,file:/opt/module/flume/files/file2.t xt} 注 Linux 中储存文件元数据的区域就叫做 inode 每个 inode 都有一个号码操作系统 用 inode 号码来识别不同的文件 Unix/Linux 系统内部不使用文件名而使用 inode 号码来 识别文件。 第 3 章 Flume 进阶 3.1 Flume 事务 Flume 事务 Web Server Source Sink Eventn HDFS Agent 数据输入端 Flume 流式处理 数据输出端 Channel Event2 Event1 … 接收事件 推送事件 转发事件 doPut putList Transaction doCommit doRollback batch data takeList doCommit doTake batch data doRollback Transaction •doPut: 将批数据先写入临时缓冲区 putList Put 事务流程 •doCommit: 检查 channel 内存队列是否足够合并。 •doRollback:channel 内存队列空间不足回滚数据 Take 事务 •doTake: 将数据取到临时缓冲区 takeList 并将数据发送到 HDFS •doCommit: 如果数据全部发送成功则清除临时缓冲区 takeList •doRollback: 数据发送过程中如果出现异常 rollback 将临时缓冲区 t akeList 中的数据归还给 channel 内存队列。 拉取事件  ————————————————————————————— 3.2 Flume Agent 内部原理 Flume Agent 内部原理 Source Channel Processor Channel Selector Interceptor Interceptor Interceptor Channel1 Channel2 Channel3 SinkProcessor Sink1 Sink2 Sink3 1 接收数据 2 处理事件 3 将事件传递给拦截器链 4 将每个事件给 Channel 选择器 5 返回写入事件 Channel 列表 6 根据 Channel 选择 器的选择结果将事 件写入相应 Channel 。 7 SinkProcessor 有三种 DefaultSinkProcessor 、 LoadBalancingSinkProcessor 、 FailoverSinkProcessor 每种都有其各自的功能 Channel Selectors 有 两 种 类 型 :Replicating Channel Selector (default) 和 Multiplexing Channel Selector 。 Replicating 会 将 source 过来的 events 发往所 有 channel, 而 Multiplexing 可 以配置发往哪些 Channel 。 重要组件 1ChannelSelector ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型 分别是 Replicating 复制和 Multiplexing 多路复用。 ReplicatingSelector 会将同一个 Event 发往所有的 ChannelMultiplexing 会根据相 应的原则将不同的 Event 发往不同的 Channel。 2SinkProcessor SinkProcessor 共 有 三 种 类 型 分 别 是 DefaultSinkProcessor 、 LoadBalancingSinkProcessor 和 FailoverSinkProcessor DefaultSinkProcessor 对 应 的 是 单 个 的 Sink LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink GroupLoadBalancingSinkProcessor 可以实现负 载均衡的功能FailoverSinkProcessor 可以错误恢复的功能。 ————————————————————————————— 3.3 Flume 拓扑结构 3.3.1 简单串联 图 Flume Agent 连接 这种模式是将多个 flume 顺序连接起来了从最初的 source 开始到最终 sink 传送的 目的存储系统。此模式不建议桥接过多的 flume 数量 flume 数量过多不仅会影响传输速 率而且一旦传输过程中某个节点 flume 宕机会影响整个传输系统。 3.3.2 复制和多路复用 图 单 source多 channel、sink Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中或者将不同数据分发到不同的 channel 中sink 可以选择传送到不同的目的 地。  ————————————————————————————— 3.3.3 负载均衡和故障转移 图 Flume 负载均衡或故障转移 Flume支持使用将多个sink逻辑上分到一个sink组sink组配合不同的SinkProcessor 可以实现负载均衡和错误恢复的功能。 3.3.4 聚合 图 Flume Agent 聚合 这种模式是我们最常见的也非常实用日常 web 应用通常分布在上百个服务器大者 甚至上千个、上万个服务器。产生的日志处理起来也非常麻烦。用 flume 的这种组合方式 能很好的解决这一问题每台服务器部署一个 flume 采集日志传送到一个集中收集日志的  ————————————————————————————— flume再由此 flume 上传到 hdfs、hive、hbase 等进行日志分析。 3.4 Flume 企业开发案例 3.4.1 复制和多路复用 1案例需求 使用 Flume-1 监控文件变动Flume-1 将变动内容传递给 Flume-2Flume-2 负责存储 到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3Flume-3 负责输出到 Local FileSystem。 2需求分析 单数据源多出口案例选择器 Hive 日志文件 /opt/module/hive/logs / hive.log Flume-file-flume 监控文件 HDFS Hive 实时更新日志 Exec Source Avro Sink1 Memory Channel1 Memory Channel2 flume-flume-hdfs Avro Source HDFS Sink Memory Channel flume-flume-dir Avro Source File_roll Sink Memory Channel Avro Sink2 本地目录 /opt/module/datas/flume3 Flume-1 Flume-2 Flume-3 Replicating ChannelSelector 3实现步骤 1准备工作 在/opt/module/flume/job 目录下创建 group1 文件夹 [atguiguhadoop102 job]$ cd group1/ 在/opt/module/datas/目录下创建 flume3 文件夹 [atguiguhadoop102 datas]$ mkdir flume3 2创建 flume-file-flume.conf 配置 1 个接收日志文件的 source 和两个 channel、两个 sink分别输送给 flume-flume hdfs 和 flume-flume-dir。 编辑配置文件 [atguiguhadoop102 group1]$ vim flume-file-flume.conf 添加如下内容  ————————————————————————————— # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2 # 将数据流复制给所有 channel a1.sources.r1.selector.type replicating # Describe/configure the source a1.sources.r1.type exec a1.sources.r1.command tail -F /opt/module/hive/logs/hive.log a1.sources.r1.shell /bin/bash -c # Describe the sink # sink 端的 avro 是一个数据发送者 a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop102 a1.sinks.k1.port 4141 a1.sinks.k2.type avro a1.sinks.k2.hostname hadoop102 a1.sinks.k2.port 4142 # Describe the channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2 3创建 flume-flume-hdfs.conf 配置上级 Flume 输出的 Source输出是到 HDFS 的 Sink。 编辑配置文件 [atguiguhadoop102 group1]$ vim flume-flume-hdfs.conf 添加如下内容 # Name the components on this agent a2.sources r1 a2.sinks k1 a2.channels c1 # Describe/configure the source # source 端的 avro 是一个数据接收服务 a2.sources.r1.type avro a2.sources.r1.bind hadoop102 a2.sources.r1.port 4141 # Describe the sink a2.sinks.k1.type hdfs a2.sinks.k1.hdfs.path hdfs:// hadoop102:9820 /flume2/%Y%m%d/%H  ————————————————————————————— # 上传文件的前缀 a2.sinks.k1.hdfs.filePrefix flume2- # 是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round true # 多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue 1 # 重新定义时间单位 a2.sinks.k1.hdfs.roundUnit hour # 是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp true # 积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k1.hdfs.batchSize 100 # 设置文件类型可支持压缩 a2.sinks.k1.hdfs.fileType DataStream # 多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval 30 # 设置每个文件的滚动大小大概是 128M a2.sinks.k1.hdfs.rollSize 134217700 # 文件的滚动与 Event 数量无关 a2.sinks.k1.hdfs.rollCount 0 # Describe the channel a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r1.channels c1 a2.sinks.k1.channel c1 4创建 flume-flume-dir.conf 配置上级 Flume 输出的 Source输出是到本地目录的 Sink。 编辑配置文件 [atguiguhadoop102 group1]$ vim flume-flume-dir.conf 添加如下内容 # Name the components on this agent a3.sources r1 a3.sinks k1 a3.channels c2 # Describe/configure the source a3.sources.r1.type avro a3.sources.r1.bind hadoop102 a3.sources.r1.port 4142 # Describe the sink a3.sinks.k1.type file_roll a3.sinks.k1.sink.directory /opt/module/data/flume3 # Describe the channel a3.channels.c2.type memory a3.channels.c2.capacity 1000 a3.channels.c2.transactionCapacity 100  ————————————————————————————— # Bind the source and sink to the channel a3.sources.r1.channels c2 a3.sinks.k1.channel c2 提示 输出的本地目录必须是已经存在的目录如果该目录不存在并不会创建新的目 录。 5执行配置文件 分别启动对应的 flume 进程flume-flume-dirflume-flume-hdfsflume-file-flume。 [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf 6启动 Hadoop 和 Hive [atguiguhadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh [atguiguhadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh [atguiguhadoop102 hive]$ bin/hive hive (default) 7检查 HDFS 上数据 8检查/opt/module/datas/flume3 目录中数据 [atguiguhadoop102 flume3]$ ll 总用量 8 -rw-rw-r--. 1 atguigu atguigu 5942 5 月 22 00:09 1526918887550-3 3.4.2 负载均衡和故障转移 1案例需求 使用 Flume1 监控一个端口其 sink 组中的 sink 分别对接 Flume2 和 Flume3采用 FailoverSinkProcessor实现故障转移的功能。 2需求分析  ————————————————————————————— 故障转移案例 Hello Atguigu Hive flume-file-flume 监控文件 控 制 台 nc localhost 44444 netcat Source Avro Sink1 Memory Channel flume-flume-console1 Avro Source logger Sink Memory Channel flume-flume-console2 Avro Source logger Sink Memory Channel Avro Sink2 Flume-1 Flume-2 Flume-3 FailoverSinkProcessor 3实现步骤 1准备工作 在/opt/module/flume/job 目录下创建 group2 文件夹 [atguiguhadoop102 job]$ cd group2/ 2创建 flume-netcat-flume.conf 配置 1 个 netcat source 和 1 个 channel、1 个 sink group2 个 sink分别输送给 flume-flume-console1 和 flume-flume-console2。 编辑配置文件 [atguiguhadoop102 group2]$ vim flume-netcat-flume.conf 添加如下内容 # Name the components on this agent a1.sources r1 a1.channels c1 a1.sinkgroups g1 a1.sinks k1 k2 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sinkgroups.g1.processor.type failover a1.sinkgroups.g1.processor.priority.k1 5 a1.sinkgroups.g1.processor.priority.k2 10 a1.sinkgroups.g1.processor.maxpenalty 10000 # Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop102 a1.sinks.k1.port 4141  ————————————————————————————— a1.sinks.k2.type avro a1.sinks.k2.hostname hadoop102 a1.sinks.k2.port 4142 # Describe the channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinkgroups.g1.sinks k1 k2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c1 3创建 flume-flume-console1.conf 配置上级 Flume 输出的 Source输出是到本地控制台。 编辑配置文件 [atguiguhadoop102 group2]$ vim flume-flume-console1.conf 添加如下内容 # Name the components on this agent a2.sources r1 a2.sinks k1 a2.channels c1 # Describe/configure the source a2.sources.r1.type avro a2.sources.r1.bind hadoop102 a2.sources.r1.port 4141 # Describe the sink a2.sinks.k1.type logger # Describe the channel a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r1.channels c1 a2.sinks.k1.channel c1 4创建 flume-flume-console2.conf 配置上级 Flume 输出的 Source输出是到本地控制台。 编辑配置文件 [atguiguhadoop102 group2]$ vim flume-flume-console2.conf 添加如下内容 # Name the components on this agent a3.sources r1 a3.sinks k1 a3.channels c2  ————————————————————————————— # Describe/configure the source a3.sources.r1.type avro a3.sources.r1.bind hadoop102 a3.sources.r1.port 4142 # Describe the sink a3.sinks.k1.type logger # Describe the channel a3.channels.c2.type memory a3.channels.c2.capacity 1000 a3.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r1.channels c2 a3.sinks.k1.channel c2 5执行配置文件 分别开启对应配置文件flume-flume-console2flume-flume-console1flume netcat-flume。 [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf - Dflume.root.loggerINFO,console [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf - Dflume.root.loggerINFO,console [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf 6使用 netcat 工具向本机的 44444 端口发送内容 $ nc localhost 44444 7查看 Flume2 及 Flume3 的控制台打印日志 8将 Flume2 kill观察 Flume3 的控制台打印情况。 注使用 jps -ml 查看 Flume 进程。 3.4.3 聚合 1案例需求 hadoop102 上的 Flume-1 监控文件/opt/module/group.log hadoop103 上的 Flume-2 监控某一个端口的数据流 Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3Flume-3 将最终数据打印 到控制台。 2需求分析  ————————————————————————————— 多数据源汇总案例 日志文件 /opt/module/ group.log Flume1 配置文件 控制台 实时更新日志 Exec Source Avro Sink Memory Channel Flume3 配置文件 Avro Source logger Sink Memory Channel 主机 44444 端口 Flume2 配置文件 Netcat Source Avro Sink Memory Channel 通过 telnet 向 44444 端 口发送数据 Flume-1 Flume-2 Flume-3 3实现步骤 1准备工作 分发 Flume [atguiguhadoop102 module]$ xsync flume 在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个 group3 文件夹。 [atguiguhadoop102 job]$ mkdir group3 [atguiguhadoop103 job]$ mkdir group3 [atguiguhadoop104 job]$ mkdir group3 2创建 flume1-logger-flume.conf 配置 Source 用于监控 hive.log 文件配置 Sink 输出数据到下一级 Flume。 在 hadoop102 上编辑配置文件 [atguiguhadoop102 group3]$ vim flume1-logger-flume.conf 添加如下内容 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type exec a1.sources.r1.command tail -F /opt/module/group.log a1.sources.r1.shell /bin/bash -c # Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop104 a1.sinks.k1.port 4141  # Describe the channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 3创建 flume2-netcat-flume.conf 配置 Source 监控端口 44444 数据流配置 Sink 数据到下一级 Flume 在 hadoop103 上编辑配置文件 [atguiguhadoop102 group3]$ vim flume2-netcat-flume.conf 添加如下内容 # Name the components on this agent a2.sources r1 a2.sinks k1 a2.channels c1 # Describe/configure the source a2.sources.r1.type netcat a2.sources.r1.bind hadoop103 a2.sources.r1.port 44444 # Describe the sink a2.sinks.k1.type avro a2.sinks.k1.hostname hadoop104 a2.sinks.k1.port 41414 # Use a channel which buffers events in memory a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r1.channels c1 a2.sinks.k1.channel c1 4创建 flume3-flume-logger.conf 配置 source 用于接收 flume1 与 flume2 发送过来的数据流最终合并后 sink 到控制 台。 在 hadoop104 上编辑配置文件 [atguiguhadoop104 group3]$ touch flume3-flume-logger.conf [atguiguhadoop104 group3]$ vim flume3-flume-logger.conf 添加如下内容 # Name the components on this agent a3.sources r1 a3.sinks k1 a3.channels c1 # Describe/configure the source a3.sources.r1.type avro  ————————————————————————————— a3.sources.r1.bind hadoop104 a3.sources.r1.port 4141 # Describe the sink # Describe the sink a3.sinks.k1.type logger # Describe the channel a3.channels.c1.type memory a3.channels.c1.capacity 1000 a3.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r1.channels c1 a3.sinks.k1.channel c1 5执行配置文件 分别开启对应配置文件flume3-flume-logger.confflume2-netcat-flume.conf flume1-logger-flume.conf。 [atguiguhadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf - Dflume.root.loggerINFO,console [atguiguhadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf [atguiguhadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.conf 6在 hadoop103 上向/opt/module 目录下的 group.log 追加内容 [atguiguhadoop103 module]$ echo hello group.log 7在 hadoop102 上向 44444 端口发送数据 [atguiguhadoop102 flume]$ telnet hadoop102 44444 8检查 hadoop104 上数据 3.5 自定义 Interceptor 1案例需求 使用 Flume 采集服务器本地日志需要按照日志类型的不同将不同种类的日志发往不 同的分析系统。 2需求分析 在实际的开发中一台服务器产生的日志类型可能有很多种不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构Multiplexing 的原理是根据 event 中 Header 的某个 key 的值将不同的 event 发送到不同的 Channe ————————————————————————————— 中所以我们需要自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予 不同的值。 在该案例中我们以端口数据模拟日志以是否包含”atguigu”模拟不同类型的日志 我们需要自定义 interceptor 区分数据中是否包含”atguigu”将其分别发往不同的分析 系统Channel。 Interceptor 和 Multiplexing ChannelSelector 案例 atguigu flume 控 制 台 控制台实时输入数据 netcat Source Avro Sink1 Memory Channel flume-flume-console1 Avro Source logger Sink Memory Channel flume-flume-console2 Avro Source logger Sink Memory Channel Avro Sink2 Flume-1 Flume-2 Flume-3 控 制 台 atguigu flume Memory Channel Multiplexing Channel Selector 3实现步骤 1创建一个 maven 项目并引入以下依赖。 dependency groupIdorg.apache.flume/groupId artifactIdflume-ng-core/artifactId version1.9.0/version /dependency 2定义 CustomInterceptor 类并实现 Interceptor 接口。 package com.atguigu.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.ArrayList; import java.util.List; import java.util.Map; public class TypeInterceptor implements Interceptor { // 声明一个存放事件的集合 private ListEvent addHeaderEvents; Override ———————— public void initialize() { // 初始化存放事件的集合 addHeaderEvents new ArrayList(); } // 单个事件拦截 Override public Event intercept(Event event) { //1. 获取事件中的头信息 MapString, String headers event.getHeaders(); //2. 获取事件中的 body 信息 String body new String(event.getBody()); //3. 根据 body 中是否有 atguigu 来决定添加怎样的头信息 if (body.contains(atguigu)) { //4. 添加头信息 headers.put(type, first); } else { //4. 添加头信息 headers.put(type, second); } return event; } // 批量事件拦截 Override public ListEvent intercept(ListEvent events) { //1. 清空集合 addHeaderEvents.clear(); //2. 遍历 events for (Event event : events) { //3. 给每一个事件添加头信息 addHeaderEvents.add(intercept(event)); } //4. 返回结果 return addHeaderEvents; } Override public void close() { } public static class Builder implements Interceptor.Builder { Override public Interceptor build() { return new TypeInterceptor(); } Override  ————————————————————————————— public void configure(Context context) { } } } 3编辑 flume 配置文件 为 hadoop102 上的 Flume1 配置 1 个 netcat source 1 个 sink group2 个 avro sink 并配置相应的 ChannelSelector 和 interceptor。 # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.flume.interceptor.CustomInterceptor$Builder a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header type a1.sources.r1.selector.mapping.first c1 a1.sources.r1.selector.mapping.second c2 # Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop103 a1.sinks.k1.port 4141 a1.sinks.k2.typeavro a1.sinks.k2.hostname hadoop104 a1.sinks.k2.port 4242 # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Use a channel which buffers events in memory a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2 为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。 a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro ————————————————————————————— a1.sources.r1.bind hadoop103 a1.sources.r1.port 4141 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。 a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop104 a1.sources.r1.port 4242 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 4分别在 hadoop102hadoop103hadoop104 上启动 flume 进程注意先后顺序。 5在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。 6观察 hadoop103 和 hadoop104 打印的日志。 3.6 自定义 Source 1介绍 Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种 格式的日志数据包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多但是有时候并不能 满足实际开发当中的需求此时我们就需要根据实际需求自定义某些 source。 官方也提供了自定义 source 的接口 https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。 实现相应方法 getBackOffSleepIncrement() //backoff 步长  ————————————————————————————— getMaxBackOffSleepInterval()//backoff 最长时间 configure(Context context)//初始化 context读取配置文件内容 process()//获取数据封装成 event 并写入 channel这个方法将被循环调用。 使用场景读取 MySQL 数据或者其他文件系统。 2需求 使用 flume 接收数据并给每条数据添加前缀输出到控制台。前缀可从 flume 配置文 件中配置。 自定义 Source 需求 Console MySource 代码中 循环生产数据 Logger Sink Memory Channel 2 创建符合条件的 flume 配置文件 3 执行配置文件开启监控 4 查看控制 台数据 1 编写自定义 Source 代码 并打包到集群 3分析 自定义 Source 需求分析 AbstractSource Configurable PollableSource configure(Context context)读取配置文件(XX.conf)中的配置信息 process()接收数据将数据封装成一个个的Event写入Channel。使用for循环模拟数据生成。for(int i0;i5;i) getBackOffSleepIncrement()暂不用 getMaxBackOffSleepInterval()暂不用 MySource  ————————————————————————————— 4编码 1导入 pom 依赖 dependencies dependency groupIdorg.apache.flume/groupId artifactIdflume-ng-core/artifactId version1.9.0/version /dependency 2编写代码 package com.atguigu; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定义配置文件将来要读取的字段 private Long delay; private String field; // 初始化配置信息 Override public void configure(Context context) { delay context.getLong(delay); field context.getString(field, Hello!); } Override public Status process() throws EventDeliveryException { try { // 创建事件头信息 HashMapString, String hearderMap new HashMap(); // 创建事件 SimpleEvent event new SimpleEvent(); // 循环封装事件 for (int i 0; i 5; i) { // 给事件设置头信息 event.setHeaders(hearderMap); // 给事件设置内容 event.setBody((field i).getBytes()); // 将事件写入 channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace();  ————————————————————————————— return Status.BACKOFF; } return Status.READY; } Override public long getBackOffSleepIncrement() { return 0; } Override public long getMaxBackOffSleepInterval() { return 0; } } 5测试 1打包 将写好的代码打包并放到 flume 的 lib 目录/opt/module/flume下。 2配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type com.atguigu.MySource a1.sources.r1.delay 1000 #a1.sources.r1.field atguigu # Describe the sink a1.sinks.k1.type logger # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 3开启任务 [atguiguhadoop102 flume]$ pwd /opt/module/flume [atguiguhadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.loggerINFO,console 4结果展示 ————————————————————————————— 3.7 自定义 Sink 1介绍 Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储 或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一 个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提 交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件。 Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定义。官方提供的 Sink 类型已经很多但是有时候并不能满足实际开发当中的需求此 时我们就需要根据实际需求自定义某些 Sink。 官方也提供了自定义 sink 的接口 https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。 实现相应方法 configure(Context context)//初始化 context读取配置文件内容 process()//从 Channel 读取获取数据event这个方法将被循环调用。 使用场景读取 Channel 数据写入 MySQL 或者其他文件系统。 2需求 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后 缀可在 flume 任务配置文件中配置。 流程分析 ————————————————————————————— MySink process(): 从 Channel 中取数据添加前后缀写入日志。 hello:atguigu:hello atguigu 控制台输入 atguigu 3. 数据流 source channel sink 1. 编码 AbstractSink 2. 打包到集群并编写任务配置文件 Configurable configure(): 读取任务配置文件中的配置信息。 3编码 package com.atguigu; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable { // 创建 Logger 对象 private static final Logger LOG LoggerFactory.getLogger(AbstractSink.class); private String prefix; private String suffix; Override public Status process() throws EventDeliveryException { // 声明返回值状态信息 Status status; // 获取当前 Sink 绑定的 Channel Channel ch getChannel(); // 获取事务 Transaction txn ch.getTransaction(); // 声明事件 Event event; // 开启事务 txn.begin();  ————————————————————————————— // 读取 Channel 中的事件直到读取到事件结束循环 while (true) { event ch.take(); if (event ! null) { break; } } try { // 处理事件打印 LOG.info(prefix new String(event.getBody()) suffix); // 事务提交 txn.commit(); status Status.READY; } catch (Exception e) { // 遇到异常事务回滚 txn.rollback(); status Status.BACKOFF; } finally { // 关闭事务 txn.close(); } return status; } Override public void configure(Context context) { // 读取配置文件内容有默认值 prefix context.getString(prefix, hello:); // 读取配置文件内容无默认值 suffix context.getString(suffix); } } 4测试 1打包 将写好的代码打包并放到 flume 的 lib 目录/opt/module/flume下。 2配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444  ————————————————————————————— # Describe the sink a1.sinks.k1.type com.atguigu.MySink #a1.sinks.k1.prefix atguigu: a1.sinks.k1.suffix :atguigu # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 3开启任务 [atguiguhadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.loggerINFO,console [atguiguhadoop102 ~]$ nc localhost 44444 hello OK atguigu OK 4结果展示 3.8 Flume 数据流监控 3.8.1 Ganglia 的安装与部署 Ganglia 由 gmond、gmetad 和 gweb 三部分组成。 gmondGanglia Monitoring Daemon是一种轻量级服务安装在每台需要收集指标数 据的节点主机上。使用 gmond你可以很容易收集很多系统指标数据如 CPU、内存、磁盘、 网络和活跃进程的数据等。 gmetadGanglia Meta Daemon整合所有信息并将其以 RRD 格式存储至磁盘的服务。 gwebGanglia WebGanglia 可视化工具gweb 是一种利用浏览器显示 gmetad 所存储 数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数 据。 1安装 ganglia 1规划 hadoop102: web gmetad gmod hadoop103: gmod hadoop104: gmod 2在 102 103 104 分别安装 epel-release  ————————————————————————————— [atguiguhadoop102 flume]$ sudo yum -y install epel-release 3在 102 安装 [atguiguhadoop102 flume]$ sudo yum -y install ganglia-gmetad [atguiguhadoop102 flume]$ sudo yum -y install ganglia-web [atguiguhadoop102 flume]$ sudo yum -y install ganglia-gmond 4在 103 和 104 安装 [atguiguhadoop102 flume]$ sudo yum -y install ganglia-gmond 2在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf [atguiguhadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf 修改为红颜色的配置 # Ganglia monitoring system php web frontend # Alias /ganglia /usr/share/ganglia Location /ganglia # Require local # 通过 windows 访问 ganglia, 需要配置 Linux 对应的主机 (windows)ip 地址 Require ip 192.168.9.1 # Require ip 10.1.2.3 # Require host example.org /Location 5在 102 修改配置文件/etc/ganglia/gmetad.conf [atguiguhadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf 修改为 data_source my cluster hadoop102 6在 102 103 104 修改配置文件/etc/ganglia/gmond.conf [atguiguhadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf 修改为 cluster { name my cluster owner unspecified latlong unspecified url unspecified } udp_send_channel { #bind_hostname yes # Highly recommended, soon to be default. # This option tells gmond to use a source address # that resolves to the machines hostname. Without # this, the metrics may appear to come from any # interface and the DNS names associated with # those IPs will be used to create the RRDs. # mcast_join 239.2.11.71 # 数据发送给 hadoop102 host hadoop102 port 8649 ttl 1 }  ————————————————————————————— udp_recv_channel { # mcast_join 239.2.11.71 port 8649 # 接收来自任意连接的数据 bind 0.0.0.0 retry_bind true # Size of the UDP buffer. If you are handling lots of metrics you really # should bump it up to e.g. 10MB or even higher. # buffer 10485760 } 7在 102 修改配置文件/etc/selinux/config [atguiguhadoop102 flume]$ sudo vim /etc/selinux/config 修改为 # This file controls the state of SELinux on the system. # SELINUX can take one of these three values: # enforcing - SELinux security policy is enforced. # permissive - SELinux prints warnings instead of enforcing. # disabled - No SELinux policy is loaded. SELINUXdisabled # SELINUXTYPE can take one of these two values: # targeted - Targeted processes are protected, # mls - Multi Level Security protection. SELINUXTYPEtargeted 尖叫提示 selinux 生效需要重启如果此时不想重启可以临时生效之 [atguiguhadoop102 flume]$ sudo setenforce 0 8启动 ganglia 1在 102 103 104 启动 [atguiguhadoop102 flume]$ sudo systemctl start gmond 2在 102 启动 [atguiguhadoop102 flume]$ sudo systemctl start httpd [atguiguhadoop102 flume]$ sudo systemctl start gmetad 9打开网页浏览 ganglia 页面 http://hadoop102/ganglia 尖叫提示 如果完成以上操作依然出现权限不足错误请修改/var/lib/ganglia 目录 的权限 [atguiguhadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia 3.8.2 操作 Flume 测试监控 1启动 Flume 任务 [atguiguhadoop102 flume]$ bin/flume-ng agent \ -c conf/ \ -n a1 \ -f job/flume-netcat-logger.conf \ -Dflume.root.loggerINFO,console \ -Dflume.monitoring.typeganglia \ -Dflume.monitoring.hostshadoop102:8649  ————————————————————————————— 2发送数据观察 ganglia 监测图 [atguiguhadoop102 flume]$ nc localhost 44444 样式如图 图例说明 字段图表名称 字段含义 EventPutAttemptCount source 尝试写入 channel 的事件总数量 EventPutSuccessCount 成功写入 channel 且提交的事件总数量 EventTakeAttemptCount sink 尝试从 channel 拉取事件的总数量。 EventTakeSuccessCount sink 成功读取的事件的总数量 StartTime channel 启动的时间毫秒 StopTime channel 停止的时间毫秒 ChannelSize 目前 channel 中事件的总数量 ChannelFillPercentage channel 占用百分比 ChannelCapacity channel 的容量 第 4 章 企业真实面试题重点 4.1 你是如何实现 Flume 数据传输的监控的 使用第三方框架 Ganglia 实时监控 Flume。 4.2 Flume 的 Source Sink Channel 的作用你们 Source 是什么类 型 1作用 1Source 组件是专门用来收集数据的可以处理各种类型、各种格式的日志数据 包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、 http、legacy 2Channel 组件对采集到的数据进行缓存可以存放在 Memory 或 File 中。 3Sink 组件是用于把数据发送到目的地的组件目的地包括 Hdfs、Logger、avro、 thrift、ipc、file、Hbase、solr、自定义。 2我公司采用的 Source 类型为  ————————————————————————————— 1监控后台日志exec 2监控后台产生日志的端口netcat 4.3 Flume 的 Channel Selectors Flume Channel Selectors 数据源 1 source 数据源 2 channel1 channel2 Sink1(Hdfs) Sink2(Logger) Channel Selectors 可以让不同的项目日志通过不同的 Channel 到不同的 Sink 中去。 官方文档上 Channel Selectors 有两种类型 :Replicating Channel Selector (default) 和 Multiplexing Channel Selector 这两种 Selector 的区别是 :Replicating 会 将 source 过来的 events 发往所有 channel, 而 Multiplexing 可以选择该发往哪些 Channel 。 4.4 Flume 参数调优 1Source 增加 Source 个使用 Tair Dir Source 时可增加 FileGroups 个数可以增大 Source 的读取数据的能力。例如当某一个目录产生的文件过多时需要将这个文件目录拆分成多个 文件目录同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。 batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数适当调大这个参 数可以提高 Source 搬运 Event 到 Channel 时的性能。 2Channel type 选择 memory 时 Channel 的性能最好但是如果 Flume 进程意外挂掉可能会丢失 数据。type 选择 file 时 Channel 的容错性更好但是性能上会比 memory channel 差。 使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。 Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决 定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event 条数。 transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。 3Sink 增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行 ————————————————————————————— 过多的 Sink 会占用系统资源造成系统资源不必要的浪费。 batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数适当调大这个参数 可以提高 Sink 从 Channel 搬出 event 的性能。 4.5 Flume 的事务机制 Flume 的事务机制类似数据库的事务机制Flume 使用两个独立的事务分别负责从 Soucrce 到 Channel以及从 Channel 到 Sink 的事件传递。 比如 spooling directory source 为文件的每一行创建一个事件一旦事务中所有的 事件全部传递到 Channel 且提交成功那么 Soucrce 就将该文件标记为完成。 同理事务以类似的方式处理从 Channel 到 Sink 的传递过程如果因为某种原因使得 事件无法记录那么事务将会回滚。且所有的事件都会保持到 Channel 中等待重新传递。 4.6 Flume 采集数据会丢失吗 ? 根据 Flume 的架构原理Flume 是不可能丢失数据的其内部有完善的事务机制 Source 到 Channel 是事务性的Channel 到 Sink 是事务性的因此这两个环节不会出现数 据的丢失唯一可能丢失数据的情况是 Channel 采用 memoryChannelagent 宕机导致数据 丢失或者 Channel 存储数据已满导致 Source 不再写入未写入的数据丢失。 Flume 不会丢失数据但是有可能造成数据的重复例如数据已经成功由 Sink 发出 但是没有接收到响应Sink 会再次发送数据此时可能会导致数据的重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/84441.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何修改网站后台东莞市建设局网站6

PCL-PEG-DCL (ACUPA) 聚己内酯聚乙二醇PSMA 抑制剂 【中文名称】 聚己内酯聚乙二醇PSMA抑制剂DCL 【英文名称】 PCL-PEG-DCL (ACUPA) 【品 牌】 碳水科技(Tanshtech) 【纯 度】 95%以上 【保 存】 -20 【规 格】 50mg,100mg,500mg,…

怎么给自己制作一个网站百度seo优化规则

点击蓝字关注我们因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络,侵删开发环境如前面介绍的那样,C属于一种静态的编译型语言,所以,开发环境配置过程中就需要用到对应的编译器。C有…

网站推广的目标备案域名被拿去做违法

题目 输入4个整数 要求按照从小到大的顺序输出 书上的学习辅导答案 // 主要部分 int main(){int t,a,b,c,d;printf("请输入四个数:");scanf("%d,%d,%d,%d"

金堂县建设局网站湖北城市建设职业技术学院教务网站

写在前面 尽管 tf.keras 提供了很多的常用网络层类,但深度学习可以使用的网络层远远不止这些。科研工作者一般是自行实现了较为新颖的网络层,经过大量实验验证有效后,深度学习框架才会跟进,内置对这些网络层的支持。因此掌握自定…

机械网站建设中心天津市建设工程监理公司网站

发现一个神奇的情况: 清除和关闭的操作: 1. 2.右键 3.点击 4.清空 5.最后需要关闭 QQ输入法的进程

企业网站 asp.net乾县网站建设

509. 斐波那契数 public static int fib(int n) {// 找出最后一步// 定义损失函数 定义记忆化存储基本单元// 状态转移方程 f(n) f(n-2)f(n-1); n > 0// 边界 (递归过程中需要判断)// 初始化 (在未递归之前需要处理)// 返回答案if (n 0) {return 0;}if (n 1) {return 1;…

梧州网站建设制作突唯阿网站seo

quickx是对cocos2dx的lua扩展,它做了一些C的扩展,同时还在lua做了一些封装, 让用lua开发cocos2dx更快,中文站http://quick.cocoachina.com/。 由于现在的项目对cocos2dx有一些修改,又想用到quickx的便捷,于…

织梦cms可以做外贸网站吗大连甘井子区社区工作者招聘

GZ032 信息安全管理与评估赛题第1套 一、 单选题 (每题 2 分,共 35 题,共 70 分) 1、《中华人民共和国数据安全法》已由中华人民共和国第十三届全国人民代 表大会常务委员会第二十九次会议通过,现予公布,自…

阳信网站建设凡科模板建站

Kubernetes在数字化转型中的作用 数字化转型是指在现代化社会中,利用数字技术来改变企业、组织或个人的业务模式、流程和价值创造方式的过程。这包括使用数字技术来提高效率、创新产品或服务、优化客户体验和开发新的业务模式等方面。数字化转型是一个全球性的趋势…

万网网站建设步骤seo关键词排行优化教程

在 Angular 2 及以上版本与 C#结合使用 REST API 的示例中,我们将分别展示前端 Angular 服务和后端 C# Web API 的实现。 一、前端:Angular 服务 生成 Angular 服务 使用 Angular CLI 生成一个新的服务,例如user.service.ts: ng…

wordpress搭建网站教程企业门户网站建设现状

MyBatis 的前身就是 iBatis 。是一个数据持久层(ORM)框架。 iBATIS一词来源于“internet”和“abatis”的组合,是一个基于Java的持久层框架。iBATIS提供的持久层框架包括SQL Maps和Data Access Objects(DAO),同时还提供一个利用这…

安庆跨境电商建站哪家好.net flash网站模板

Linux提供了丰富的帮助手册,当你需要查看某个命令的参数时不必到处上网查找,只要man一下即可。可以使用man man 查看man的使用方法1.man共有以下几个章节代码功能1标准用户命令(Executable programs or shell commands)2系统调用(System calls)functions…

100个有趣的网站廊坊网站制作推广

.NET Core运行时已经看到了实现真正的跨平台的美好前景,它最终出现在Linux和Mac OS X平台上。在上周举办的Microsoft Build大会上,来自微软的项目经理Habib Heydarian为听众分析了这一举措对开发者们所带来的益处,并告诉开发者们如何开始探索…

公司网站工商备案怎么做做App和网站 聚马

撰写测试Bug提交说明时,清晰、详细和准确是至关重要的。这有助于开发团队快速理解问题、重现Bug并修复它。以下是一个测试Bug提交说明的模板,可以根据实际情况进行调整: 测试Bug提交说明 1. Bug基本信息 Bug编号:[系统自动生成…

淘宝网站建设可行性分析报告怎么做网站页面代码搜索

声明:本文原创首发于公众号夕小瑶的卖萌屋。作者:智商掉了一地--->【我是传送门】,内含海量CV/NLP/ML入门资料、必刷综述、前沿论文解读、交流社群、offer神器、学习神器等在介绍接下来的内容前,我们先来看一则笑话&#xff1a…

湖南做网站 磐石网络引领定制企业app开发

论文名称:Pegasus: Tolerating Skewed Workloads in Distributed Storage with In-Network Coherence Directories 摘要 高性能分布式存储系统面临着由于偏斜和动态工作负载引起的负载不平衡的挑战。本文介绍了Pegasus,这是一个利用新一代可编程交换机…

设计师网站库深圳最新动态实时更新

在 Java NIO(New I/O)中,Selector 是一种多路复用的机制,用于管理多个通道的 I/O 操作。通过使用 Selector,我们可以在一个线程中同时管理多个通道的读写操作,提高系统的效率和性能。 本篇博客将详细介绍 …

大气手机网站模板免费下载网站建设公司高端

文章目录 数据数据类型 数据分析过程数据采集数据采集源数据采集方法 数据清洗清洗数据数据集成数据转换数据脱敏 数据 《春秋左传集解》云:“事大大其绳,事小小其绳。”体现了早期人类将事情的“大小”这一性质抽象到“绳结大小”这一符号上从而产生数…

上海专业制作电子商务网站硬件开发专业

二、快速入门 2.1 打开IDEA,点击New一个项目 入口,依次打开 File -> New -> Project。 2.2 使用Spring Initializr方式构建Spring Boot项目 2.3 设置项目所属组、项目名称、java版本等 2.4 选择SpringBoot版本及依赖组件 点击Create进行创建。 2.6 创建成…