logagent finished

main
Your Name 2024-09-10 20:46:31 +08:00
parent eba6d6569d
commit bbb93ac5cb
8 changed files with 130 additions and 9 deletions

View File

@ -5,4 +5,4 @@ chan_max_size=100000
[etcd]
address=43.143.245.135:2379
timeout=5
collect_log_key=/logagent/collect_log_config
collect_log_key=/logagent/%s/collect_log_config

View File

@ -57,12 +57,23 @@ func GetConf(key string) (logEntryConf []*LogEntry, err error) {
//etcd的哨兵
func Watcher(key string) {
func Watcher(key string, newConfCh chan<- []*LogEntry) {
ch := cli.Watch(context.Background(), key)
for w := range ch {
for _, evt := range w.Events {
fmt.Printf("index:%v value:%v\n", evt.Type, string(evt.Kv.Value))
//通知taillog.tailTskMgr
//1.先判断操作的类型
var newConf []*LogEntry
if evt.Type != clientv3.EventTypeDelete {
err := json.Unmarshal(evt.Kv.Value, &newConf)
if err != nil {
fmt.Println("parse etcd config failed,error:", err)
continue
}
}
fmt.Println("get etcd new config success", newConf)
newConfCh <- newConf
}
}

21
main.go
View File

@ -7,6 +7,8 @@ import (
"logagent/etcd"
"logagent/kafka"
"logagent/taillog"
"logagent/utils"
"sync"
"time"
)
@ -48,20 +50,33 @@ func main() {
fmt.Printf("init etcd failed,error:%v\n", err)
}
//为了实现每个logagent都拉取自己独有的配置。所以要以自己的ip地址作为区分
ipStr, err := utils.GetBoundIP()
if err != nil {
panic(err)
}
etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
//1.从etcd中获取日志收集项的配置信息
logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key)
logEntryConf, err := etcd.GetConf(etcdConfKey)
if err != nil {
fmt.Printf("get etcd failed,error:%v\n", err)
return
}
fmt.Printf("get conf from etcd success,logEntryConf:%#v\n", logEntryConf)
//派一个哨兵去监视etcd配置的变化
for k, v := range logEntryConf {
fmt.Printf("index:%v,value:%v\n", k, v)
}
//收集日志发送到kafka
taillog.Init(logEntryConf)
go etcd.Watcher(cfg.EtcdConf.Key)
taillog.Init(logEntryConf) //初始化通道在这个函数中
newConfChan := taillog.NewConfChan() //从taillog包中获取对外暴露的通道
var wg sync.WaitGroup
wg.Add(1)
go etcd.Watcher(etcdConfKey, newConfChan) //哨兵接受到最新的配置信息通知上面的通道
wg.Wait()
//2.打开日志文件准备收集日志
//err = taillog.Init(cfg.TailLog.FileName)
//if err != nil {

View File

@ -0,0 +1,28 @@
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/v3"
"time"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"43.143.245.135:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Println("connect etcd server failed,error:", err)
}
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"}]`
_, err = cli.Put(ctx, "/logagent/collect_log_config", value)
cancel()
if err != nil {
fmt.Println("put to etcd failed,error:", err)
}
}

View File

@ -1,6 +1,7 @@
package taillog
import (
"context"
"fmt"
"github.com/hpcloud/tail"
"logagent/kafka"
@ -14,12 +15,18 @@ type TailTask struct {
path string
topic string
instance *tail.Tail
//为了实现退出t.run()
ctx context.Context
cancel context.CancelFunc
}
func NewTailTask(path string, topic string) (tailChan *TailTask) {
ctx, cancel := context.WithCancel(context.Background())
tailChan = &TailTask{
path: path,
topic: topic,
path: path,
topic: topic,
ctx: ctx,
cancel: cancel,
}
tailChan.init() //根据路径去打开日志
return
@ -39,6 +46,7 @@ func (t *TailTask) init() {
fmt.Printf("tail file failed,err:%v\n", err)
return
}
//当goroutine执行的函数退出的时候goroutine就结束了
go t.run() //直接采集日志发送到kafka
}
@ -49,6 +57,9 @@ func (t *TailTask) ReadChan() <-chan *tail.Line {
func (t *TailTask) run() {
for {
select {
case <-t.ctx.Done():
fmt.Printf("tail task:%v exit\n", t.path+t.topic)
return
case line := <-t.ReadChan():
fmt.Printf("日志已发送kafkatext,%v\n", line.Text)
//kafka.SendToKafka(t.topic, line.Text) //函数调用函数

View File

@ -25,13 +25,16 @@ func Init(logEntryConf []*etcd.LogEntry) {
for _, logEntry := range logEntryConf {
//logEntry: *etcd.LogEntry
//要收集的日志文件的路径
NewTailTask(logEntry.Path, logEntry.Topic)
//初始化的时候启动了多少tailTask需要记下来
tailTask := NewTailTask(logEntry.Path, logEntry.Topic)
mKey := fmt.Sprintf("%v_%v", logEntry.Topic, logEntry.Path)
tskMgr.tskMap[mKey] = tailTask
}
go tskMgr.run()
}
// 监听自己的newConfChan 有了新的配置过来就做对应的除了
// 监听自己的newConfChan 有了新的配置过来就做对应的处理
func (t *tailLogMgr) run() {
for {
@ -40,9 +43,43 @@ func (t *tailLogMgr) run() {
// 1.配置新增
// 2.配置删除
// 3.配置变更
for _, conf := range newConf {
mKey := fmt.Sprintf("%v_%v", conf.Topic, conf.Path)
_, ok := t.tskMap[mKey]
if !ok {
//新增的
tailTask := NewTailTask(conf.Path, conf.Topic)
t.tskMap[mKey] = tailTask
} else {
//原来就有,不需要操作
continue
}
}
//找出t.tskMap有但是newConf里没有的删除
for _, c1 := range t.logEntry { //从原来的配置中依次拿出配置项
isDelete := true
for _, c2 := range newConf { //去新的配置项中逐一进行比较
if c1.Path == c2.Path && c1.Topic == c2.Topic {
isDelete = false
continue
}
}
if isDelete {
//把c1对应的tailTask停掉
mKey := fmt.Sprintf("%v_%v", c1.Topic, c1.Path)
t.tskMap[mKey].cancel()
}
}
fmt.Printf("新的配置来了%v\n", newConf)
default:
time.Sleep(time.Second)
}
}
}
//向外暴露tskMgr的newConfChan
func NewConfChan() chan<- []*etcd.LogEntry {
return tskMgr.newConfChan
}

19
utils/ip.go Normal file
View File

@ -0,0 +1,19 @@
package utils
import (
"net"
"strings"
)
//获取本机ip
func GetBoundIP() (ip string, err error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
ip = strings.Split(localAddr.String(), ":")[0]
return
}