44 lines
1.0 KiB
Go
44 lines
1.0 KiB
Go
|
package kafka
|
|||
|
|
|||
|
import (
|
|||
|
"fmt"
|
|||
|
"github.com/Shopify/sarama"
|
|||
|
)
|
|||
|
|
|||
|
//专门往kafka写日志的模块
|
|||
|
|
|||
|
// 定义全局的连接
|
|||
|
var (
|
|||
|
client sarama.SyncProducer //声明一个连接全局的kafka生产者client
|
|||
|
|
|||
|
)
|
|||
|
|
|||
|
// 初始化client
|
|||
|
func Init(address []string) (err error) {
|
|||
|
config := sarama.NewConfig()
|
|||
|
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
|
|||
|
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
|
|||
|
config.Producer.Return.Successes = true
|
|||
|
client, err = sarama.NewSyncProducer(address, config)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("producer closed, err:", err)
|
|||
|
return
|
|||
|
}
|
|||
|
fmt.Printf("连接kafka成功!\n")
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
func SendToKafka(topic string, data string) {
|
|||
|
//构造一个消息
|
|||
|
msg := &sarama.ProducerMessage{}
|
|||
|
msg.Topic = topic
|
|||
|
msg.Value = sarama.StringEncoder(data)
|
|||
|
|
|||
|
//发送kafka
|
|||
|
pid, offset, err := client.SendMessage(msg)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("send message err:", err)
|
|||
|
}
|
|||
|
fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
|||
|
}
|