go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化

前言

在我们实际生产中,我们常常因为新的项目或者新的功能进而要对配置文件进行修改,但是在生产环境下我们不是每次配置文件发生变化都重启一次系统,这无疑是不切实际的,所以我们需要对配置文件进行实时监控,而今天我们所要展示的也就是如何基于etcd来监控配置文件的变化。

etcd对配置项监控的流程

需求分析

首先我们来看我们日志收集服务的主要工作流程:

func main() {
	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

	//初始化etcd
	err = etcd.Init(ConfigObj.Etcdaddress.Addr)
	if err != nil {
		logrus.Error("InitEtcd failed, err:%v", err)
		return
	}
	logrus.Infof("InitEtcd success")

	//拉取要收集日志文件的配置项
	err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
	if err != nil {
		logrus.Error("GetConf failed, err:%v", err)
		return
	}
	fmt.Println(collectEntryList)
	//初始化tail

	err = tailFile.InitTail(collectEntryList)
	if err != nil {
		logrus.Error("InitTail failed, err:%v", err)
		return
	}
	logrus.Infof("InitTail success")
	run()
}

在上述主要工作逻辑的基础上,现在我们需要etcd来实现对配置文件的实时监控,而这就需要我们在后态去运行一个监控程序来实时监控查看需要见监控的配置文件是否变化。并且将变化发送到tailFile模块中

实现Watch监控

所以这里我们对main.go进行一点简单的修改,添加一个后台程序 go etcd.WatchConf(ConfigObj.Etcdaddress.Key):

package main

import (
	"fmt"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/go-ini/ini"
	"log-agent/Kafka"
	"log-agent/etcd"
	"log-agent/tailFile"
)

type Config struct {
	Kafakaddress Kafkaddress `ini:"kafka"`
	LogFilePath  LogFilePath `ini:"collect"`
	Etcdaddress  EtcdAddress `ini:"etcd"`
}

type Kafkaddress struct {
	Addr        []string `ini:"address"`
	Topic       string   `ini:"topic"`
	MessageSize int64    `ini:"chan_size"`
}

type LogFilePath struct {
	Path string `ini:"logfile_path"`
}

type EtcdAddress struct {
	Addr []string `ini:"address"`
	Key  string   `ini:"collect_key"`
}

func run() {
	select {}
}

func main() {
	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

	//初始化etcd
	err = etcd.Init(ConfigObj.Etcdaddress.Addr)
	if err != nil {
		logrus.Error("InitEtcd failed, err:%v", err)
		return
	}
	logrus.Infof("InitEtcd success")

	//拉取要收集日志文件的配置项
	err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
	if err != nil {
		logrus.Error("GetConf failed, err:%v", err)
		return
	}
	fmt.Println(collectEntryList)

	go etcd.WatchConf(ConfigObj.Etcdaddress.Key)

	//初始化tail
	err = tailFile.InitTail(collectEntryList)
	if err != nil {
		logrus.Error("InitTail failed, err:%v", err)
		return
	}
	logrus.Infof("InitTail success")
	run()
}

我们在来看这个函数的具体逻辑:

func WatchConf(key string) {
	rch := client.Watch(context.Background(), key)
	var newConf []common.CollectEntry
	for wresp := range rch {
		logrus.Infof("get new conf fromn etcd")
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			err := json.Unmarshal(ev.Kv.Value, &newConf)
			if err != nil {
				logrus.Error("json unmarshal failed,err:%v", err)
				continue
			}
			tailFile.SendNewConf(newConf)
		}
	}
}

与之前有关etcd的文章中的操作例子不同,这里我们并没有定义上下文,主要是因为这里我们不确定什么时候终止这个程序,所以不使用上下文了。

发送新配置到tailFile中

在上面我们已经完成etcd的监控,现在我们需要把新的配置消息发送到tailFile,这里我们第一反应是写一个死循环一直独缺,但是这样其实不大方便,毕竟储蓄一直运行会占掉大量不必要消耗的资源,这里我们可以让双方使用管道来进行通信,平时管道处于阻塞状态,只有监测到新配置才会进行通信,这样会使资源得到最大化的利用,我们来看一看具体的代码实现:

  • 首先我们来定义一下用于通信的管道
var (
	confchan chan []common.CollectEntry
)
  • 然后我们要对管道进行初始化,并且读取管道中新的配置信息:
confchan = make(chan []common.CollectEntry)
	newConf := <-confchan
	logrus.Infof("get newconf from etcd", newConf)

最后,由于我们这里管道只用于etcd模块与tailFile模块之间的通信,所以这里我们就不暴露管道,而是选择暴露函数:

func SendNewConf(newConf []common.CollectEntry) {
	confchan <- newConf
}

结语

最后附上上述变化模块的代码:

  • main.go
package main

import (
	"fmt"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/go-ini/ini"
	"log-agent/Kafka"
	"log-agent/etcd"
	"log-agent/tailFile"
)

type Config struct {
	Kafakaddress Kafkaddress `ini:"kafka"`
	LogFilePath  LogFilePath `ini:"collect"`
	Etcdaddress  EtcdAddress `ini:"etcd"`
}

type Kafkaddress struct {
	Addr        []string `ini:"address"`
	Topic       string   `ini:"topic"`
	MessageSize int64    `ini:"chan_size"`
}

type LogFilePath struct {
	Path string `ini:"logfile_path"`
}

type EtcdAddress struct {
	Addr []string `ini:"address"`
	Key  string   `ini:"collect_key"`
}

func run() {
	select {}
}

func main() {
	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

	//初始化etcd
	err = etcd.Init(ConfigObj.Etcdaddress.Addr)
	if err != nil {
		logrus.Error("InitEtcd failed, err:%v", err)
		return
	}
	logrus.Infof("InitEtcd success")

	//拉取要收集日志文件的配置项
	err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
	if err != nil {
		logrus.Error("GetConf failed, err:%v", err)
		return
	}
	fmt.Println(collectEntryList)

	go etcd.WatchConf(ConfigObj.Etcdaddress.Key)

	//初始化tail
	err = tailFile.InitTail(collectEntryList)
	if err != nil {
		logrus.Error("InitTail failed, err:%v", err)
		return
	}
	logrus.Infof("InitTail success")
	run()
}

  • etcd.go
package etcd

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	clientv3 "go.etcd.io/etcd/client/v3"
	"golang.org/x/net/context"
	"log-agent/common"
	"log-agent/tailFile"
	"time"
)

var client *clientv3.Client

func Init(address []string) (err error) {
	client, err = clientv3.New(clientv3.Config{
		Endpoints:   address,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logrus.Error("etcd client connect failed,err:%v", err)
		return
	}
	return
}

func GetConf(key string) (err error, collectEntryList []common.CollectEntry) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	response, err := client.Get(ctx, key)
	cancel()
	if err != nil {
		logrus.Error("get conf from etcd failed,err:%v", err)
		return
	}
	if len(response.Kvs) == 0 {
		logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
		return
	}
	fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
	err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
	if err != nil {
		logrus.Error("json unmarshal failed,err:%v", err)
		return
	}
	return
}

func WatchConf(key string) {
	rch := client.Watch(context.Background(), key)
	var newConf []common.CollectEntry
	for wresp := range rch {
		logrus.Infof("get new conf fromn etcd")
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			err := json.Unmarshal(ev.Kv.Value, &newConf)
			if err != nil {
				logrus.Error("json unmarshal failed,err:%v", err)
				continue
			}
			tailFile.SendNewConf(newConf)
		}
	}
}

  • tailFile.go
package tailFile

import (
	"github.com/Shopify/sarama"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/hpcloud/tail"
	"log-agent/Kafka"
	"log-agent/common"
	"strings"
	"time"
)

type tailTask struct {
	path    string
	topic   string
	TailObj *tail.Tail
}

var (
	confchan chan []common.CollectEntry
)

func NewTailTask(path, topic string) (tt *tailTask) {
	tt = &tailTask{
		path:  path,
		topic: topic,
	}
	return tt
}

func (task *tailTask) Init() (err error) {
	config := tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
	}
	task.TailObj, err = tail.TailFile(task.path, config)
	if err != nil {
		logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
		return
	}
	return
}

func InitTail(collectEntryList []common.CollectEntry) (err error) {
	for _, entry := range collectEntryList {
		tt := NewTailTask(entry.Path, entry.Topic)
		err = tt.Init()
		if err != nil {
			logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
			continue
		}
		go tt.run()
	}
	//初始化新配置的管道
	confchan = make(chan []common.CollectEntry)
	newConf := <-confchan
	logrus.Infof("get newconf from etcd", newConf)
	return
}

func (t *tailTask) run() {
	for {
		line, ok := <-t.TailObj.Lines
		if !ok {
			logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
			time.Sleep(2 * time.Second)
			continue
		}
		if len(strings.Trim(line.Text, "\r")) == 0 {
			continue
		}
		msg := &sarama.ProducerMessage{}
		msg.Topic = t.topic
		msg.Value = sarama.StringEncoder(line.Text)
		Kafka.MesChan(msg)
	}
}

func SendNewConf(newConf []common.CollectEntry) {
	confchan <- newConf
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/596035.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Netty核心线程模型源码分析

文章目录 一、Netty线程模型简介二、Netty线程模型源码分析1. 服务端源码分析 一、Netty线程模型简介 Netty的线程模型图如下所示&#xff1a; 具体细节看这篇博客 二、Netty线程模型源码分析 1. 服务端源码分析 首先我们在写Netty服务端程序的时候最开始是下面两句代码&a…

React + 项目(从基础到实战) -- 第11期

目标 问卷编辑器的开发 设计UI - 拆分布局 水平垂直居中 画布 y方向滚动 自定义问卷组件 后端 返回组件数据 //获取单个问卷信息{url: /api/question/:id,method: get,response: () > {return {errno: 0,data: {id: Random.id(),title: Random.ctitle(),componentList:[//…

1W 3KVDC 隔离双输出 DC/DC 电源模块 ——TPD 系列

TPD系列提供双独立输出电压&#xff0c;并且两组电压可以不同&#xff0c;这样就节省一个电源模块&#xff0c;特别适合一块板上有多个不同电压要求的设计&#xff0c;而外形尺寸和TPA一样&#xff0c;工作温度范围广-40℃到 105℃。

【go项目01_学习记录05】

学习记录 1 依赖管理 Go Modules1.1 弃用 $GOPATH1.2 Go Modules 日常使用1.2.1 初始化生成go.mod文件1.2.2 Go Proxy代理1.2.3 go.mod文件查看1.2.4 go.sum文件查看1.2.5 indirect 含义1.2.6 go mod tidy 命令1.2.7 清空 Go Modules 缓存1.2.8 下载依赖1.2.9 所有 Go Modules …

sip转webrtc方案

技术选型 由于很多企业会议协议用的主要是webrtc&#xff0c;但是项目上很多时候的一些旧设备只支持sip协议&#xff0c;并不支持webrtc协议。所以sip和webrtc的相互转换就很有必要。 流媒体服务mediasoup本身并不支持sip协议。那么如何实现sip转webrtc呢&#xff1f; 根据调研…

攻防世界-xff-referer

题目信息 分析过程 显示ip必须为123.123.123.123&#xff0c;则进行伪造 解题过程 打开repeator 提示必须来自https://www.google.com&#xff0c;则再次构造Referer 相关知识 x-forwarded-for 和 referer的区别: x-forwarded-for 用来证明ip的像是“127.0.0.1”这种&a…

迭代器解释(C++)

一、什么是迭代器 为了提高C编程的效率&#xff0c;STL&#xff08;Standard Template Library&#xff09;中提供了许多容器&#xff0c;包括vector、list、map、set等。然而有些容器&#xff08;vector&#xff09;可以通过下标索引的方式访问容器里面的数据&#xff0c;但是…

【论文泛读】如何进行动力学重构? 神经网络自动编码器结合SINDy发现数据背后蕴含的方程

这一篇文章叫做 数据驱动的坐标发现与方程发现算法。 想回答的问题很简单&#xff0c;“如何根据数据写方程”。 想想牛顿的处境&#xff0c;如何根据各种不同物体下落的数据&#xff0c;写出万有引力的数学公式的。这篇文章就是来做这件事的。当然&#xff0c;这篇论文并没有…

流畅的python-学习笔记_对象引用、可变性、垃圾回收

变量不是盒子 即变量是引用&#xff0c;而不是实际内存&#xff0c;多个标识赋值相同变量时&#xff0c;多余标识是引用 标识、相等性、别名 比较对象的值&#xff0c;is比较对象的id。实际调用对象的__eq__方法。is速度比快&#xff0c;因为is不能重载&#xff0c;省去了寻…

TypeScript学习日志-第十九天(namespace命名空间)

namespace命名空间 一、基本用法 namespace 所有的变量以及方法必须要导出才能访问&#xff0c;如图&#xff1a; 二、 嵌套 namespace 可以进行嵌套使用&#xff0c;如图&#xff1a; 它也必须需要导出才能访问 三、合并 当我们出现两个同名的 namespace 它就会合并这两…

4+1视图,注意区分类图与对象图

注意区分类图和对象图。对象图标记的是对象名&#xff0c;命名形式 对象名:类名&#xff0c;或者:类名。这里没有出现冒号&#xff0c;表示的是类图。 对象图(object diagram)。 对象图描述一组对象及它们之间的关系。对象图描述了在类图中所建立的事物实例的静态快照。和类图一…

创造未来知识管理新篇章:Ollama与AnythingLLM联手打造个人与企业的安全知识库!

一 Ollama 1.1 简介 Ollama是一个开源的大型语言模型服务工具,它帮助用户快速在本地运行大模型。通过简单的安装指令,用户可以执行一条命令就在本地运行开源大型语言模型,如Llama 2。Ollama极大地简化了在Docker容器内部署和管理LLM的过程,使得用户能够快速地在本地运行大…

软件测试,软件评测师

如果你想考软件评测师证书&#xff0c;那这篇文章可以帮你少走很多弯路&#xff0c;估计你用别人一半的时间备考就可以通过考试&#xff0c;以下为本人亲身经验哈&#xff0c;你可以先收藏后看哦&#xff0c;提前祝你考试过过过。 如果以后想从事一份软件测试工程师的工作&…

浅析扩散模型与图像生成【应用篇】(二十一)——DALLE·2

21. Hierarchical Text-Conditional Image Generation with CLIP Latents 该文提出一种基于层级式扩散模型的由文本生成图像的方法&#xff0c;也就是大名鼎鼎的DALLE2。在DALLE2之前呢&#xff0c;OpenAI团队已经推出了DALLE和GLIDE两个文生图模型了&#xff0c;其中DALLE是基…

fabric部署调用合约示例

一 打包智能合约 ①进入fabric-samples文件夹下的chaincode/fabcar/go目录下执行 GO111MODULEon go mod vendor下载依赖&#xff08;文件夹下已经有go.mod&#xff0c;不需要使用go mod init生成该module文件&#xff09;②进入到test-network文件下使用以下命令将二进制文件…

2002-2021年各地区平均受教育年限数据(分性别)(含原始数据+计算过程+计算结果)

2002-2021年各地区平均受教育年限数据&#xff08;分性别&#xff09;&#xff08;含原始数据计算过程计算结果&#xff09; 1、时间&#xff1a;2002-2021年 2、来源&#xff1a;国家统计局、统计年鉴、各省年鉴 3、指标&#xff1a;行政区划代码、地区、年份、人均受教育年…

Footprint Analytics 与 Core Chain 达成战略合作

​ 领先的区块链数据解决方案提供商 Footprint Analytics 与比特币驱动、EVM 兼容的 Layer 1 区块链 Core Chain 宣布达成战略合作。此次合作旨在将 Footprint Analytics 的前沿数据解决方案与 Core Chain 的区块链基础设施相结合&#xff0c;共同引领区块链领域的创新发展。 …

苹果挖走大量谷歌人才,建立神秘人工智能实验室;李飞飞创业成立「空间智能」公司丨 RTE 开发者日报 Vol.197

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

Apache.commons.lang3 的 isNumber 将会在 lang 4 的时候丢弃

在判断输入的字符串是不是一个数字的时候&#xff0c;我们通常用的最多的方法就是 &#xff1a; NumberUtils.isNumber("12"); 但是这个方法将会在 Lang 4.0 版本中被丢弃。 可以使用的替代方法为&#xff1a;isCreatable(String) 通过查看源代码&#xff0c;我们…

【数据结构】有关环形链表题目的总结

文章目录 引入 - 快慢指针思考 - 快慢指针行走步数进阶 - 寻找环形链表的头 引入 - 快慢指针 141-环形链表 - Leetcode 关于这道题&#xff0c;大家可以利用快慢指针&#xff0c;一个每次走两步&#xff0c;一个每次走一步&#xff0c;只要他们有一次相撞了就代表说这是一个链…
最新文章