package kafka import ( "fmt" "github.com/Shopify/sarama" "logtransfer/es" ) // 初始化kafka消费者,发送给es func InitConsumer(address []string, topic string) error { consumer, err := sarama.NewConsumer(address, nil) if err != nil { fmt.Println("Error creating the consumer:", err) return err } partitionList, err := consumer.Partitions(topic) if err != nil { fmt.Println("Error getting the list of participants:", err) return err } fmt.Println("There are", partitionList) for partition := range partitionList { pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { fmt.Println("Error creating the partition consumer:", err) return err } //defer pc.AsyncClose() go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value) //直接发送Es //var ld = new(LogData) //err := json.Unmarshal(msg.Value, ld) //fmt.Println("ld", ld) //fmt.Println(ld) //if err != nil { // fmt.Println("json unmarshal Error:", err) //} ld := &es.LogData{ Topic: topic, Data: string(msg.Value), } //err = es.SendToEs("lyrics", ld) //if err != nil { // fmt.Println("Error sending message:", err) //} //fmt.Println("send to es:", string(msg.Value)) //优化:直接放到channel中 es.SendToEsChan(ld) } }(pc) } return nil }