kafka introduction and tailf practice

main
Your Name 2024-07-30 00:50:22 +08:00
parent 1b56fec40f
commit b098ac47f7
6 changed files with 83 additions and 0 deletions

4
go.mod
View File

@ -7,6 +7,10 @@ require (
github.com/go-redis/redis v6.15.9+incompatible // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hpcloud/tail v1.0.0 // indirect
github.com/jmoiron/sqlx v1.4.0 // indirect
github.com/nsqio/go-nsq v1.1.0 // indirect
golang.org/x/sys v0.22.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
)

8
go.sum
View File

@ -7,9 +7,17 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=

28
kafka/kafka.log Normal file
View File

@ -0,0 +1,28 @@
1.kafka集群的架构
1.broker
2.topic
3.partition分区把同一个topic分成不同的分区提高负载
1.leader 分区的主节点
2.follower 分区的从节点
4.consumer group
2. 生产者往kafka发送数据的流程6步
1.生产者从kafka集群中获取分区leader的信息
2.发送数据到leader
3.leader数据落盘
4.follower从leader拉取消息数据
5.follower将消息写入本地磁盘后将ACK发给leader
6.leader收到所有的follower的ACK之后向生产者发送ACK
3.kafka选择分区的模式3种
1.指定往那个分区写
2.指定key,kafka根据key做hash然后决定写哪个分区
3.既没指定分区又没指定key轮询分区
4.生产者往kakfa发送数据的模式
1. 0把数据发给leader就算成功效率最高但安全性低
2. 1把数据发送给leader等待leader回ACK
3. all: 把数据发给leader确保follower从leader拉取数据回复ACK给leaderleader再回复生产者ack安全性最高
5.分区存储文件的原理
6.为什么kafka快
7.消费数据的原理

3
kafka/logAgent.log Normal file
View File

@ -0,0 +1,3 @@
logAgent的工作流程
1.读日志 -- tailf 第三方库
2.往kafka里写日志

36
kafka/tailf_demo/main.go Normal file
View File

@ -0,0 +1,36 @@
package main
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
// tail 示例
func main() {
fileName := "./my.log"
config := tail.Config{
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件的哪个地方开始读 filebeat记录了文件断点的位置
ReOpen: true, //重新打开,切分用
MustExist: false, //文件不存在不报错
Poll: true,
Follow: true, //是否跟随
}
tails, err := tail.TailFile(fileName, config)
if err != nil {
fmt.Printf("tail file error:%s\n", err)
}
var (
line *tail.Line
ok bool
)
for {
line, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file closed reopen,file name:%s\n", tails.Filename)
time.Sleep(1 * time.Second)
continue
}
fmt.Println("msg:", line.Text)
}
}

4
my.log Normal file
View File

@ -0,0 +1,4 @@
I want to freedom
my name is wangao
hello
wasai