From 656928d1724b191524f9e45b9568a51b626d4696 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 3 Sep 2024 00:13:58 +0800 Subject: [PATCH] etcd collect conf --- etcd/etcd.go | 27 +++++++++++++++++++++++++++ main.go | 13 +++++++++++++ put_etcd/etcd_putvalue.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 put_etcd/etcd_putvalue.go diff --git a/etcd/etcd.go b/etcd/etcd.go index d32ce28..3a0caa6 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -1,6 +1,8 @@ package etcd import ( + "context" + "encoding/json" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "time" @@ -10,6 +12,11 @@ var ( cli *clientv3.Client ) +type LogEntry struct { + Path string `json:"path"` + Topic string `json:"topic"` +} + //初始化etcd func Init(addr string, timeout time.Duration) (err error) { @@ -25,3 +32,23 @@ func Init(addr string, timeout time.Duration) (err error) { fmt.Println("connect etcd server success") return } + +// 从etcd中根据key获取配置项 + +func GetConf(key string) (logEntryConf []*LogEntry, err error) { + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + resp, err := cli.Get(ctx, key) + cancel() + if err != nil { + fmt.Println("get etcd config failed,error:", err) + } + for _, ev := range resp.Kvs { + err = json.Unmarshal(ev.Value, &logEntryConf) + if err != nil { + fmt.Println("parse etcd config failed,error:", err) + return + } + } + return +} diff --git a/main.go b/main.go index 2a32b9a..41ff732 100644 --- a/main.go +++ b/main.go @@ -46,6 +46,19 @@ func main() { if err != nil { fmt.Printf("init etcd failed,error:%v\n", err) } + + //1.从etcd中获取日志收集项的配置信息 + logEntryConf, err := etcd.GetConf("/xxx/") + if err != nil { + fmt.Printf("get etcd failed,error:%v\n", err) + return + } + fmt.Printf("get conf from etcd success,logEntryConf:%#v\n", logEntryConf) + for k, v := range logEntryConf { + fmt.Printf("index:%v,value:%v\n", k, v) + } + //2.派一个哨兵去监视日志收集项的变化(有变化急事通知我的LogAgent,实现热加载) + //2.打开日志文件准备收集日志 //err = taillog.Init(cfg.TailLog.FileName) //if err != nil { diff --git a/put_etcd/etcd_putvalue.go b/put_etcd/etcd_putvalue.go new file mode 100644 index 0000000..5254552 --- /dev/null +++ b/put_etcd/etcd_putvalue.go @@ -0,0 +1,28 @@ +package main + +import ( + "context" + "fmt" + "go.etcd.io/etcd/client/v3" + "time" +) + +func main() { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"43.143.245.135:2379"}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + // handle error! + fmt.Println("connect etcd server failed,error:", err) + } + defer cli.Close() + // put + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"}]` + _, err = cli.Put(ctx, "/xxx/", value) + cancel() + if err != nil { + fmt.Println("put to etcd failed,error:", err) + } +}