58 lines
1.5 KiB
Go
58 lines
1.5 KiB
Go
package kafka
|
||
|
||
import (
|
||
"fmt"
|
||
"github.com/Shopify/sarama"
|
||
"logtransfer/es"
|
||
)
|
||
|
||
// 初始化kafka消费者,发送给es
|
||
|
||
func InitConsumer(address []string, topic string) error {
|
||
consumer, err := sarama.NewConsumer(address, nil)
|
||
if err != nil {
|
||
fmt.Println("Error creating the consumer:", err)
|
||
return err
|
||
}
|
||
partitionList, err := consumer.Partitions(topic)
|
||
if err != nil {
|
||
fmt.Println("Error getting the list of participants:", err)
|
||
return err
|
||
}
|
||
fmt.Println("There are", partitionList)
|
||
for partition := range partitionList {
|
||
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
|
||
if err != nil {
|
||
fmt.Println("Error creating the partition consumer:", err)
|
||
return err
|
||
}
|
||
//defer pc.AsyncClose()
|
||
go func(sarama.PartitionConsumer) {
|
||
for msg := range pc.Messages() {
|
||
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
|
||
//直接发送Es
|
||
//var ld = new(LogData)
|
||
//err := json.Unmarshal(msg.Value, ld)
|
||
//fmt.Println("ld", ld)
|
||
//fmt.Println(ld)
|
||
//if err != nil {
|
||
// fmt.Println("json unmarshal Error:", err)
|
||
//}
|
||
ld := &es.LogData{
|
||
Topic: topic,
|
||
Data: string(msg.Value),
|
||
}
|
||
//err = es.SendToEs("lyrics", ld)
|
||
//if err != nil {
|
||
// fmt.Println("Error sending message:", err)
|
||
//}
|
||
//fmt.Println("send to es:", string(msg.Value))
|
||
//优化:直接放到channel中
|
||
es.SendToEsChan(ld)
|
||
}
|
||
}(pc)
|
||
|
||
}
|
||
return nil
|
||
}
|