58 lines
1.5 KiB
Go
58 lines
1.5 KiB
Go
|
package kafka
|
|||
|
|
|||
|
import (
|
|||
|
"fmt"
|
|||
|
"github.com/Shopify/sarama"
|
|||
|
"logtransfer/es"
|
|||
|
)
|
|||
|
|
|||
|
// 初始化kafka消费者,发送给es
|
|||
|
|
|||
|
func InitConsumer(address []string, topic string) error {
|
|||
|
consumer, err := sarama.NewConsumer(address, nil)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("Error creating the consumer:", err)
|
|||
|
return err
|
|||
|
}
|
|||
|
partitionList, err := consumer.Partitions(topic)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("Error getting the list of participants:", err)
|
|||
|
return err
|
|||
|
}
|
|||
|
fmt.Println("There are", partitionList)
|
|||
|
for partition := range partitionList {
|
|||
|
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("Error creating the partition consumer:", err)
|
|||
|
return err
|
|||
|
}
|
|||
|
//defer pc.AsyncClose()
|
|||
|
go func(sarama.PartitionConsumer) {
|
|||
|
for msg := range pc.Messages() {
|
|||
|
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
|
|||
|
//直接发送Es
|
|||
|
//var ld = new(LogData)
|
|||
|
//err := json.Unmarshal(msg.Value, ld)
|
|||
|
//fmt.Println("ld", ld)
|
|||
|
//fmt.Println(ld)
|
|||
|
//if err != nil {
|
|||
|
// fmt.Println("json unmarshal Error:", err)
|
|||
|
//}
|
|||
|
ld := &es.LogData{
|
|||
|
Topic: topic,
|
|||
|
Data: string(msg.Value),
|
|||
|
}
|
|||
|
//err = es.SendToEs("lyrics", ld)
|
|||
|
//if err != nil {
|
|||
|
// fmt.Println("Error sending message:", err)
|
|||
|
//}
|
|||
|
//fmt.Println("send to es:", string(msg.Value))
|
|||
|
//优化:直接放到channel中
|
|||
|
es.SendToEsChan(ld)
|
|||
|
}
|
|||
|
}(pc)
|
|||
|
|
|||
|
}
|
|||
|
return nil
|
|||
|
}
|