From b098ac47f78581e4efcb4d9c85be3a9113938995 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 30 Jul 2024 00:50:22 +0800 Subject: [PATCH] kafka introduction and tailf practice --- go.mod | 4 ++++ go.sum | 8 ++++++++ kafka/kafka.log | 28 ++++++++++++++++++++++++++++ kafka/logAgent.log | 3 +++ kafka/tailf_demo/main.go | 36 ++++++++++++++++++++++++++++++++++++ my.log | 4 ++++ 6 files changed, 83 insertions(+) create mode 100644 kafka/kafka.log create mode 100644 kafka/logAgent.log create mode 100644 kafka/tailf_demo/main.go create mode 100644 my.log diff --git a/go.mod b/go.mod index 939139e..4cfd44a 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,10 @@ require ( github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/hpcloud/tail v1.0.0 // indirect github.com/jmoiron/sqlx v1.4.0 // indirect github.com/nsqio/go-nsq v1.1.0 // indirect + golang.org/x/sys v0.22.0 // indirect + gopkg.in/fsnotify.v1 v1.4.7 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect ) diff --git a/go.sum b/go.sum index 4a55919..a982c86 100644 --- a/go.sum +++ b/go.sum @@ -7,9 +7,17 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-sqlite3 v1.14.22/go.mod 
h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/kafka/kafka.log b/kafka/kafka.log new file mode 100644 index 0000000..29cb74b --- /dev/null +++ b/kafka/kafka.log @@ -0,0 +1,28 @@ +1.kafka集群的架构 + 1.broker + 2.topic + 3.partition:分区,把同一个topic分成不同的分区,提高负载 + 1.leader 分区的主节点 + 2.follower 分区的从节点 + 4.consumer group +2. 生产者往kafka发送数据的流程(6步) + 1.生产者从kafka集群中获取分区leader的信息 + 2.发送数据到leader + 3.leader数据落盘 + 4.follower从leader拉取消息数据 + 5.follower将消息写入本地磁盘后将ACK发给leader + 6.leader收到所有的follower的ACK之后向生产者发送ACK + +3.kafka选择分区的模式(3种) + 1.指定往哪个分区写 + 2.指定key,kafka根据key做hash,然后决定写哪个分区 + 3.既没指定分区,又没指定key,轮询分区 +4.生产者往kafka发送数据的模式 + 1. 0:把数据发给leader就算成功,效率最高,但安全性低 + 2. 1:把数据发送给leader,等待leader回ACK + 3. 
all: 把数据发给leader,确保follower从leader拉取数据回复ACK给leader,leader再回复生产者ack;安全性最高 + +5.分区存储文件的原理 +6.为什么kafka快 +7.消费数据的原理 + diff --git a/kafka/logAgent.log b/kafka/logAgent.log new file mode 100644 index 0000000..8e70732 --- /dev/null +++ b/kafka/logAgent.log @@ -0,0 +1,3 @@ +logAgent的工作流程: +1.读日志 -- tailf 第三方库 +2.往kafka里写日志 \ No newline at end of file diff --git a/kafka/tailf_demo/main.go b/kafka/tailf_demo/main.go new file mode 100644 index 0000000..bc42ff3 --- /dev/null +++ b/kafka/tailf_demo/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "fmt" + "github.com/hpcloud/tail" + "time" +) + +// tail 示例 +func main() { + fileName := "./my.log" + config := tail.Config{ + Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读 filebeat记录了文件断点的位置 + ReOpen: true, //重新打开,切分用 + MustExist: false, //文件不存在不报错 + Poll: true, + Follow: true, //是否跟随 + } + tails, err := tail.TailFile(fileName, config) + if err != nil { + fmt.Printf("tail file error:%s\n", err) + } + var ( + line *tail.Line + ok bool + ) + for { + line, ok = <-tails.Lines + if !ok { + fmt.Printf("tail file closed reopen,file name:%s\n", tails.Filename) + time.Sleep(1 * time.Second) + continue + } + fmt.Println("msg:", line.Text) + } +} diff --git a/my.log b/my.log new file mode 100644 index 0000000..97cf2f5 --- /dev/null +++ b/my.log @@ -0,0 +1,4 @@ +I want to freedom +my name is wangao +hello +wasai \ No newline at end of file