go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化

前言

在我们实际生产中,我们常常因为新的项目或者新的功能进而要对配置文件进行修改,但是在生产环境下我们不是每次配置文件发生变化都重启一次系统,这无疑是不切实际的,所以我们需要对配置文件进行实时监控,而今天我们所要展示的也就是如何基于etcd来监控配置文件的变化。

etcd对配置项监控的流程

需求分析

首先我们来看我们日志收集服务的主要工作流程:

func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}

在上述主要工作逻辑的基础上,现在我们需要etcd来实现对配置文件的实时监控,而这就需要我们在后态去运行一个监控程序来实时监控查看需要见监控的配置文件是否变化。并且将变化发送到tailFile模块中

实现Watch监控

所以这里我们对main.go进行一点简单的修改,添加一个后台程序 go etcd.WatchConf(ConfigObj.Etcdaddress.Key):

package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)go etcd.WatchConf(ConfigObj.Etcdaddress.Key)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}

我们在来看这个函数的具体逻辑:

func WatchConf(key string) {rch := client.Watch(context.Background(), key)var newConf []common.CollectEntryfor wresp := range rch {logrus.Infof("get new conf fromn etcd")for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)err := json.Unmarshal(ev.Kv.Value, &newConf)if err != nil {logrus.Error("json unmarshal failed,err:%v", err)continue}tailFile.SendNewConf(newConf)}}
}

与之前有关etcd的文章中的操作例子不同,这里我们并没有定义上下文,主要是因为这里我们不确定什么时候终止这个程序,所以不使用上下文了。

发送新配置到tailFile中

在上面我们已经完成etcd的监控,现在我们需要把新的配置消息发送到tailFile,这里我们第一反应是写一个死循环一直独缺,但是这样其实不大方便,毕竟储蓄一直运行会占掉大量不必要消耗的资源,这里我们可以让双方使用管道来进行通信,平时管道处于阻塞状态,只有监测到新配置才会进行通信,这样会使资源得到最大化的利用,我们来看一看具体的代码实现:

  • 首先我们来定义一下用于通信的管道
var (confchan chan []common.CollectEntry
)
  • 然后我们要对管道进行初始化,并且读取管道中新的配置信息:
confchan = make(chan []common.CollectEntry)newConf := <-confchanlogrus.Infof("get newconf from etcd", newConf)

最后,由于我们这里管道只用于etcd模块与tailFile模块之间的通信,所以这里我们就不暴露管道,而是选择暴露函数:

func SendNewConf(newConf []common.CollectEntry) {confchan <- newConf
}

结语

最后附上上述变化模块的代码:

  • main.go
package mainimport ("fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/etcd""log-agent/tailFile"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`Etcdaddress  EtcdAddress `ini:"etcd"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}type EtcdAddress struct {Addr []string `ini:"address"`Key  string   `ini:"collect_key"`
}func run() {select {}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化etcderr = etcd.Init(ConfigObj.Etcdaddress.Addr)if err != nil {logrus.Error("InitEtcd failed, err:%v", err)return}logrus.Infof("InitEtcd success")//拉取要收集日志文件的配置项err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)if err != nil {logrus.Error("GetConf failed, err:%v", err)return}fmt.Println(collectEntryList)go etcd.WatchConf(ConfigObj.Etcdaddress.Key)//初始化tailerr = tailFile.InitTail(collectEntryList)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")run()
}
  • etcd.go
package etcdimport ("encoding/json""fmt""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"clientv3 "go.etcd.io/etcd/client/v3""golang.org/x/net/context""log-agent/common""log-agent/tailFile""time"
)var client *clientv3.Clientfunc Init(address []string) (err error) {client, err = clientv3.New(clientv3.Config{Endpoints:   address,DialTimeout: 5 * time.Second,})if err != nil {logrus.Error("etcd client connect failed,err:%v", err)return}return
}func GetConf(key string) (err error, collectEntryList []common.CollectEntry) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)response, err := client.Get(ctx, key)cancel()if err != nil {logrus.Error("get conf from etcd failed,err:%v", err)return}if len(response.Kvs) == 0 {logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)return}fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryListif err != nil {logrus.Error("json unmarshal failed,err:%v", err)return}return
}func WatchConf(key string) {rch := client.Watch(context.Background(), key)var newConf []common.CollectEntryfor wresp := range rch {logrus.Infof("get new conf fromn etcd")for _, ev := range wresp.Events {fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)err := json.Unmarshal(ev.Kv.Value, &newConf)if err != nil {logrus.Error("json unmarshal failed,err:%v", err)continue}tailFile.SendNewConf(newConf)}}
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail""log-agent/Kafka""log-agent/common""strings""time"
)type tailTask struct {path    stringtopic   stringTailObj *tail.Tail
}var (confchan chan []common.CollectEntry
)func NewTailTask(path, topic string) (tt *tailTask) {tt = &tailTask{path:  path,topic: topic,}return tt
}func (task *tailTask) Init() (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}task.TailObj, err = tail.TailFile(task.path, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)return}return
}func InitTail(collectEntryList []common.CollectEntry) (err error) {for _, entry := range collectEntryList {tt := NewTailTask(entry.Path, entry.Topic)err = tt.Init()if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)continue}go tt.run()}//初始化新配置的管道confchan = make(chan []common.CollectEntry)newConf := <-confchanlogrus.Infof("get newconf from etcd", newConf)return
}func (t *tailTask) run() {for {line, ok := <-t.TailObj.Linesif !ok {logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)time.Sleep(2 * time.Second)continue}if len(strings.Trim(line.Text, "\r")) == 0 {continue}msg := &sarama.ProducerMessage{}msg.Topic = t.topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MesChan(msg)}
}func SendNewConf(newConf []common.CollectEntry) {confchan <- newConf
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/7197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【云原生】Pod 的生命周期(二)

【云原生】Pod 的生命周期&#xff08;一&#xff09;【云原生】Pod 的生命周期&#xff08;二&#xff09; Pod 的生命周期&#xff08;二&#xff09; 6.容器探针6.1 检查机制6.2 探测结果6.3 探测类型 7.Pod 的终止7.1 强制终止 Pod7.2 Pod 的垃圾收集 6.容器探针 probe 是…

uniapp文本框上下滚动问题

一个基本需求&#xff0c;textarea标签没有办法通过手拖动的方式进行滚动&#xff0c;当文字超出其容量后&#xff0c;想要编辑上面被遮挡部分的文字这边难以点到&#xff0c;电脑可以鼠标滚轮&#xff0c;但手机需要拖动但无效&#xff1a; 下面提供了我的解决思路&#xff1a…

秋招后端开发面试题 - JVM类加载机制

目录 JVM类加载机制前言面试题能说一下类的生命周期吗&#xff1f;类加载的过程知道吗&#xff1f;类加载器有哪些&#xff1f;什么是双亲委派机制&#xff1f;为什么要用双亲委派机制&#xff1f;如何破坏双亲委派机制&#xff1f;如何判断一个类是无用的类&#xff1f; JVM类…

高精地图是怎么构建的?方案有哪些?高精度语义地图构建的一点思考

高精地图是怎么构建的&#xff1f;方案有哪些&#xff1f;高精度语义地图构建的一点思考 高精度(High-Definition, HD)语义地图是目前自动驾驶领域的一个重要研究方向&#xff0c;近年随着Transformer和BEV的大火&#xff0c;很多大佬团队都开展了HD语义地图构建相关的工作。2…

【005_音频开发_基础篇_ALSA_Codec_驱动-MA120x0P功放】

005_音频开发_基础篇_ALSA_Codec_驱动-MA120x0P功放 文章目录 005_音频开发_基础篇_ALSA_Codec_驱动-MA120x0P功放创作背景MA120X0P输出模式BTLSEPBTLSEBTL 硬件配置方式/硬件Limiter限幅器限幅器作用过程 主要寄存器操作指令 ma120x0p.cma120x0p.h 创作背景 学历代表过去、能…

Vue MVVM这一篇就够啦!

Vue vs React 相似之处: 它们都有使用 Virtual DOM虚拟DOM-CSDN博客&#xff1b;提供了响应式&#xff08;Reactive&#xff09;和组件化&#xff08;Composable&#xff09;的视图组件。将注意力集中保持在核心库&#xff0c;而将其他功能如路由和全局状态管理交给相关的库。R…

GreptimeDB 助力国家电网数字换流站打造稳定高效的时序数据底座

电网体系作为现代社会运行的支柱之一&#xff0c;为各行各业、千家万户提供了电能的基本支持。从家庭到企业&#xff0c;医院到学校&#xff0c;交通到通讯&#xff0c;电力电网的应用贯穿始终。近年来&#xff0c;特高压换流站成为国家电网的重点建设工程&#xff0c;“十四五…

vivado Virtex 和 Kintex UltraScale+ 比特流设置

下表所示 Virtex 和 Kintex UltraScale 器件的器件配置设置可搭配 set_property <Setting> <Value> [current_design] Vivado 工具 Tcl 命令一起使用。

RAG 修炼手册|一文讲透 RAG 背后的技术

今天我们继续剖析 RAG&#xff0c;将为大家详细介绍 RAG 背后的例如 Embedding、Transformer、BERT、LLM 等技术的发展历程和基本原理&#xff0c;以及它们是如何应用的。 01. 什么是 Embedding? Embedding 是将离散的非结构化数据转换为连续的向量表示的技术。 在自然语言…

管理能力学习笔记九:授权的常见误区和如何有效授权

授权的常见误区 误区一&#xff1a;随意授权 管理者在授权工作时&#xff0c;需要依据下属的能力、经验、意愿问最自己&#xff1a;这项工作适合授权给Ta做吗&#xff1f;如果没有&#xff0c;可以通过哪些方法进行培训呢&#xff1f; 误区二&#xff1a;缺乏信任 心理暗示…

Web前端开发之CSS_3

CSS3 新特性动画媒体查询雪碧图字体图标 1. CSS3 新特性 1.1 圆角 使用CSS3 border-radius 属性&#xff0c;可以给任何元素制作“圆角”。border-radius属性&#xff0c;可以使用以下规则&#xff1a; 四个值&#xff1a;依次为左上角、右上角、右下角和左下角&#xff08;…

牛客NC275 和为S的两个数字【简单 map C++/Java/Go/PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/390da4f7a00f44bea7c2f3d19491311b 思路 map参考答案C #include <vector> class Solution {public:vector<int> FindNumbersWithSum(vector<int> array, int sum) {vector<int> ans;m…

Java并发代码入门

Java并发代码入门 1. 第一个线程2. Java创建线程的5种方式3. 多线程优势代码4. 线程的属性5. 中断线程1. 使用自定义的变量来作为标志位2. Thread.interrupted() 或者Thread.currentThread().isInterrupted() 代替自定义标志位 6. join2.5 等待一个线程-join()A中调用B.join表示…

【iOS】NSOperation、NSOperationQueue

文章目录 前言一、NSOperation、NSOperationQueue 简介二、NSOperation、NSOperationQueue 操作和操作队列三、NSOperation四、NSOperationQueue五、NSOperationQueue 控制串行执行、并发执行六、 NSOperation 操作依赖七、NSOperation 优先级八、NSOperation、NSOperationQueu…

一个JDBC小工具

pom.xml 结构 <properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><mysql5>5.1.44<…

服务器端优化-Redis内存划分和内存配置

6、服务器端优化-Redis内存划分和内存配置 当Redis内存不足时&#xff0c;可能导致Key频繁被删除、响应时间变长、QPS不稳定等问题。当内存使用率达到90%以上时就需要我们警惕&#xff0c;并快速定位到内存占用的原因。 有关碎片问题分析 Redis底层分配并不是这个key有多大&…

[信息收集]-端口扫描--Nmap

端口号 端口号的概念属于计算机网络的传输层&#xff0c;标识这些不同的应用程序和服务而存在的。通过使用不同的端口号&#xff0c;传输层可以将接收到的数据包准确地传递给目标应用程序。 80&#xff1a;HTTP&#xff08;超文本传输协议&#xff09;用于Web浏览器访问网页 …

淘宝商家联系方式获取工具解析 淘宝商家采集工具分享

淘宝商家联系方式获取工具是一种用于获取淘宝商家联系方式的软件工具。它可以帮助用户从淘宝网上采集商家的联系方式&#xff0c;如店铺名称、联系人姓名、电话号码、微信号等。 这种工具的原理通常是通过模拟用户在淘宝搜索商家和访问商家店铺的行为&#xff0c;然后从店铺页…

【信息系统项目管理师练习题】资源管理

马斯洛需求层次理论中,下列哪种需求属于最高层次的需求? A) 生理需求 B) 安全需求 C) 社会交往的需求 D) 自我实现的需求 答案:D) 自我实现的需求 根据赫兹伯格双因素理论,下列关于激励因素和保健因素的说法正确的是: A) 激励因素能够消除工作中的不满意,保健因素…

jvm面试题30问

什么是JVM的跨平台&#xff1f; 什么是JVM的语言无关性&#xff1f; 什么是JVM的解释执行 什么是JIT? JIT&#xff1a;在Java编程语言和环境中&#xff0c;即时编译器&#xff08;JIT compiler&#xff0c;just-in-time compiler&#xff09;是一个把Java的字节码&#xff08;…