大数据课程E7——Flume的Interceptor

文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州

 ▲ 本章节目的

⚪ 了解Interceptor的概念和配置参数;

⚪ 掌握Interceptor的使用方法;

⚪ 掌握Interceptor的Host Interceptor;

⚪ 掌握Interceptor的Static Interceptor;

⚪ 掌握Interceptor的UUID Interceptor;

⚪ 掌握Interceptor的Search And Replace Interceptor;

⚪ 掌握Interceptor的Regex Filtering Interceptor;

⚪ 掌握Interceptor的Custom Interceptor;

一、Timestamp Interceptor

1. 概述

1. Timestamp Interceptor是在headers中来添加一个timestamp字段来标记数据被收集的时间。

2. Timestamp Interceptor结合HDFS Sink可以实现数据按天存储。

2. 配置属性

属性

解释

type

timestamp

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

4. 数据按天存放

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = hadoop01

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata/date=%Y-%m-%d

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f hdfsin.conf -

Dflume.root.logger=INFO,console

二、Host Interceptor

1. 概述

1. Host Interceptor是在headers中添加一个字段host。

2. Host Interceptor可以用于标记数据来源于哪一台主机。

2. 配置属性

属性

解释

type

必须是host

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1 i2

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

# 指定Host Interceptor

a1.sources.s1.interceptors.i2.type = host

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

三、Static Interceptor

1. 概述

1. Static Interceptor是在headers中添加指定字段。

2. 可以利用这个Interceptor来标记数据的类型。

2. 配置属性

属性

解释

type

必须是static

key

指定在headers中的字段名

value

指定在headers中的字段值

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1 i2 i3

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

# 指定Host Interceptor

a1.sources.s1.interceptors.i2.type = host

# 指定Static Interceptor

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = kind

a1.sources.s1.interceptors.i3.value = log

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

四、UUID Interceptor

1. 概述

1. UUID Interceptor是在headers中添加一个id字段。

2. 可以用于标记数据的唯一性。

2. 配置属性

属性

解释

type

必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 给Interceptor起名

a1.sources.s1.interceptors = i1 i2 i3 i4

# 指定Timestamp Interceptor

a1.sources.s1.interceptors.i1.type = timestamp

# 指定Host Interceptor

a1.sources.s1.interceptors.i2.type = host

# 指定Static Interceptor

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = kind

a1.sources.s1.interceptors.i3.value = log

# 指定UUID Interceptor

a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f in.conf -

Dflume.root.logger=INFO,console

五、Search And Replace Interceptor

1. 概述

1. Search And Replace Interceptor在使用的时候,需要指定正则表达式,会根据正则表达式的规则,将符合正则表达式的数据替换为指定形式的数据。

2. 在替换的时候,不会替换headers中的数据,而是会替换body中的数据。

2. 配置属性

属性

解释

type

必须是search_replace

searchPattern

指定要匹配的正则形式

replaceString

指定要替换的字符串

3. 案例

1. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = http

a1.sources.s1.port = 8090

# 给拦截器起名

a1.sources.s1.interceptors = i1

# 指定类型

a1.sources.s1.interceptors.i1.type = search_replace

a1.sources.s1.interceptors.i1.searchPattern = [0-9]

a1.sources.s1.interceptors.i1.replaceString = *

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f searchin.conf -

Dflume.root.logger=INFO,console

六、Regex Filtering Interceptor

1. 概述

1. Regex Filtering Interceptor在使用的时候需要指定正则表达式。

2. 属性excludeEvents的值如果不指定,默认是false。

3. 如果没有配置excludeEvents的值或者配置excludeEvents的值配置为false,则只有符合正则表达式的数据会留下来,其他不符合正则表达式的数据会被过滤掉;如果excludeEvents的值,那么符合正则表达式的数据会被过滤掉,其他的数据则会被留下来。

2. 配置属性

属性

解释

type

必须是regex_filter

regex

指定正则表达式

excludeEvents

true或者false

3. 案例

1. 编写格式文件,添加如下内容:

# 定义 数据源(输入端) 缓冲区 输出源(输出端)

a1.sources = r1

a1.channels = c1

a1.sinks = k1

# 输入端

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /opt/upload

a1.sources.r1.fileSuffix = .done

# 拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = regex_filter

#全部都是符合条件的数据

a1.sources.r1.interceptors.i1.regex = ^.*INFO.*$

#排除符合正则表达式的数据

# a1.sources.r1.interceptors.i1.excludeEvents = true

# 输出端

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://flume45:9000/interceptors/%Y%m%d/%H

#是否使用本地时间戳

a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 序列化

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 0

# 使用一个在内存中缓冲事件的通道

a1.channels.c1.type = memory

# 连接通道

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f regexin.conf -

Dflume.root.logger=INFO,console

七、Custom Interceptor

1. 概述

1. 在Flume中,也允许自定义拦截器。但是不同于其他组件,自定义Interceptor的时候,需要再额外覆盖其中的内部接口。

2. 步骤:

a. 构建Maven工程,导入对应的依赖。

b. 自定义一个类实现Interceptor接口,覆盖其中initialize,intercept和close方法。

c. 定义静态内部类,实现Interceptor.Builder内部接口。

d. 打成jar包方法Flume安装目录的lib目录下。

e. 编写格式文件,添加如下内容:

a1.sources = s1

a1.channels = c1

a1.sinks = k1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

# 指定拦截器

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = cn.tedu.flume.interceptor.AuthInterceptor$Builder

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

f. 启动Flume:

../bin/flume-ng agent -n a1 -c ../conf -f authin.conf -

Dflume.root.logger=INFO,console

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/16541.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL中的函数

系列文章目录 MySQL常见的几种约束 文章目录 系列文章目录前言一、单行函数1.字符串函数 (String StringBuilder)2.数值函数 (Math)3.日期与时间函数4.流程函数( IF SWITCH)5.JSON函数6.其他函数 二、多行…

TreeMap的底层实现

0. 你需要知道的TreeMap的内置属性 0.1 节点属性 K key; // 键 V value; // 值 Entry<K,V> left; // 左子节点 Entry<K,V> right; // 右子节点 Entry<K,V> parent; // 父节点 boolean color; // 节点的颜色0.2 成员变量 //比较器对象private f…

rsync下行同步+inotify实时同步部署

目录 一、rsync简介 1.2 同步方式 1.2.1 全量备份 1.2.2 增量备份 1.2.3 差量备份 1.3 rsync的特点 1.4 rsync的优势与不足 1.5 rsync与cp、scp对比 1.6 rsync同类服务 二、rsync源服务器的关系 三、配置rsync源 3.1 基本思路 3.2 配置文件rsyncd.conf 3.3 独立…

代码随想录算法训练营19期第2天| 滑动窗口,螺旋矩阵模拟 (二刷)

【滑动窗口&#xff1a;209 长度最小子数组 904.水果成篮 76.最小覆盖子串】 209长度最小子数组 sum要达到target&#xff0c;自己滑动窗口法ac&#xff0c;不断往前囊括新的一个&#xff0c;然后试着从头减少一个个&#xff0c;看sum还够不够 mycode&#xff1a; int minS…

HDFS Erasure coding-纠删码介绍和原理

HDFS Erasure coding-纠删码介绍和原理 三副本策略弊端Erasure Coding&#xff08;EC&#xff09;简介Reed- Solomon&#xff08;RS&#xff09;码 EC架构 三副本策略弊端 为了提供容错能力&#xff0c;hdfs回根据replication factor&#xff08;复制因子&#xff09;在不同的…

ZooKeeper 选举的过半机制防止脑裂

结论&#xff1a; Zookeeper采用过半选举机制&#xff0c;防止了脑裂。 原因&#xff1a; 如果有5台节点&#xff0c;leader联系不上了&#xff0c;其他4个节点由于超过半数&#xff0c;所以又选出了一个leader&#xff0c;当失联的leader恢复网络时&#xff0c;发现集群中已…

idea application.yml配置文件没有提示或读不到配置

1.首先确定你的resources文件夹正常且yml文件图表和下面一样 不一样的右键去设置 2.确保你已经缩进了且层级关系正常 3.如果以上都不是&#xff0c;先考虑删除.idea重开试试 4.以上解决不了就装以下两个插件解决

目标检测之3维合成

现在有一系列的图片&#xff0c;图片之间可以按照z轴方向进行排列。图片经过了目标检测&#xff0c;输出了一系列的检测框&#xff0c;现在的需求是将检测框按类别进行合成&#xff0c;以在3维上生成检测结果。 思路&#xff1a;将图片按照z轴方向排列&#xff0c;以z轴索引作…

微分流形2:流形上的矢量场和张量场

来了来了&#xff0c;切向量&#xff0c;切空间。流形上的所有的线性泛函的集合&#xff0c;注意是函数的集合。然后取流形上的某点p&#xff0c;它的切向量为&#xff0c;线性泛函到实数的映射。没错&#xff0c;是函数到实数的映射&#xff0c;是不是想到了求导。我们要逐渐熟…

java源码-List源码解析

Java中的List是一个接口&#xff0c;它定义了一组操作列表的方法。List接口的常见子类包括ArrayList、LinkedList和Vector等。 以下是Java中List接口及其常见方法的源码解析&#xff1a; 1. List接口定义 public interface List<E> extends Collection<E> { …

Django模型将模型注释同步到数据库

1、安装django-comment-migrate库 pip install django-comment-migrate 2、将库注册到settings.py文件中 INSTALLED_APPS [...django_comment_migrate, # 表注释... ] 3、加注释 3.1、给模型&#xff08;表&#xff09;加注释 在模型的class Meta中编辑 verbose_name&…

Go和Java实现适配器模式

Go和Java实现适配器模式 我们通过下面的实例来演示适配器模式的使用&#xff0c;其中&#xff0c;音频播放器设备只能播放 mp3 文件&#xff0c;通过使用一个更高级 的音频播放器来播放 vlc 和 mp4 文件。 1、适配器模式 适配器模式是作为两个不兼容的接口之间的桥梁。这种…

UML/SysML建模工具更新(2023.7)(1-5)有国产工具

DDD领域驱动设计批评文集 欢迎加入“软件方法建模师”群 《软件方法》各章合集 最近一段时间更新的工具有&#xff1a; 工具最新版本&#xff1a;Visual Paradigm 17.1 更新时间&#xff1a;2023年7月11日 工具简介 很用心的建模工具。支持编写用例规约。支持文本分析和C…

kafka怎么用代码读取数据

Kafka可以通过Java语言中的Kafka客户端库来读取数据。以下是一个简单的Java代码示例&#xff0c;通过Kafka Consumer API从Kafka集群中读取数据&#xff1a; java import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.…

TCP三次握手和四次挥手以及11种状态(二)

11种状态 1、一开始&#xff0c;建立连接之前服务器和客户端的状态都为CLOSED&#xff1b; 2、服务器创建socket后开始监听&#xff0c;变为LISTEN状态&#xff1b; 3、客户端请求建立连接&#xff0c;向服务器发送SYN报文&#xff0c;客户端的状态变味SYN_SENT&#xff1b; 4、…

数据结构---树和二叉树

这里写目录标题 树和二叉树的定义树的定义树的基本术语线性结构和树形结构的比较二叉树的定义起因定义 案例引入前缀码编码表达式的实现二叉树的抽象类型定义 二叉树的性质和存储结构二叉树的性质二叉树的特殊形式满二叉树完全二叉树 完全二叉树的两个性质二叉树的存储结构顺序…

ubuntu目录分析

在Ubuntu根目录下&#xff0c;以下是一些常见文件夹的含义&#xff1a; /bin&#xff1a;存放可执行文件&#xff0c;包含一些基本的命令和工具。 /boot&#xff1a;存放启动时所需的文件&#xff0c;如内核和引导加载程序。 /dev&#xff1a;包含设备文件&#xff0c;用于与硬…

IntelliJ IDEA 2023.2 新版本,拥抱 AI

IntelliJ IDEA 近期连续发布多个EAP版本&#xff0c;官方在对用户体验不断优化的同时&#xff0c;也新增了一些不错的功能&#xff0c;尤其是人工智能助手补充&#xff0c;AI Assistant&#xff0c;相信在后续IDEA使用中&#xff0c;会对开发者工作效率带来不错的提升。 以下是…

Ai创作系统ChatGPT源码搭建教程+附源码

系统使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到本系统&#xff01; 更新内容&#xff1a; 同步官方图片重新生成指令 同步官方 Vary 指令 单张图片对比加强 Vary(Strong) | Vary(Subtle) 同步官方 Zoom 指令 单张图片无限缩放 Zoom out 2x | Zoom out 1.5x 新增GP…

移动IP的原理

目的 使得移动主机在各网络之间漫游时&#xff0c;仍然能保持其原来的IP地址不变 工作步骤 代理发现与注册 主机A&#xff1a;主机A移动到外地网络后&#xff0c;通过“代理发现协议”&#xff0c;与外地代理建立联系&#xff0c;并从外地代理获得一个转交地址&#xff0c;…