diff --git a/conf/config.ini b/conf/config.ini index 35a9ec0..939430b 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -5,4 +5,4 @@ chan_max_size=100000 [etcd] address=43.143.245.135:2379 timeout=5 -collect_log_key=/logagent/collect_log_config \ No newline at end of file +collect_log_key=/logagent/%s/collect_log_config \ No newline at end of file diff --git a/etcd/etcd.go b/etcd/etcd.go index df52562..c989903 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -57,12 +57,23 @@ func GetConf(key string) (logEntryConf []*LogEntry, err error) { //etcd的哨兵 -func Watcher(key string) { +func Watcher(key string, newConfCh chan<- []*LogEntry) { ch := cli.Watch(context.Background(), key) for w := range ch { for _, evt := range w.Events { fmt.Printf("index:%v value:%v\n", evt.Type, string(evt.Kv.Value)) //通知taillog.tailTskMgr + //1.先判断操作的类型 + var newConf []*LogEntry + if evt.Type != clientv3.EventTypeDelete { + err := json.Unmarshal(evt.Kv.Value, &newConf) + if err != nil { + fmt.Println("parse etcd config failed,error:", err) + continue + } + } + fmt.Println("get etcd new config success", newConf) + newConfCh <- newConf } } diff --git a/main.go b/main.go index 194dbe9..ba9fa85 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,8 @@ import ( "logagent/etcd" "logagent/kafka" "logagent/taillog" + "logagent/utils" + "sync" "time" ) @@ -48,20 +50,33 @@ func main() { fmt.Printf("init etcd failed,error:%v\n", err) } + //为了实现每个logagent都拉取自己独有的配置。所以要以自己的ip地址作为区分 + ipStr, err := utils.GetBoundIP() + if err != nil { + panic(err) + } + etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr) //1.从etcd中获取日志收集项的配置信息 - logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key) + logEntryConf, err := etcd.GetConf(etcdConfKey) if err != nil { fmt.Printf("get etcd failed,error:%v\n", err) return } fmt.Printf("get conf from etcd success,logEntryConf:%#v\n", logEntryConf) + //派一个哨兵去监视etcd配置的变化 + for k, v := range logEntryConf { fmt.Printf("index:%v,value:%v\n", k, v) } + //收集日志发送到kafka - taillog.Init(logEntryConf) - go etcd.Watcher(cfg.EtcdConf.Key) + taillog.Init(logEntryConf) //初始化通道在这个函数中 + newConfChan := taillog.NewConfChan() //从taillog包中获取对外暴露的通道 + var wg sync.WaitGroup + wg.Add(1) + go etcd.Watcher(etcdConfKey, newConfChan) //哨兵接受到最新的配置信息通知上面的通道 + wg.Wait() //2.打开日志文件准备收集日志 //err = taillog.Init(cfg.TailLog.FileName) //if err != nil { diff --git a/put_etcd/etcd_putvalue2.go b/put_etcd/etcd_putvalue2.go new file mode 100644 index 0000000..4df3164 --- /dev/null +++ b/put_etcd/etcd_putvalue2.go @@ -0,0 +1,28 @@ +package main + +import ( + "context" + "fmt" + "go.etcd.io/etcd/client/v3" + "time" +) + +func main() { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"43.143.245.135:2379"}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + // handle error! + fmt.Println("connect etcd server failed,error:", err) + } + defer cli.Close() + // put + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"}]` + _, err = cli.Put(ctx, "/logagent/collect_log_config", value) + cancel() + if err != nil { + fmt.Println("put to etcd failed,error:", err) + } +} diff --git a/put_etcd/etcd_putvalue.go b/put_etcd/etcd_putvalue3.go similarity index 100% rename from put_etcd/etcd_putvalue.go rename to put_etcd/etcd_putvalue3.go diff --git a/taillog/taillog.go b/taillog/taillog.go index 2a97ea6..28da3e5 100644 --- a/taillog/taillog.go +++ b/taillog/taillog.go @@ -1,6 +1,7 @@ package taillog import ( + "context" "fmt" "github.com/hpcloud/tail" "logagent/kafka" @@ -14,12 +15,18 @@ type TailTask struct { path string topic string instance *tail.Tail + //为了实现退出t.run() + ctx context.Context + cancel context.CancelFunc } func NewTailTask(path string, topic string) (tailChan *TailTask) { + ctx, cancel := context.WithCancel(context.Background()) tailChan = &TailTask{ - path: path, - topic: topic, + path: path, + topic: topic, + ctx: ctx, + cancel: cancel, } tailChan.init() //根据路径去打开日志 return @@ -39,6 +46,7 @@ func (t *TailTask) init() { fmt.Printf("tail file failed,err:%v\n", err) return } + //当goroutine执行的函数退出的时候,goroutine就结束了 go t.run() //直接采集日志发送到kafka } @@ -49,6 +57,9 @@ func (t *TailTask) ReadChan() <-chan *tail.Line { func (t *TailTask) run() { for { select { + case <-t.ctx.Done(): + fmt.Printf("tail task:%v exit\n", t.path+t.topic) + return case line := <-t.ReadChan(): fmt.Printf("日志已发送kafka,text:,%v\n", line.Text) //kafka.SendToKafka(t.topic, line.Text) //函数调用函数 diff --git a/taillog/tailog_mgr.go b/taillog/tailog_mgr.go index 6ef59ff..91661a7 100644 --- a/taillog/tailog_mgr.go +++ b/taillog/tailog_mgr.go @@ -25,13 +25,16 @@ func Init(logEntryConf []*etcd.LogEntry) { for _, logEntry := range logEntryConf { //logEntry: *etcd.LogEntry //要收集的日志文件的路径 - NewTailTask(logEntry.Path, logEntry.Topic) + //初始化的时候启动了多少tailTask,需要记下来 + tailTask := NewTailTask(logEntry.Path, logEntry.Topic) + mKey := fmt.Sprintf("%v_%v", logEntry.Topic, logEntry.Path) + tskMgr.tskMap[mKey] = tailTask } go tskMgr.run() } -// 监听自己的newConfChan 有了新的配置过来就做对应的除了 +// 监听自己的newConfChan 有了新的配置过来就做对应的处理 func (t *tailLogMgr) run() { for { @@ -40,9 +43,43 @@ func (t *tailLogMgr) run() { // 1.配置新增 // 2.配置删除 // 3.配置变更 + + for _, conf := range newConf { + mKey := fmt.Sprintf("%v_%v", conf.Topic, conf.Path) + _, ok := t.tskMap[mKey] + if !ok { + //新增的 + tailTask := NewTailTask(conf.Path, conf.Topic) + t.tskMap[mKey] = tailTask + } else { + //原来就有,不需要操作 + continue + } + } + //找出t.tskMap有,但是newConf里没有的,删除 + for _, c1 := range t.logEntry { //从原来的配置中依次拿出配置项 + isDelete := true + for _, c2 := range newConf { //去新的配置项中逐一进行比较 + if c1.Path == c2.Path && c1.Topic == c2.Topic { + isDelete = false + continue + } + } + if isDelete { + //把c1对应的tailTask停掉 + mKey := fmt.Sprintf("%v_%v", c1.Topic, c1.Path) + t.tskMap[mKey].cancel() + } + } fmt.Printf("新的配置来了%v\n", newConf) default: time.Sleep(time.Second) } } } + +//向外暴露tskMgr的newConfChan + +func NewConfChan() chan<- []*etcd.LogEntry { + return tskMgr.newConfChan +} diff --git a/utils/ip.go b/utils/ip.go new file mode 100644 index 0000000..2e53c2f --- /dev/null +++ b/utils/ip.go @@ -0,0 +1,19 @@ +package utils + +import ( + "net" + "strings" +) + +//获取本机ip + +func GetBoundIP() (ip string, err error) { + conn, err := net.Dial("udp", "8.8.8.8:80") + if err != nil { + return + } + defer conn.Close() + localAddr := conn.LocalAddr().(*net.UDPAddr) + ip = strings.Split(localAddr.String(), ":")[0] + return +}