course/kafka/sarama/main.go

38 lines
953 B
Go
Raw Normal View History

2024-08-01 00:43:19 +08:00
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// main is a minimal Kafka producer example built on the sarama client: it
// connects to a single broker, synchronously sends one test message to the
// "web_log" topic, and prints the partition and offset reported for it.
func main() {
	config := sarama.NewConfig()
	// Wait for the leader and all in-sync followers to acknowledge the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Successfully delivered messages are reported on the success channel;
	// a SyncProducer requires this to be enabled.
	config.Producer.Return.Successes = true

	// Build the message to send.
	msg := &sarama.ProducerMessage{
		Topic: "web_log",
		Value: sarama.StringEncoder("this is test log"),
	}

	// Connect to Kafka.
	client, err := sarama.NewSyncProducer([]string{"152.136.226.203:9093"}, config)
	if err != nil {
		fmt.Println("create producer failed, err:", err)
		return
	}
	// Close the producer on exit and surface any error instead of dropping it.
	defer func() {
		if cerr := client.Close(); cerr != nil {
			fmt.Println("close producer err:", cerr)
		}
	}()
	fmt.Printf("连接kafka成功\n")

	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message err:", err)
		// pid and offset are meaningless after a failed send; do not print them.
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}