2024-08-01 00:45:07 +08:00
|
|
|
|
package kafka
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/Shopify/sarama"
|
2024-09-09 08:54:56 +08:00
|
|
|
|
"time"
|
2024-08-01 00:45:07 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
//专门往kafka写日志的模块
|
|
|
|
|
|
2024-09-09 08:54:56 +08:00
|
|
|
|
type logData struct {
|
|
|
|
|
topic string
|
|
|
|
|
data string
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-01 00:45:07 +08:00
|
|
|
|
// 定义全局的连接
|
|
|
|
|
var (
|
2024-09-09 08:54:56 +08:00
|
|
|
|
client sarama.SyncProducer //声明一个连接全局的kafka生产者client
|
|
|
|
|
logDataChan chan *logData
|
2024-08-01 00:45:07 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// 初始化client
|
2024-09-09 08:54:56 +08:00
|
|
|
|
|
|
|
|
|
func Init(address []string, maxSize int) (err error) {
|
2024-08-01 00:45:07 +08:00
|
|
|
|
config := sarama.NewConfig()
|
|
|
|
|
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
|
|
|
|
|
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
|
|
|
|
|
config.Producer.Return.Successes = true
|
|
|
|
|
client, err = sarama.NewSyncProducer(address, config)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("producer closed, err:", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("连接kafka成功!\n")
|
2024-09-09 08:54:56 +08:00
|
|
|
|
logDataChan = make(chan *logData, maxSize)
|
|
|
|
|
//开启后台的goroutine从通道中取数据发送kafka
|
|
|
|
|
go sendToKafka()
|
2024-08-01 00:45:07 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2024-09-09 08:54:56 +08:00
|
|
|
|
//真正往kafka发送日志的函数
|
2024-08-01 00:45:07 +08:00
|
|
|
|
|
2024-09-09 08:54:56 +08:00
|
|
|
|
func sendToKafka() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case ld := <-logDataChan:
|
|
|
|
|
//构造消息
|
|
|
|
|
msg := &sarama.ProducerMessage{}
|
|
|
|
|
msg.Topic = ld.topic
|
|
|
|
|
msg.Value = sarama.StringEncoder(ld.data)
|
|
|
|
|
|
|
|
|
|
//发送到kafka
|
|
|
|
|
pid, offset, err := client.SendMessage(msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("send message err:", err)
|
|
|
|
|
}
|
2024-09-14 21:11:05 +08:00
|
|
|
|
fmt.Printf("pid:%v offset:%v text:%v\n", pid, offset, msg.Value)
|
2024-09-09 08:54:56 +08:00
|
|
|
|
default:
|
|
|
|
|
time.Sleep(time.Millisecond * 50)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//暴露外部的函数,该函数只把日志数据发送到一个channel中
|
|
|
|
|
|
|
|
|
|
func SendToChan(topic, data string) {
|
|
|
|
|
msg := &logData{
|
|
|
|
|
topic: topic,
|
|
|
|
|
data: data,
|
2024-08-01 00:45:07 +08:00
|
|
|
|
}
|
2024-09-09 08:54:56 +08:00
|
|
|
|
logDataChan <- msg
|
2024-08-01 00:45:07 +08:00
|
|
|
|
}
|