package taillog import ( "fmt" "logagent/etcd" "time" ) var tskMgr *tailLogMgr //tailTask管理者 type tailLogMgr struct { logEntry []*etcd.LogEntry tskMap map[string]*TailTask newConfChan chan []*etcd.LogEntry } func Init(logEntryConf []*etcd.LogEntry) { tskMgr = &tailLogMgr{ logEntry: logEntryConf, //把当前的日志收集项信息保存起来 tskMap: make(map[string]*TailTask, 32), newConfChan: make(chan []*etcd.LogEntry), //无缓冲区的通道 } for _, logEntry := range logEntryConf { //logEntry: *etcd.LogEntry //要收集的日志文件的路径 //初始化的时候启动了多少tailTask,需要记下来 tailTask := NewTailTask(logEntry.Path, logEntry.Topic) mKey := fmt.Sprintf("%v_%v", logEntry.Topic, logEntry.Path) tskMgr.tskMap[mKey] = tailTask } go tskMgr.run() } // 监听自己的newConfChan 有了新的配置过来就做对应的处理 func (t *tailLogMgr) run() { for { select { case newConf := <-t.newConfChan: // 1.配置新增 // 2.配置删除 // 3.配置变更 for _, conf := range newConf { mKey := fmt.Sprintf("%v_%v", conf.Topic, conf.Path) _, ok := t.tskMap[mKey] if !ok { //新增的 tailTask := NewTailTask(conf.Path, conf.Topic) t.tskMap[mKey] = tailTask } else { //原来就有,不需要操作 continue } } //找出t.tskMap有,但是newConf里没有的,删除 for _, c1 := range t.logEntry { //从原来的配置中依次拿出配置项 isDelete := true for _, c2 := range newConf { //去新的配置项中逐一进行比较 if c1.Path == c2.Path && c1.Topic == c2.Topic { isDelete = false continue } } if isDelete { //把c1对应的tailTask停掉 mKey := fmt.Sprintf("%v_%v", c1.Topic, c1.Path) t.tskMap[mKey].cancel() } } fmt.Printf("新的配置来了%v\n", newConf) default: time.Sleep(time.Second) } } } //向外暴露tskMgr的newConfChan func NewConfChan() chan<- []*etcd.LogEntry { return tskMgr.newConfChan }