flume avro java 发送数据_flume将数据发送到kafka、hdfs、hive、http、netcat等模式的使用总结...

1、source为http模式,sink为logger模式,将数据在控制台打印出来。

conf配置文件如下:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = http #该设置表示接收通过http方式发送过来的数据

a1.sources.r1.bind = hadoop-master #运行flume的主机或IP地址都可以

a1.sources.r1.port = 9000#端口

#a1.sources.r1.fileHeader = true

# Describe the sink

a1.sinks.k1.type = logger#该设置表示将数据在控制台打印出来

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动flume命令为:

bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console。

显示如下的信息表示启动flume成功。

895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started

打开另外一个终端,通过http post的方式发送数据:

curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000。

hadoop-master就是flume配置文件绑定的主机名,9000就是绑定的端口。

然后在运行flume的窗口就是看到如下的内容:

2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }

2、source为netcat(udp、tcp模式),sink为logger模式,将数据打印在控制台

conf配置文件如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master#绑定的主机名或IP地址

a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transcationCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动flume

bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console。

然后在另外一个终端,使用telnet发送数据:

命令为:telnet hadoop-maser 44444

[root@hadoop-master ~]# telnet hadoop-master 44444

Trying 192.168.194.6...

Connected to hadoop-master.

Escape character is '^]'.

显示上面的信息表示连接flume成功,然后输入:

12213213213

OK

12321313

OK

在flume就会收到相应的信息:

2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }

2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }

3、source为netcat/http模式,sink为hdfs模式,将数据存储在hdfs中。

conf配置文件如下,文件名为hdfs.conf:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type =regex_filter

a1.sources.r1.interceptors.i1.regex =^[0-9]*$

a1.sources.r1.interceptors.i1.excludeEvents =true

# Describe the sink

#a1.sinks.k1.type = logger

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs:/flume/events #文件在hdfs文件系统中存放的位置

a1.sinks.k1.hdfs.filePrefix = events- #文件的前缀

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.fileType = DataStream #制定文件的存放格式,这个设置是以text的格式存放从flume传输过来的数据。

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

在hdfs文件系统中创建文件存放的路径:

hadoop fs -mkdir /flume/event1。

启动flume:

bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console

通过telnet模式向flume中发送文件:

telnet hadoop-master 44444

然后输入:

aaaaaaaa

bbbbbbb

ccccccccc

dddddddddd

通过如下的命令hadoop fs -ls /flume/events/查看hdfs中的文件,可以看到hdfs中有/flume/events有如下文件:

-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070

-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556

-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557

-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215

-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216

-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217

通过hadoop fs -cat /flume/events/events-.1528900112216查看文件events-.1528900112216的内容:

aaaaaaaaaaaaaaaaa

bbbbbbbbbbbbbbbb

ccccccccccccccccccc

dddddddddddddddd

eeeeeeeeeeeeeeeeeee

fffffffffffffffffffffff

gggggggggggggggggg

hhhhhhhhhhhhhhhhhhhhhhh

iiiiiiiiiiiiiiiiiii

jjjjjjjjjjjjjjjjjjj

http模式就是把hdfs.conf文件中的netcat改为http,然后传输文件从telnet改为:

curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444。

在hadoop文件中就会看到上面命令传输的内容:badou flume。

4、source为netcat/http模式,sink为hive模式,将数据存储在hive中,并分区存储。

conf配置如下,文件名为hive.conf:

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = hadoop-master

a1.sources.r1.port = 44444

# Describe the sink

#a1.sinks.k1.type = logger

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083

a1.sinks.k1.hive.database=default#hive数据库名

a1.sinks.k1.hive.table=flume_user1

a1.sinks.k1.serializer=DELIMITED

a1.sinks.k1.hive.partition=3#如果以netcat模式,只能静态设置分区的值,因为netcat模式传输数据,无法传输某个字段的值,只能按照顺序来。这里设置age的分区值为3。

#a1.sinks.k1.hive.partition=%{age}#如果以http或json等模式,只能动态设置分区的值,因为http模式可以动态传输age的值。

a1.sinks.k1.serializer.delimiter=" "

a1.sinks.k1.serializer.serderSeparator=' '

a1.sinks.k1.serializer.fieldnames=user_id,user_name

a1.sinks.k1.hive.txnsPerBatchAsk = 10

a1.sinks.k1.hive.batchSize = 1500

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

在hive中创建表:

create table flume_user(

user_id int

,user_name string

)

partitioned by(age int)

clustered by (user_id) into 2 buckets

stored as orc

在hive-site.xml中添加如下内容:

javax.jdo.option.ConnectionPassword

hive

password to use against metastore database

hive.support.concurrency

true

hive.exec.dynamic.partition.mode

nonstrict

hive.txn.manager

org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

hive.compactor.initiator.on

true

hive.compactor.worker.threads

1

将hive根目录下的/hcatalog/share/hcatalog文件夹中的如下三个文件夹添加到flume的lib目录下。

运行flume:

bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。

重新打开一个窗口,

启动metastroe服务:

hive --service metastore &

重新打开一个客户端,通过telnet连接到flume

telnet hadoop-master 44444

然后输入:

1 1

3 3

就会在hive中看到如下两行数据:

flume_user1.user_id flume_user1.user_name flume_user1.age

1 1 3

3 3 3

age是在hive.conf中设置的值3。

现在将flume的source换成http模式,然后hive分区通过参数模式动态的传输分区值。

将hive.conf中的

a1.sources.r1.type = netcat改成a1.sources.r1.type = http

a1.sinks.k1.hive.partition=3改成a1.sinks.k1.hive.partition=%{age}。

然后启动flume:

bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。

在重新打开的窗口中通过http的模式传输数据到flume

curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444。

在hive中可以看到如下的数据:

flume_user1.user_id flume_user1.user_name flume_user1.age

11 ligongong 109

由此可以看出通过http模式传输数据到hive中时,分区字段的信息是在header中传输,而其他字段的信息是放在bady中传输,并且不同列之间以hive.conf文件定义好的分隔符分隔。

5、使用avro模式,将数据在控制台打印出来。

不同的agent之间传输数据只能通过avro模式。

这里我们需要两台服务器来演示avro的使用,两台服务器分别是hadoop-master和hadoop-slave2

hadoop-master中运行agent2,然后指定agent2的sink为avro,并且将数据发送的主机名设置为hadoop-slave2。hadoop-master中flume的conf文件设置如下,名字为push.conf:

#Name the components on this agent

a2.sources= r1

a2.sinks= k1

a2.channels= c1

#Describe/configure the source

a2.sources.r1.type= netcat

a2.sources.r1.bind= hadoop-master

a2.sources.r1.port = 44444

a2.sources.r1.channels= c1

#Use a channel which buffers events in memory

a2.channels.c1.type= memory

a2.channels.c1.keep-alive= 10

a2.channels.c1.capacity= 100000

a2.channels.c1.transactionCapacity= 100000

#Describe/configure the source

a2.sinks.k1.type= avro#制定sink为avro

a2.sinks.k1.channel= c1

a2.sinks.k1.hostname= hadoop-slave2#指定sink要发送数据到的目的服务器名

a2.sinks.k1.port= 44444#目的服务器的端口

hadoop-slave2中运行的是agent1,agent1的source为avro。flume配置内容如下,文件名为pull.conf

#Name the components on this agent

a1.sources= r1

a1.sinks= k1

a1.channels= c1

#Describe/configure the source

a1.sources.r1.type= avro

a1.sources.r1.channels= c1

a1.sources.r1.bind= hadoop-slave2

a1.sources.r1.port= 44444

#Describe the sink

a1.sinks.k1.type= logger

a1.sinks.k1.channel = c1

#Use a channel which buffers events in memory

a1.channels.c1.type= memory

a1.channels.c1.keep-alive= 10

a1.channels.c1.capacity= 100000

a1.channels.c1.transactionCapacity= 100000。

现在hadoop-slave2中启动flume,然后在hadoop-master中启动flume,顺序一定要对,否则会报如下的错误:org.apache.flume.FlumeException: java.net.SocketException: Unresolved address

在hadoop-slave2中启动flume:

bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console

在hadoop-master中启动flume:

bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console

重新打开一个窗口,通过telnet连接到hadoop-master

telnet hadoop-master 44444

然后发送11111aaaa

在hadoop-slave2的控制台中就会显示之前发送的,11111aaaa,如下所示:

2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }

6、通过flume将数据通传输到kafka,然后通过kafka将数据存储在hdfs和hive中。

在分别在hadoop-master、hadoop-slave1、hadoop-slave2上启动zookeeper。

命令为:

然后启动kafka,进入kafka的安装目录,执行命令:

./bin/kafka-server-start.sh config/server.properties &

在kafka中创建topic:

bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka

查看kafka中的topic:

bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181

启动kafka的消费者:

./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka

配置flume中conf文件,设置source类型为exec,sink为org.apache.flume.sink.kafka.KafkaSink,设置kafka的topic为上面创建的flume_kafka,具体配置如下:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

#设置sources的类型为exec,就是执行命令的意思

a1.sources.r1.type = exec

#设置sources要执行的命令

a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt

# 设置kafka接收器

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# 设置kafka的broker地址和端口号

a1.sinks.k1.brokerList=hadoop-master:9092

# 设置Kafka的topic

a1.sinks.k1.topic=flume_kafka

# 设置序列化的方式

a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity = 100000

a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动flume:

只要/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt中有数据时flume就会加载kafka中,然后被上面启动的kafka消费者消费掉。

我们查看发现/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt文件中有如下的数据:

131,dry pasta

131,dry pasta

132,beauty

133,muscles joints pain relief

133,muscles joints pain relief

133,muscles joints pain relief

133,muscles joints pain relief

134,specialty wines champagnes

134,specialty wines champagnes

134,specialty wines champagnes

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/532030.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python三角函数拟合_使用python进行数据拟合最小化函数

这是我对这个问题的理解。首先,我通过以下代码生成一些数据import numpy as npfrom scipy.integrate import quadfrom random import randomdef boxmuller(x0,sigma):u1random()u2random()llnp.sqrt(-2*np.log(u1))z0ll*np.cos(2*np.pi*u2)z1ll*np.cos(2*np.pi*u2)r…

java url 本地文件是否存在_我的应用程序知道URL中是否存在文件会一直停止[重复]...

这个问题在这里已有答案:我试图写一个应用程序,如果在给定的URL中有一个文件,将字符串放在textview中,这是代码和崩溃信息,可能是什么错误?public class MainActivity extends AppCompatActivity {String u…

python枚举类的意义_用于ORM目的的python枚举类

编辑问题我正在尝试创建一个类工厂,它可以生成具有以下属性的枚举类:>从列表中初始化类允许值(即,它)自动生成!).> Class创建自己的一个实例对于每个允许的值.>类不允许创建任何其他实例一旦上述步骤已完成(任何尝试这样做会导致异常).>类实…

java 生成校验验证码_java生成验证码并进行验证

一实现思路使用BufferedImage用于在内存中存储生成的验证码图片使用Graphics来进行验证码图片的绘制,并将绘制在图片上的验证码存放到session中用于后续验证最后通过ImageIO将生成的图片进行输出通过页面提交的验证码和存放在session中的验证码对比来进行校验二、生…

yy自动语音接待机器人_智能语音机器人落地产品有哪些?

据相关研究报告表明,在众多人工智能落地产品或者应用场景中,智能语音机器人无论从产品的成熟度还是应用的广泛度来说,都是人工智能行业最热门和最有前景的产品。智能语音机器人并不只是一款产品,它是所有智能语音系列产品的统称&a…

java资源文件获取属性_Java读写资源文件类Properties

Java中读写资源文件最重要的类是Properties1) 资源文件要求如下:1、properties文件是一个文本文件2、properties文件的语法有两种,一种是注释,一种属性配置。注 释:前面加上#号属性配置:以“键值”的方式书写一个属性的配置信息…

java被放弃了_为什么学Java那么容易放弃?

学习Java确实很容易就放弃,但是也很容易就学好,因为大多数人都是抱着试一试的心态,然后当后面就坚持不下去但是回过头来想一想,打游戏上分容易吗,一样是磕磕碰碰的,有时候十几连跪都不会放弃你上分的心情。…

python 隐马尔科夫_机器学习算法之——隐马尔可夫(Hidden Markov ModelsHMM)原理及Python实现...

前言上星期写了Kaggle竞赛的详细介绍及入门指导,但对于真正想要玩这个竞赛的伙伴,机器学习中的相关算法是必不可少的,即使是你不想获得名次和奖牌。那么,从本周开始,我将介绍在Kaggle比赛中的最基本的也是运用最广的机…

java编程50_java经典50编程题(1-10)

1.有一对兔子从出生后第三个月起,每个月都生一对小兔子,小兔子长到三个月后每个月又生一对兔子,假设兔子不死亡,问每个月兔子的总数为多少?分析过程图片发自简书App示例代码图片发自简书App运行结果图片发自简书App反思…

python替代hadoop_Python连接Hadoop数据中遇到的各种坑(汇总)

最近准备使用PythonHadoopPandas进行一些深度的分析与机器学习相关工作。(当然随着学习过程的进展,现在准备使用PythonSparkHadoop这样一套体系来搭建后续的工作环境),当然这是后话。但是这项工作首要条件就是将Python与Hadoop进行打通,本来认…

java 自动化测试_java写一个自动化测试

你模仿购物车试一下,同样是买东西,加上胜负平的赔率,输出改下应该就可以了package com.homework.lhh;import java.util.ArrayList;import java.util.Comparator;import java.util.Scanner;public class Ex04 {public static void main(String…

超大规模集成电路_纳米级超大规模集成电路芯片低功耗物理设计分析(二)

文 | 大顺简要介绍了功耗的组成,在此基础上从工艺、电路、门、系统四个层面探讨了纳米级超大规模集成电路的低功耗物理设计方法。关键词:纳米级;超大规模集成电路;电路芯片;电路设计02纳米级超大规模集成电路芯片低功耗…

java中的printnb_javaI/O系统笔记

1、File类File类的名字有一定的误导性;我们可能认为它指代的是文件,实际上却并非如此。它既能代表一个特定文件的名称,又能代表一个目录下的一组文件的名称。1.1、目录列表器如果需要查看目录列表,可以通过file.list(FilenameFilt…

outlook反应慢的原因_保险管怎么区分慢熔和快熔?

保险丝快熔与慢熔的区别所有双帽;对于这样的产品特性和安全性熔丝; gG的”,即,与接触帽组合接触;即,所述双(内/外盖)的盖。和一般的小型或地下加工厂,以便执行切割角,降低生产成本,这将选择单个帽铆接“单&…

java成员内部类_Java中的内部类(二)成员内部类

Java中的成员内部类(实例内部类):相当于类中的一个成员变量,下面通过一个例子来观察成员内部类的特点public classOuter {//定义一个实例变量和一个静态变量private inta;private static intb;//定义一个静态方法和一个非静态方法public static voidsay(…

word 通配符_学会Word通配符,可以帮助我们批量处理好多事情

长文档需要批量修改或删除某些内容的时候,我们可以利用Word中的通配符来搞定这一切,当然,前提是你必须会使用它。通配符的功能非常强大,能够随意组合替换或删除我们定义的规则内容,下面易老师就分享一些关于查找替换通…

java存储键值结构_java-键值存储为主数据库

我将要开始一个项目,该项目的读写操作非常频繁且频繁.因此,环顾四周,我发现内存数据库正是为此目的而创建的.经过更多调查后,我进入了redis.Redis看起来很酷(虽然刚开始阅读,但是对此有很多了解).但是我主要只看过关系数据库,并且以元组和关系的方式来考虑数据(我认为我可以随着…

python 输入文件名查找_python 查找文件名包含指定字符串的方法

编写一个程序,能在当前目录以及当前目录的所有子目录下查找文件名包含指定字符串的文件,并打印出绝对路径。import osclass searchfile(object):def __init__(self,path.):self._pathpathself.abspathos.path.abspath(self._path) # 默认当前目录def fin…

java 运行 出现选择_Eclipse 运行出现java.lang.NoClassDefFoundError的解决方法

上篇博文也提到了这个问题,但没有深入的讲解。这次特意做了整理,详细解释其原因。先看错误java.lang.NoClassDefFoundError,显然是java虚拟机找不到指定的类,多数情况下是外部jar中的类。Eclipse的自动化,集成化&#…

设置熄屏_刚买的手机微信收不到信息提醒耽误事情,手机到手一定要这样设置...

手机使用过程中经常会遇到第三方软件接收不到信息提醒的状况,常常因此耽误了很多重要的事情,造成损失。特别是刚换新手机或者手机刚升级系统时发生的最多。一般都觉得是手机问题,其实只是手机的系统设置出现了问题,只要跟我按照以…