log-agent/kafka/kafka.go

75 lines
1.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package kafka
import (
"fmt"
"github.com/Shopify/sarama"
"time"
)
// This module is dedicated to writing logs to Kafka.

// logData is one log record waiting in the queue to be published to Kafka.
type logData struct {
	topic string // Kafka topic the record is published to
	data  string // record payload, sent as the message value
}
// Package-wide state shared by Init, sendToKafka and SendToChan.
var (
	// client is the global Kafka synchronous producer; Init assigns it.
	client sarama.SyncProducer
	// logDataChan queues log records between SendToChan (the sender) and
	// the sendToKafka goroutine (the receiver); Init allocates it.
	logDataChan chan *logData
)
// Init creates the global Kafka producer, allocates the log queue and starts
// the background goroutine that forwards queued records to Kafka.
//
// address is the list of broker addresses handed to sarama. maxSize is the
// buffer capacity of the internal queue and must not be negative.
//
// A non-nil error is returned when maxSize is negative or the producer cannot
// be created; in both cases the package state is left untouched and no
// goroutine is started.
func Init(address []string, maxSize int) (err error) {
	// make panics on a negative buffer size; report it as an error instead.
	if maxSize < 0 {
		return fmt.Errorf("invalid log channel size %d: must not be negative", maxSize)
	}

	config := sarama.NewConfig()
	// Wait until the leader and the followers have acknowledged the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick a random partition for every message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successes are reported back so the synchronous producer can return
	// the partition and offset of each delivered message.
	config.Producer.Return.Successes = true

	// Build into a local first so a failed attempt never overwrites the
	// package-level client.
	var producer sarama.SyncProducer
	producer, err = sarama.NewSyncProducer(address, config)
	if err != nil {
		return fmt.Errorf("creating kafka sync producer: %w", err)
	}
	client = producer
	fmt.Printf("连接kafka成功\n")

	logDataChan = make(chan *logData, maxSize)
	// Start the background goroutine that takes records off the channel
	// and sends them to Kafka.
	go sendToKafka()
	return nil
}
// sendToKafka is the function that actually delivers logs to Kafka. It loops
// taking records off logDataChan and publishing each one through the global
// client, sleeping 50ms whenever the channel is empty. Init starts it in its
// own goroutine.
//
// A failed send is reported on stdout and the record is dropped; the loop
// then moves on to the next record. The function returns only if logDataChan
// is closed.
func sendToKafka() {
	for {
		select {
		case ld, ok := <-logDataChan:
			if !ok {
				// A closed channel yields nil records forever; stop instead
				// of dereferencing nil below.
				return
			}
			// Build the Kafka message from the queued record.
			msg := &sarama.ProducerMessage{
				Topic: ld.topic,
				Value: sarama.StringEncoder(ld.data),
			}
			// Publish it; SendMessage returns the partition and offset.
			pid, offset, err := client.SendMessage(msg)
			if err != nil {
				fmt.Println("send message err:", err)
				// pid and offset carry no meaning after a failure, so skip
				// the success line below.
				continue
			}
			fmt.Printf("pid:%v offset:%v text:%v\n", pid, offset, msg.Value)
		default:
			// Nothing queued right now: back off briefly before polling again.
			time.Sleep(time.Millisecond * 50)
		}
	}
}
//暴露外部的函数该函数只把日志数据发送到一个channel中
func SendToChan(topic, data string) {
msg := &logData{
topic: topic,
data: data,
}
logDataChan <- msg
}