From a418787e0b6b2b333b16c653060b05c51ea8c52c Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 14 Sep 2024 21:11:05 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E4=B8=AA=E5=B0=86=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=94=B6=E9=9B=86=E5=8F=91=E9=80=81=E5=88=B0kafka=E7=9A=84?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka/kafka.go | 2 +- put_etcd/etcd_putvalue2.go | 2 +- put_etcd/etcd_putvalue3.go | 2 +- taillog/taillog.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/kafka.go b/kafka/kafka.go index 06a148b..bcf272c 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -54,7 +54,7 @@ func sendToKafka() { if err != nil { fmt.Println("send message err:", err) } - fmt.Printf("pid:%v offset:%v\n", pid, offset) + fmt.Printf("pid:%v offset:%v text:%v\n", pid, offset, msg.Value) default: time.Sleep(time.Millisecond * 50) } diff --git a/put_etcd/etcd_putvalue2.go b/put_etcd/etcd_putvalue2.go index 4df3164..2e15d97 100644 --- a/put_etcd/etcd_putvalue2.go +++ b/put_etcd/etcd_putvalue2.go @@ -20,7 +20,7 @@ func main() { // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"}]` - _, err = cli.Put(ctx, "/logagent/collect_log_config", value) + _, err = cli.Put(ctx, "/logagent/192.168.0.104/collect_log_config", value) cancel() if err != nil { fmt.Println("put to etcd failed,error:", err) diff --git a/put_etcd/etcd_putvalue3.go b/put_etcd/etcd_putvalue3.go index 5024043..1339080 100644 --- a/put_etcd/etcd_putvalue3.go +++ b/put_etcd/etcd_putvalue3.go @@ -20,7 +20,7 @@ func main() { // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) value := `[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]` - _, err = cli.Put(ctx, "/logagent/collect_log_config", value) + _, err = cli.Put(ctx, "/logagent/192.168.0.104/collect_log_config", value) cancel() if err != nil { fmt.Println("put to etcd failed,error:", err) diff --git a/taillog/taillog.go b/taillog/taillog.go index 28da3e5..7056e2c 100644 --- a/taillog/taillog.go +++ b/taillog/taillog.go @@ -61,7 +61,7 @@ func (t *TailTask) run() { fmt.Printf("tail task:%v exit\n", t.path+t.topic) return case line := <-t.ReadChan(): - fmt.Printf("日志已发送kafka,text:,%v\n", line.Text) + fmt.Printf("日志已发送channel,text:%v\n", line.Text) //kafka.SendToKafka(t.topic, line.Text) //函数调用函数 //先把日志发送到一个通道中 kafka.SendToChan(t.topic, line.Text)