log-agent/kafka/kafka.go

44 lines
1.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
//专门往kafka写日志的模块
// 定义全局的连接
var (
client sarama.SyncProducer //声明一个连接全局的kafka生产者client
)
// 初始化client
func Init(address []string) (err error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
config.Producer.Return.Successes = true
client, err = sarama.NewSyncProducer(address, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
fmt.Printf("连接kafka成功\n")
return
}
func SendToKafka(topic, data string) {
//构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder(data)
//发送到kafka
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message err:", err)
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}