log-agent/taillog/tailog_mgr.go

86 lines
2.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package taillog
import (
"fmt"
"logagent/etcd"
"time"
)
var tskMgr *tailLogMgr
//tailTask管理者
type tailLogMgr struct {
logEntry []*etcd.LogEntry
tskMap map[string]*TailTask
newConfChan chan []*etcd.LogEntry
}
func Init(logEntryConf []*etcd.LogEntry) {
tskMgr = &tailLogMgr{
logEntry: logEntryConf, //把当前的日志收集项信息保存起来
tskMap: make(map[string]*TailTask, 32),
newConfChan: make(chan []*etcd.LogEntry), //无缓冲区的通道
}
for _, logEntry := range logEntryConf {
//logEntry: *etcd.LogEntry
//要收集的日志文件的路径
//初始化的时候启动了多少tailTask需要记下来
tailTask := NewTailTask(logEntry.Path, logEntry.Topic)
mKey := fmt.Sprintf("%v_%v", logEntry.Topic, logEntry.Path)
tskMgr.tskMap[mKey] = tailTask
}
go tskMgr.run()
}
// 监听自己的newConfChan 有了新的配置过来就做对应的处理
func (t *tailLogMgr) run() {
for {
select {
case newConf := <-t.newConfChan:
// 1.配置新增
// 2.配置删除
// 3.配置变更
for _, conf := range newConf {
mKey := fmt.Sprintf("%v_%v", conf.Topic, conf.Path)
_, ok := t.tskMap[mKey]
if !ok {
//新增的
tailTask := NewTailTask(conf.Path, conf.Topic)
t.tskMap[mKey] = tailTask
} else {
//原来就有,不需要操作
continue
}
}
//找出t.tskMap有但是newConf里没有的删除
for _, c1 := range t.logEntry { //从原来的配置中依次拿出配置项
isDelete := true
for _, c2 := range newConf { //去新的配置项中逐一进行比较
if c1.Path == c2.Path && c1.Topic == c2.Topic {
isDelete = false
continue
}
}
if isDelete {
//把c1对应的tailTask停掉
mKey := fmt.Sprintf("%v_%v", c1.Topic, c1.Path)
t.tskMap[mKey].cancel()
}
}
fmt.Printf("新的配置来了%v\n", newConf)
default:
time.Sleep(time.Second)
}
}
}
//向外暴露tskMgr的newConfChan
func NewConfChan() chan<- []*etcd.LogEntry {
return tskMgr.newConfChan
}