From eba6d6569df85814d0a24047c524869b8ea6a31f Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 9 Sep 2024 08:54:56 +0800 Subject: [PATCH] etcd watcher --- conf/config.go | 4 ++- conf/config.ini | 5 ++-- etcd/etcd.go | 21 ++++++++++++--- kafka/kafka.go | 57 ++++++++++++++++++++++++++++++--------- main.go | 11 +++++--- put_etcd/etcd_putvalue.go | 4 +-- taillog/taillog.go | 47 +++++++++++++++++++++++++------- taillog/tailog_mgr.go | 48 +++++++++++++++++++++++++++++++++ 8 files changed, 163 insertions(+), 34 deletions(-) create mode 100644 taillog/tailog_mgr.go diff --git a/conf/config.go b/conf/config.go index 656667a..2d6b499 100644 --- a/conf/config.go +++ b/conf/config.go @@ -6,11 +6,13 @@ type AppConfig struct { } type KafkaConf struct { - Address string `ini:"address"` + Address string `ini:"address"` + ChanMaxSize int `ini:"chan_max_size"` } type EtcdConf struct { Address string `ini:"address"` + Key string `ini:"collect_log_key"` Timeout int `ini:"timeout"` } diff --git a/conf/config.ini b/conf/config.ini index 5adf343..35a9ec0 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -1,7 +1,8 @@ [kafka] address=43.143.245.135:9093 - +chan_max_size=100000 [etcd] address=43.143.245.135:2379 -timeout=5 \ No newline at end of file +timeout=5 +collect_log_key=/logagent/collect_log_config \ No newline at end of file diff --git a/etcd/etcd.go b/etcd/etcd.go index 3a0caa6..df52562 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -12,9 +12,11 @@ var ( cli *clientv3.Client ) +//需要手机的日志的配置信息 + type LogEntry struct { - Path string `json:"path"` - Topic string `json:"topic"` + Path string `json:"path"` //日志存放的路径 + Topic string `json:"topic"` //日志要发往kafka中的哪个Topic } //初始化etcd @@ -22,7 +24,7 @@ type LogEntry struct { func Init(addr string, timeout time.Duration) (err error) { cli, err = clientv3.New(clientv3.Config{ Endpoints: []string{addr}, - DialTimeout: 5 * time.Second, + DialTimeout: timeout, }) if err != nil { //handle error! @@ -52,3 +54,16 @@ func GetConf(key string) (logEntryConf []*LogEntry, err error) { } return } + +//etcd的哨兵 + +func Watcher(key string) { + ch := cli.Watch(context.Background(), key) + for w := range ch { + for _, evt := range w.Events { + fmt.Printf("index:%v value:%v\n", evt.Type, string(evt.Kv.Value)) + //通知taillog.tailTskMgr + + } + } +} diff --git a/kafka/kafka.go b/kafka/kafka.go index 70c2b12..06a148b 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -3,18 +3,25 @@ package kafka import ( "fmt" "github.com/Shopify/sarama" + "time" ) //专门往kafka写日志的模块 +type logData struct { + topic string + data string +} + // 定义全局的连接 var ( - client sarama.SyncProducer //声明一个连接全局的kafka生产者client - + client sarama.SyncProducer //声明一个连接全局的kafka生产者client + logDataChan chan *logData ) // 初始化client -func Init(address []string) (err error) { + +func Init(address []string, maxSize int) (err error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition @@ -25,19 +32,43 @@ func Init(address []string) (err error) { return } fmt.Printf("连接kafka成功!\n") + logDataChan = make(chan *logData, maxSize) + //开启后台的goroutine从通道中取数据发送kafka + go sendToKafka() return } -func SendToKafka(topic, data string) { - //构造一个消息 - msg := &sarama.ProducerMessage{} - msg.Topic = topic - msg.Value = sarama.StringEncoder(data) +//真正往kafka发送日志的函数 + +func sendToKafka() { + for { + select { + case ld := <-logDataChan: + //构造消息 + msg := &sarama.ProducerMessage{} + msg.Topic = ld.topic + msg.Value = sarama.StringEncoder(ld.data) + + //发送到kafka + pid, offset, err := client.SendMessage(msg) + if err != nil { + fmt.Println("send message err:", err) + } + fmt.Printf("pid:%v offset:%v\n", pid, offset) + default: + time.Sleep(time.Millisecond * 50) + } - //发送到kafka - pid, offset, err := client.SendMessage(msg) - if err != nil { - fmt.Println("send message err:", err) } - fmt.Printf("pid:%v offset:%v\n", pid, offset) + +} + +//暴露外部的函数,该函数只把日志数据发送到一个channel中 + +func SendToChan(topic, data string) { + msg := &logData{ + topic: topic, + data: data, + } + logDataChan <- msg } diff --git a/main.go b/main.go index 41ff732..194dbe9 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( config "logagent/conf" "logagent/etcd" "logagent/kafka" + "logagent/taillog" "time" ) @@ -35,7 +36,7 @@ func main() { fmt.Println("load ini err:", err) } //1.初始化kafka连接 - err = kafka.Init([]string{cfg.KafkaConf.Address}) + err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize) if err != nil { fmt.Printf("init kafka failed,error:%v\n", err) return @@ -48,17 +49,19 @@ func main() { } //1.从etcd中获取日志收集项的配置信息 - logEntryConf, err := etcd.GetConf("/xxx/") + logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key) if err != nil { fmt.Printf("get etcd failed,error:%v\n", err) return } fmt.Printf("get conf from etcd success,logEntryConf:%#v\n", logEntryConf) + //派一个哨兵去监视etcd配置的变化 for k, v := range logEntryConf { fmt.Printf("index:%v,value:%v\n", k, v) } - //2.派一个哨兵去监视日志收集项的变化(有变化急事通知我的LogAgent,实现热加载) - + //收集日志发送到kafka + taillog.Init(logEntryConf) + go etcd.Watcher(cfg.EtcdConf.Key) //2.打开日志文件准备收集日志 //err = taillog.Init(cfg.TailLog.FileName) //if err != nil { diff --git a/put_etcd/etcd_putvalue.go b/put_etcd/etcd_putvalue.go index 5254552..5024043 100644 --- a/put_etcd/etcd_putvalue.go +++ b/put_etcd/etcd_putvalue.go @@ -19,8 +19,8 @@ func main() { defer cli.Close() // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) - value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"}]` - _, err = cli.Put(ctx, "/xxx/", value) + value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]` + _, err = cli.Put(ctx, "/logagent/collect_log_config", value) cancel() if err != nil { fmt.Println("put to etcd failed,error:", err) diff --git a/taillog/taillog.go b/taillog/taillog.go index 9e07ee3..2a97ea6 100644 --- a/taillog/taillog.go +++ b/taillog/taillog.go @@ -3,15 +3,29 @@ package taillog import ( "fmt" "github.com/hpcloud/tail" + "logagent/kafka" ) //专门从日志文件收集日志的模块 -var ( - tailChan *tail.Tail -) +//一个日志收集的任务 -func Init(fileName string) (err error) { +type TailTask struct { + path string + topic string + instance *tail.Tail +} + +func NewTailTask(path string, topic string) (tailChan *TailTask) { + tailChan = &TailTask{ + path: path, + topic: topic, + } + tailChan.init() //根据路径去打开日志 + return +} + +func (t *TailTask) init() { config := tail.Config{ Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读 filebeat记录了文件断点的位置 ReOpen: true, //重新打开,切分用 @@ -19,13 +33,28 @@ func Init(fileName string) (err error) { Poll: true, Follow: true, //是否跟随 } - tailChan, err = tail.TailFile(fileName, config) + var err error + t.instance, err = tail.TailFile(t.path, config) if err != nil { - fmt.Printf("tail file error:%s\n", err) + fmt.Printf("tail file failed,err:%v\n", err) + return } - return + go t.run() //直接采集日志发送到kafka } -func ReadChan() <-chan *tail.Line { - return tailChan.Lines +func (t *TailTask) ReadChan() <-chan *tail.Line { + return t.instance.Lines +} + +func (t *TailTask) run() { + for { + select { + case line := <-t.ReadChan(): + fmt.Printf("日志已发送kafka,text:,%v\n", line.Text) + //kafka.SendToKafka(t.topic, line.Text) //函数调用函数 + //先把日志发送到一个通道中 + kafka.SendToChan(t.topic, line.Text) + //kafka哪个包中有一个单独的goroutine去取日志数据发送到kafka + } + } } diff --git a/taillog/tailog_mgr.go b/taillog/tailog_mgr.go new file mode 100644 index 0000000..6ef59ff --- /dev/null +++ b/taillog/tailog_mgr.go @@ -0,0 +1,48 @@ +package taillog + +import ( + "fmt" + "logagent/etcd" + "time" +) + +var tskMgr *tailLogMgr + +//tailTask管理者 + +type tailLogMgr struct { + logEntry []*etcd.LogEntry + tskMap map[string]*TailTask + newConfChan chan []*etcd.LogEntry +} + +func Init(logEntryConf []*etcd.LogEntry) { + tskMgr = &tailLogMgr{ + logEntry: logEntryConf, //把当前的日志收集项信息保存起来 + tskMap: make(map[string]*TailTask, 32), + newConfChan: make(chan []*etcd.LogEntry), //无缓冲区的通道 + } + for _, logEntry := range logEntryConf { + //logEntry: *etcd.LogEntry + //要收集的日志文件的路径 + NewTailTask(logEntry.Path, logEntry.Topic) + + } + go tskMgr.run() +} + +// 监听自己的newConfChan 有了新的配置过来就做对应的除了 + +func (t *tailLogMgr) run() { + for { + select { + case newConf := <-t.newConfChan: + // 1.配置新增 + // 2.配置删除 + // 3.配置变更 + fmt.Printf("新的配置来了%v\n", newConf) + default: + time.Sleep(time.Second) + } + } +}