// log-agent/kafka/kafka.go
package kafka

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)
// This module is dedicated to writing log entries to Kafka.
// logData is a single pending log entry queued for delivery to Kafka:
// the destination topic plus the raw log line itself.
type logData struct {
	topic string // Kafka topic to publish to
	data  string // raw log payload
}
2024-08-01 00:45:07 +08:00
// 定义全局的连接
var (
2024-09-09 08:54:56 +08:00
client sarama.SyncProducer //声明一个连接全局的kafka生产者client
logDataChan chan *logData
2024-08-01 00:45:07 +08:00
)
// 初始化client
2024-09-09 08:54:56 +08:00
func Init(address []string, maxSize int) (err error) {
2024-08-01 00:45:07 +08:00
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
config.Producer.Return.Successes = true
client, err = sarama.NewSyncProducer(address, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
fmt.Printf("连接kafka成功\n")
2024-09-09 08:54:56 +08:00
logDataChan = make(chan *logData, maxSize)
//开启后台的goroutine从通道中取数据发送kafka
go sendToKafka()
2024-08-01 00:45:07 +08:00
return
}
2024-09-09 08:54:56 +08:00
//真正往kafka发送日志的函数
2024-08-01 00:45:07 +08:00
2024-09-09 08:54:56 +08:00
func sendToKafka() {
for {
select {
case ld := <-logDataChan:
//构造消息
msg := &sarama.ProducerMessage{}
msg.Topic = ld.topic
msg.Value = sarama.StringEncoder(ld.data)
//发送到kafka
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message err:", err)
}
fmt.Printf("pid:%v offset:%v text:%v\n", pid, offset, msg.Value)
2024-09-09 08:54:56 +08:00
default:
time.Sleep(time.Millisecond * 50)
}
}
}
//暴露外部的函数该函数只把日志数据发送到一个channel中
func SendToChan(topic, data string) {
msg := &logData{
topic: topic,
data: data,
2024-08-01 00:45:07 +08:00
}
2024-09-09 08:54:56 +08:00
logDataChan <- msg
2024-08-01 00:45:07 +08:00
}