etcd watcher
parent
656928d172
commit
eba6d6569d
|
@ -6,11 +6,13 @@ type AppConfig struct {
|
|||
}
|
||||
|
||||
type KafkaConf struct {
|
||||
Address string `ini:"address"`
|
||||
Address string `ini:"address"`
|
||||
ChanMaxSize int `ini:"chan_max_size"`
|
||||
}
|
||||
|
||||
type EtcdConf struct {
|
||||
Address string `ini:"address"`
|
||||
Key string `ini:"collect_log_key"`
|
||||
Timeout int `ini:"timeout"`
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
[kafka]
|
||||
address=43.143.245.135:9093
|
||||
|
||||
chan_max_size=100000
|
||||
|
||||
[etcd]
|
||||
address=43.143.245.135:2379
|
||||
timeout=5
|
||||
collect_log_key=/logagent/collect_log_config
|
21
etcd/etcd.go
21
etcd/etcd.go
|
@ -12,9 +12,11 @@ var (
|
|||
cli *clientv3.Client
|
||||
)
|
||||
|
||||
//需要手机的日志的配置信息
|
||||
|
||||
type LogEntry struct {
|
||||
Path string `json:"path"`
|
||||
Topic string `json:"topic"`
|
||||
Path string `json:"path"` //日志存放的路径
|
||||
Topic string `json:"topic"` //日志要发往kafka中的哪个Topic
|
||||
}
|
||||
|
||||
//初始化etcd
|
||||
|
@ -22,7 +24,7 @@ type LogEntry struct {
|
|||
func Init(addr string, timeout time.Duration) (err error) {
|
||||
cli, err = clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{addr},
|
||||
DialTimeout: 5 * time.Second,
|
||||
DialTimeout: timeout,
|
||||
})
|
||||
if err != nil {
|
||||
//handle error!
|
||||
|
@ -52,3 +54,16 @@ func GetConf(key string) (logEntryConf []*LogEntry, err error) {
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
//etcd的哨兵
|
||||
|
||||
func Watcher(key string) {
|
||||
ch := cli.Watch(context.Background(), key)
|
||||
for w := range ch {
|
||||
for _, evt := range w.Events {
|
||||
fmt.Printf("index:%v value:%v\n", evt.Type, string(evt.Kv.Value))
|
||||
//通知taillog.tailTskMgr
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,18 +3,25 @@ package kafka
|
|||
import (
|
||||
"fmt"
|
||||
"github.com/Shopify/sarama"
|
||||
"time"
|
||||
)
|
||||
|
||||
//专门往kafka写日志的模块
|
||||
|
||||
type logData struct {
|
||||
topic string
|
||||
data string
|
||||
}
|
||||
|
||||
// 定义全局的连接
|
||||
var (
|
||||
client sarama.SyncProducer //声明一个连接全局的kafka生产者client
|
||||
|
||||
client sarama.SyncProducer //声明一个连接全局的kafka生产者client
|
||||
logDataChan chan *logData
|
||||
)
|
||||
|
||||
// 初始化client
|
||||
func Init(address []string) (err error) {
|
||||
|
||||
func Init(address []string, maxSize int) (err error) {
|
||||
config := sarama.NewConfig()
|
||||
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
|
||||
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
|
||||
|
@ -25,19 +32,43 @@ func Init(address []string) (err error) {
|
|||
return
|
||||
}
|
||||
fmt.Printf("连接kafka成功!\n")
|
||||
logDataChan = make(chan *logData, maxSize)
|
||||
//开启后台的goroutine从通道中取数据发送kafka
|
||||
go sendToKafka()
|
||||
return
|
||||
}
|
||||
|
||||
func SendToKafka(topic, data string) {
|
||||
//构造一个消息
|
||||
msg := &sarama.ProducerMessage{}
|
||||
msg.Topic = topic
|
||||
msg.Value = sarama.StringEncoder(data)
|
||||
//真正往kafka发送日志的函数
|
||||
|
||||
func sendToKafka() {
|
||||
for {
|
||||
select {
|
||||
case ld := <-logDataChan:
|
||||
//构造消息
|
||||
msg := &sarama.ProducerMessage{}
|
||||
msg.Topic = ld.topic
|
||||
msg.Value = sarama.StringEncoder(ld.data)
|
||||
|
||||
//发送到kafka
|
||||
pid, offset, err := client.SendMessage(msg)
|
||||
if err != nil {
|
||||
fmt.Println("send message err:", err)
|
||||
}
|
||||
fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
||||
default:
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
}
|
||||
|
||||
//发送到kafka
|
||||
pid, offset, err := client.SendMessage(msg)
|
||||
if err != nil {
|
||||
fmt.Println("send message err:", err)
|
||||
}
|
||||
fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
||||
|
||||
}
|
||||
|
||||
//暴露外部的函数,该函数只把日志数据发送到一个channel中
|
||||
|
||||
func SendToChan(topic, data string) {
|
||||
msg := &logData{
|
||||
topic: topic,
|
||||
data: data,
|
||||
}
|
||||
logDataChan <- msg
|
||||
}
|
||||
|
|
11
main.go
11
main.go
|
@ -6,6 +6,7 @@ import (
|
|||
config "logagent/conf"
|
||||
"logagent/etcd"
|
||||
"logagent/kafka"
|
||||
"logagent/taillog"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -35,7 +36,7 @@ func main() {
|
|||
fmt.Println("load ini err:", err)
|
||||
}
|
||||
//1.初始化kafka连接
|
||||
err = kafka.Init([]string{cfg.KafkaConf.Address})
|
||||
err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)
|
||||
if err != nil {
|
||||
fmt.Printf("init kafka failed,error:%v\n", err)
|
||||
return
|
||||
|
@ -48,17 +49,19 @@ func main() {
|
|||
}
|
||||
|
||||
//1.从etcd中获取日志收集项的配置信息
|
||||
logEntryConf, err := etcd.GetConf("/xxx/")
|
||||
logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key)
|
||||
if err != nil {
|
||||
fmt.Printf("get etcd failed,error:%v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("get conf from etcd success,logEntryConf:%#v\n", logEntryConf)
|
||||
//派一个哨兵去监视etcd配置的变化
|
||||
for k, v := range logEntryConf {
|
||||
fmt.Printf("index:%v,value:%v\n", k, v)
|
||||
}
|
||||
//2.派一个哨兵去监视日志收集项的变化(有变化急事通知我的LogAgent,实现热加载)
|
||||
|
||||
//收集日志发送到kafka
|
||||
taillog.Init(logEntryConf)
|
||||
go etcd.Watcher(cfg.EtcdConf.Key)
|
||||
//2.打开日志文件准备收集日志
|
||||
//err = taillog.Init(cfg.TailLog.FileName)
|
||||
//if err != nil {
|
||||
|
|
|
@ -19,8 +19,8 @@ func main() {
|
|||
defer cli.Close()
|
||||
// put
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"}]`
|
||||
_, err = cli.Put(ctx, "/xxx/", value)
|
||||
value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]`
|
||||
_, err = cli.Put(ctx, "/logagent/collect_log_config", value)
|
||||
cancel()
|
||||
if err != nil {
|
||||
fmt.Println("put to etcd failed,error:", err)
|
||||
|
|
|
@ -3,15 +3,29 @@ package taillog
|
|||
import (
|
||||
"fmt"
|
||||
"github.com/hpcloud/tail"
|
||||
"logagent/kafka"
|
||||
)
|
||||
|
||||
//专门从日志文件收集日志的模块
|
||||
|
||||
var (
|
||||
tailChan *tail.Tail
|
||||
)
|
||||
//一个日志收集的任务
|
||||
|
||||
func Init(fileName string) (err error) {
|
||||
type TailTask struct {
|
||||
path string
|
||||
topic string
|
||||
instance *tail.Tail
|
||||
}
|
||||
|
||||
func NewTailTask(path string, topic string) (tailChan *TailTask) {
|
||||
tailChan = &TailTask{
|
||||
path: path,
|
||||
topic: topic,
|
||||
}
|
||||
tailChan.init() //根据路径去打开日志
|
||||
return
|
||||
}
|
||||
|
||||
func (t *TailTask) init() {
|
||||
config := tail.Config{
|
||||
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读 filebeat记录了文件断点的位置
|
||||
ReOpen: true, //重新打开,切分用
|
||||
|
@ -19,13 +33,28 @@ func Init(fileName string) (err error) {
|
|||
Poll: true,
|
||||
Follow: true, //是否跟随
|
||||
}
|
||||
tailChan, err = tail.TailFile(fileName, config)
|
||||
var err error
|
||||
t.instance, err = tail.TailFile(t.path, config)
|
||||
if err != nil {
|
||||
fmt.Printf("tail file error:%s\n", err)
|
||||
fmt.Printf("tail file failed,err:%v\n", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
go t.run() //直接采集日志发送到kafka
|
||||
}
|
||||
|
||||
func ReadChan() <-chan *tail.Line {
|
||||
return tailChan.Lines
|
||||
func (t *TailTask) ReadChan() <-chan *tail.Line {
|
||||
return t.instance.Lines
|
||||
}
|
||||
|
||||
func (t *TailTask) run() {
|
||||
for {
|
||||
select {
|
||||
case line := <-t.ReadChan():
|
||||
fmt.Printf("日志已发送kafka,text:,%v\n", line.Text)
|
||||
//kafka.SendToKafka(t.topic, line.Text) //函数调用函数
|
||||
//先把日志发送到一个通道中
|
||||
kafka.SendToChan(t.topic, line.Text)
|
||||
//kafka哪个包中有一个单独的goroutine去取日志数据发送到kafka
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package taillog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"logagent/etcd"
|
||||
"time"
|
||||
)
|
||||
|
||||
var tskMgr *tailLogMgr
|
||||
|
||||
//tailTask管理者
|
||||
|
||||
type tailLogMgr struct {
|
||||
logEntry []*etcd.LogEntry
|
||||
tskMap map[string]*TailTask
|
||||
newConfChan chan []*etcd.LogEntry
|
||||
}
|
||||
|
||||
func Init(logEntryConf []*etcd.LogEntry) {
|
||||
tskMgr = &tailLogMgr{
|
||||
logEntry: logEntryConf, //把当前的日志收集项信息保存起来
|
||||
tskMap: make(map[string]*TailTask, 32),
|
||||
newConfChan: make(chan []*etcd.LogEntry), //无缓冲区的通道
|
||||
}
|
||||
for _, logEntry := range logEntryConf {
|
||||
//logEntry: *etcd.LogEntry
|
||||
//要收集的日志文件的路径
|
||||
NewTailTask(logEntry.Path, logEntry.Topic)
|
||||
|
||||
}
|
||||
go tskMgr.run()
|
||||
}
|
||||
|
||||
// 监听自己的newConfChan 有了新的配置过来就做对应的除了
|
||||
|
||||
func (t *tailLogMgr) run() {
|
||||
for {
|
||||
select {
|
||||
case newConf := <-t.newConfChan:
|
||||
// 1.配置新增
|
||||
// 2.配置删除
|
||||
// 3.配置变更
|
||||
fmt.Printf("新的配置来了%v\n", newConf)
|
||||
default:
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue