38 lines
953 B
Go
38 lines
953 B
Go
|
package main
|
|||
|
|
|||
|
import (
|
|||
|
"fmt"
|
|||
|
"github.com/Shopify/sarama"
|
|||
|
)
|
|||
|
|
|||
|
// kafka sarama client
|
|||
|
func main() {
|
|||
|
config := sarama.NewConfig()
|
|||
|
//tailf包使用
|
|||
|
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
|
|||
|
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
|
|||
|
config.Producer.Return.Successes = true
|
|||
|
//成功交付的消息将在success channel返回
|
|||
|
|
|||
|
//构造一个消息
|
|||
|
msg := &sarama.ProducerMessage{}
|
|||
|
msg.Topic = "web_log"
|
|||
|
msg.Value = sarama.StringEncoder("this is test log")
|
|||
|
|
|||
|
//连接kafka
|
|||
|
client, err := sarama.NewSyncProducer([]string{"152.136.226.203:9093"}, config)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("producer closed, err:", err)
|
|||
|
return
|
|||
|
}
|
|||
|
fmt.Printf("连接kafka成功!\n")
|
|||
|
defer client.Close()
|
|||
|
|
|||
|
pid, offset, err := client.SendMessage(msg)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("send message err:", err)
|
|||
|
}
|
|||
|
fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
|||
|
|
|||
|
}
|