Kafka和flume整合

需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台:

在flume/conf下添加.conf文件,

vi flume-kafka.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 配置 Source(监控目录)

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir=/root/flume-kafka/

a1.sources.r1.inputCharset=utf-8

# 配置 Sink(写入 Kafka)

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

#指定写入数据到哪一个topic

a1.sinks.k1.kafka.topic=testTopic

#指定写入数据到哪一个集群

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定写入批次

a1.sinks.k1.kafka.flumeBatchSize=20

#指定acks机制

a1.sinks.k1.kafka.producer.acks=1

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

# 最大存储 1000 个 Event

a1.channels.c1.transactionCapacity=100

# 每次事务处理 100 个 Event

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

 

在指定目录之下创建文件夹:

kafka中创建topic:

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3

启动flume:

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

启动kafka消费者,验证数据写入成功

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning

新增测试数据:

echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt

flume:

Kafka消费者

 

需求2:Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上。

vi kafka-flume.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 将 Flume Source 设置为 Kafka 消费者,从指定 Kafka 主题拉取数据。

a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource

#指定zookeeper集群地址

a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181

#指定kafka集群地址

a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定生成消息的topic

a1.sources.r1.kafka.topics=testTopic

# 将 Flume 传输的数据内容直接打印到日志中,

a1.sinks.k1.type=logger

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transcationCapacity=100

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动Kafka生产者

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic

启动Flume

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

在生产者中写入数据

Flume中采集到数据 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/79009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Idea 如何配合 grep console过滤并分析文件

这里写自定义目录标题 [grep console插件]()右击打开文件目录,选择 tail in console 同时可以添加自己的快捷键。 ![新的改变](https://i-blog.csdnimg.cn/direct/03423e27cf6c40c5abd2d53982547b61.png) 随后会在idea的菜单栏中出现tail菜单。这里,接下…

怎样学习Electron

学习 Electron 是一个很好的选择,特别是如果你想构建跨平台的桌面应用程序,并且已经有前端开发经验。以下是一个循序渐进的学习指南,帮助你从零开始掌握 Electron。 1. 基础知识 HTML/CSS/JavaScript 确保你对这些基础技术有扎实的理解&am…

MySQL 大数据量分页查询优化指南

问题分析 当对包含50万条记录的edu_test表进行分页查询时,发现随着分页越深入,查询时间越长: limit 0,10:0.05秒limit 200000,10:0.14秒limit 499000,10:0.21秒 通过EXPLAIN分析发现,limit o…

【仿真】Ubuntu 22.04 安装MuJoCo 3.3.2

官方GIthub下载: https://github.com/google-deepmind/mujoco/releases 官网:MuJoCo — Advanced Physics Simulation 文档:Overview - MuJoCo Documentation 主要参考:Ubuntu 22.04 安装Mujoco 3.22 - RobotStudent的文章 - 知乎 简…

最新字节跳动运维云原生面经分享

继续分享最新的go面经。 今天分享的是组织内部的朋友在字节的go运维工程师岗位的云原生方向的面经,涉及Prometheus、Kubernetes、CI/CD、网络代理、MySQL主从、Redis哨兵、系统调优及基础命令行工具等知识点,问题我都整理在下面了 面经详解 Prometheus …

PyQt6实例_pyqtgraph散点图显示工具_代码分享

目录 描述: 效果: 代码: 返回结果对象 字符型横坐标 通用散点图工具 工具主界面 使用举例 描述: 1 本例结合实际应用场景描述散点图的使用。在财报分析中,需要将数值放在同行业中进行比较,从而判…

纯C协程框架NtyCo

原文是由写的,写的真的很好,原文链接:纯c协程框架NtyCo实现与原理-CSDN博客 1.为什么会有协程,协程解决了什么问题? 网络IO优化 在CS,BS的开发模式下,服务器的吞吐量是一个受关注的参数&#x…

信息系统项目管理师——第10章 项目进度管理 笔记

10项目进度管理 1.规划进度管理:项目章程、项目管理计划(开发方法、范围管理计划)、事业环境因素、组织过程资产——专家判断、数据分析(备选方案分析)、会议——进度管理计划 2.定义活动:WBS进一步分解&am…

通过门店销售明细表用SQL得到每月每个门店的销冠和按月的同比环比数据

假设我在Snowflake里有销售表,包含ID主键、门店ID、日期、销售员姓名和销售额,需要统计出每个月所有门店和各门店销售额最高的人,不一定是一个人,以及他所在的门店ID和月总销售额。 统计每个月份下,各门店内销售额最高…

移远通信LG69T赋能零跑B10:高精度定位护航,共赴汽车智联未来

当前,汽车行业正以前所未有的速度迈向智能化时代,组合辅助驾驶技术已然成为车厂突出重围的关键所在。高精度定位技术作为实现车辆精准感知与高效协同的基石,其重要性日益凸显。 作为全球领先的物联网及车联网整体解决方案供应商,移…

jmeter-Beashell获取http请求体json

在JMeter中,使用BeanShell处理器或BeanShell Sampler来获取HTTP请求体中的JSON数据是很常见的需求。这通常用于在测试计划中处理和修改请求体,或者在响应后进行验证。以下是一些步骤和示例代码,帮助你使用BeanShell来获取HTTP请求体中的JSON数…

若干查找算法

一、顺序查找 1.原理 2.代码 #if 0 const int FindBySeq(const vector<int>& ListSeq, const int KeyData) {int retrIdx -1;int size ListSeq.size();for(int i 0; i < size; i) {if (ListSeq.at(i) KeyData){retrIdx i;break;}}return retrIdx; } #else c…

Uniapp(vue):生命周期

目录 一、Vue生命周期二、Uniapp中页面的生命周期三、执行顺序比较一、Vue生命周期 setup():是在beforeCreate和created之前运行的,所以可以用setup代替这两个钩子函数。onBeforeMount():已经完成了模板的编译,但是组件还未挂载到DOM上的函数。onMounted():组件挂载到DOM完…

Prometheus监控

1、docker - prometheusgrafana监控与集成到spring boot 服务_grafana spring boot-CSDN博客 2、【IT运维】普罗米修斯基本介绍及监控平台部署&#xff08;PrometheusGrafana&#xff09;-CSDN博客 3、Prometheus监控SpringBoot-CSDN博客 4、springboot集成普罗米修斯-CSDN博客…

C#进阶学习(十四)反射的概念以及关键类Type

目录 本文末尾有相关类中的总结&#xff0c;如有需要直接跳到最后即可 前置知识&#xff1a; 1、程序集&#xff08;Assembly&#xff09; 2、元数据&#xff08;Metadata&#xff09; 3、中间语言&#xff08;IL, Intermediate Language&#xff09; 中间语言&#xff08;…

Kotlin中的also、apply、invoke用法详解

以下是 Kotlin 中作用域函数(let、run、with、also、apply)和 invoke 操作符的完整总结,结合代码示例和对比说明,帮助您理解它们的用法和区别。 一、作用域函数:简化对象操作 作用域函数用于在对象的上下文中执行代码块,并根据函数的不同返回对象本身或 lambda 的结果。…

Ubuntu实现远程文件传输

目录 安装 FileZillaUbuntu 配套设置实现文件传输 在Ubuntu系统中&#xff0c;实现远程文件传输的方法有多种&#xff0c;常见的包括使用SSH&#xff08;Secure Shell&#xff09;的SCP&#xff08;Secure Copy Protocol&#xff09;命令、SFTP&#xff08;SSH File Transfer P…

TEC制冷片详解(STM32)

目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 三、程序设计 main文件 jdq.h文件 jdq.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 半导体制冷片&#xff08;又称热电模块&#xff09;&#xff0c;是利用半导体材料的珀耳帖效应制造的一种新型制冷元件…

DotNet 入门:(一) 环境安装

一、前言 本想用 Go 语言实现一个通过小爱同学操作电脑的&#xff0c;比如我对着手机说打开音乐&#xff0c;或调小音乐&#xff0c;电脑能做相应的处理。奈何我一时间没看懂&#xff0c;就想着用.Net 来试一下&#xff0c;于是就有了下面这篇文章。 二、安装.Net 环境 1. 下…

人工智能数学基础(四):线性代数

线性代数是人工智能领域的核心数学工具之一&#xff0c;广泛应用于数据表示、模型训练和算法优化等多个环节。本文将系统梳理线性代数的关键知识点&#xff0c;并结合 Python 实例&#xff0c;助力读者轻松掌握这一重要学科。资源绑定附上完整资源供读者参考学习&#xff01; …