38 lines
953 B
Go
38 lines
953 B
Go
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"github.com/Shopify/sarama"
|
||
)
|
||
|
||
// kafka sarama client
|
||
func main() {
|
||
config := sarama.NewConfig()
|
||
//tailf包使用
|
||
config.Producer.RequiredAcks = sarama.WaitForAll //发送完包需要 leader和follower都确认
|
||
config.Producer.Partitioner = sarama.NewRandomPartitioner //新选出一个partition
|
||
config.Producer.Return.Successes = true
|
||
//成功交付的消息将在success channel返回
|
||
|
||
//构造一个消息
|
||
msg := &sarama.ProducerMessage{}
|
||
msg.Topic = "web_log"
|
||
msg.Value = sarama.StringEncoder("this is test log")
|
||
|
||
//连接kafka
|
||
client, err := sarama.NewSyncProducer([]string{"152.136.226.203:9093"}, config)
|
||
if err != nil {
|
||
fmt.Println("producer closed, err:", err)
|
||
return
|
||
}
|
||
fmt.Printf("连接kafka成功!\n")
|
||
defer client.Close()
|
||
|
||
pid, offset, err := client.SendMessage(msg)
|
||
if err != nil {
|
||
fmt.Println("send message err:", err)
|
||
}
|
||
fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
||
|
||
}
|