log-transfer/kafka/kafka.go

58 lines
1.5 KiB
Go
Raw Permalink Normal View History

2024-09-14 21:09:56 +08:00
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
"logtransfer/es"
)
// InitConsumer starts one goroutine per partition of topic; each goroutine
// reads Kafka messages and hands them to the es package via es.SendToEsChan.
//
// address is the list of Kafka broker addresses and topic is the topic to
// consume. Every partition is consumed starting at sarama.OffsetNewest.
//
// The function returns as soon as all partition consumers have been started.
// On success the consumer and its partition consumers are left open and keep
// running in the background; nothing in this function closes them.
//
// If any setup step fails, everything opened so far is closed and the error is
// returned wrapped with context describing the failed step.
func InitConsumer(address []string, topic string) error {
	consumer, err := sarama.NewConsumer(address, nil)
	if err != nil {
		return fmt.Errorf("creating kafka consumer: %w", err)
	}

	// Track what has been opened so a failure part-way through does not leak
	// the consumer or the partition consumers that were already started.
	var started []sarama.PartitionConsumer
	success := false
	defer func() {
		if success {
			return
		}
		// Best-effort cleanup: the setup error returned to the caller is the
		// one that matters, so close errors are intentionally dropped.
		// Partition consumers are closed before their parent consumer.
		for _, pc := range started {
			_ = pc.Close()
		}
		_ = consumer.Close()
	}()

	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		return fmt.Errorf("listing partitions of topic %q: %w", topic, err)
	}
	fmt.Println("There are", partitionList)

	// Range over the values, not the indices: partitionList holds the actual
	// partition IDs, which are not guaranteed to equal their slice positions.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
		if err != nil {
			return fmt.Errorf("consuming partition %d of topic %q: %w", partition, topic, err)
		}
		started = append(started, pc)

		// The partition consumer is passed as a named argument so the
		// goroutine works on its own value rather than a captured variable.
		// The loop ends when the Messages channel is closed.
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
				// The raw payload is forwarded as-is (no JSON decoding) and
				// handed to es.SendToEsChan instead of being sent to
				// Elasticsearch directly from this loop.
				ld := &es.LogData{
					Topic: topic,
					Data:  string(msg.Value),
				}
				es.SendToEsChan(ld)
			}
		}(pc)
	}

	success = true
	return nil
}