package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// Package-internal queue item: one log line destined for a Kafka topic.
type logData struct {
	topic string
	data  string
}

// Package-level connection state.
var (
	client      sarama.SyncProducer // global Kafka producer client, set by Init
	logDataChan chan *logData       // buffered queue feeding the background sender goroutine
)

// Init connects a synchronous Kafka producer to the given broker addresses,
// creates the internal message channel with capacity maxSize, and starts the
// background goroutine that drains the channel into Kafka.
// It must be called (and succeed) before SendToChan is used.
func Init(address []string, maxSize int) (err error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll           // require ack from leader and all in-sync replicas
	config.Producer.Partitioner = sarama.NewRandomPartitioner  // pick a random partition for each message
	config.Producer.Return.Successes = true                    // SyncProducer requires successes to be returned

	client, err = sarama.NewSyncProducer(address, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	fmt.Printf("连接kafka成功!\n")

	logDataChan = make(chan *logData, maxSize)

	// Background goroutine pulls queued entries and ships them to Kafka.
	// NOTE(review): there is no shutdown signal for this goroutine; it runs
	// for the life of the process (and would exit only if logDataChan were
	// closed, which nothing currently does).
	go sendToKafka()
	return
}

// sendToKafka is the worker loop that actually publishes queued log entries.
// It blocks on the channel receive instead of polling, so it consumes no CPU
// while the queue is empty.
func sendToKafka() {
	for ld := range logDataChan {
		// Build the producer message for this entry.
		msg := &sarama.ProducerMessage{
			Topic: ld.topic,
			Value: sarama.StringEncoder(ld.data),
		}
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send message err:", err)
			continue // bug fix: do not print a bogus success line on failure
		}
		fmt.Printf("pid:%v offset:%v text:%v\n", pid, offset, msg.Value)
	}
}

// SendToChan is the public entry point: it enqueues one log line for
// asynchronous delivery to the given Kafka topic. It blocks if the internal
// channel is full. Init must have been called first, otherwise the send on
// the nil channel blocks forever.
func SendToChan(topic, data string) {
	msg := &logData{
		topic: topic,
		data:  data,
	}
	logDataChan <- msg
}