log-agent/taillog/taillog.go

72 lines
1.8 KiB
Go
Raw Permalink Normal View History

2024-08-01 00:45:07 +08:00
package taillog
import (
2024-09-10 20:46:31 +08:00
"context"
2024-08-01 00:45:07 +08:00
"fmt"
"github.com/hpcloud/tail"
2024-09-09 08:54:56 +08:00
"logagent/kafka"
2024-08-01 00:45:07 +08:00
)
2024-08-29 00:42:11 +08:00
//专门从日志文件收集日志的模块
2024-09-09 08:54:56 +08:00
//一个日志收集的任务
type TailTask struct {
path string
topic string
instance *tail.Tail
2024-09-10 20:46:31 +08:00
//为了实现退出t.run()
ctx context.Context
cancel context.CancelFunc
2024-09-09 08:54:56 +08:00
}
2024-08-01 00:45:07 +08:00
2024-09-09 08:54:56 +08:00
func NewTailTask(path string, topic string) (tailChan *TailTask) {
2024-09-10 20:46:31 +08:00
ctx, cancel := context.WithCancel(context.Background())
2024-09-09 08:54:56 +08:00
tailChan = &TailTask{
2024-09-10 20:46:31 +08:00
path: path,
topic: topic,
ctx: ctx,
cancel: cancel,
2024-09-09 08:54:56 +08:00
}
tailChan.init() //根据路径去打开日志
return
}
func (t *TailTask) init() {
2024-08-01 00:45:07 +08:00
config := tail.Config{
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读 filebeat记录了文件断点的位置
ReOpen: true, //重新打开,切分用
MustExist: false, //文件不存在不报错
Poll: true,
Follow: true, //是否跟随
}
2024-09-09 08:54:56 +08:00
var err error
t.instance, err = tail.TailFile(t.path, config)
2024-08-01 00:45:07 +08:00
if err != nil {
2024-09-09 08:54:56 +08:00
fmt.Printf("tail file failed,err:%v\n", err)
return
2024-08-01 00:45:07 +08:00
}
2024-09-10 20:46:31 +08:00
//当goroutine执行的函数退出的时候goroutine就结束了
2024-09-09 08:54:56 +08:00
go t.run() //直接采集日志发送到kafka
2024-08-01 00:45:07 +08:00
}
2024-08-29 00:42:11 +08:00
2024-09-09 08:54:56 +08:00
func (t *TailTask) ReadChan() <-chan *tail.Line {
return t.instance.Lines
}
func (t *TailTask) run() {
for {
select {
2024-09-10 20:46:31 +08:00
case <-t.ctx.Done():
fmt.Printf("tail task:%v exit\n", t.path+t.topic)
return
2024-09-09 08:54:56 +08:00
case line := <-t.ReadChan():
fmt.Printf("日志已发送channeltext%v\n", line.Text)
2024-09-09 08:54:56 +08:00
//kafka.SendToKafka(t.topic, line.Text) //函数调用函数
//先把日志发送到一个通道中
kafka.SendToChan(t.topic, line.Text)
//kafka哪个包中有一个单独的goroutine去取日志数据发送到kafka
}
}
2024-08-29 00:42:11 +08:00
}