// Package es queues log records (received from Kafka by the caller) and
// writes them to Elasticsearch from a background goroutine.
package es

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esapi"
)

// LogData is one log record queued for indexing.
type LogData struct {
	Topic string `json:"topic"` // also used as the Elasticsearch index name
	Data  string `json:"data"`  // log payload
}

var (
	esClient *elasticsearch.Client // shared client, set by InitEsConnect
	ch       chan *LogData          // queue drained by sendToEs
)

// InitEsConnect builds the Elasticsearch client for address with the given
// basic-auth credentials, allocates the message queue with capacity chanSize
// and starts the background goroutine that drains it.
//
// It returns an error when chanSize is negative or when the client cannot be
// constructed. Every successful call replaces the package-level client and
// queue and starts a new consumer goroutine.
//
// NOTE(review): NewClient appears to only construct the client, so the
// "connect to es success!" message does not prove the cluster is reachable —
// confirm against the client library.
func InitEsConnect(address string, username string, password string, chanSize int) (err error) {
	// make() panics on a negative capacity; report it as an error instead.
	if chanSize < 0 {
		return fmt.Errorf("invalid channel size %d", chanSize)
	}

	// 1. Create the Elasticsearch client.
	cfg := elasticsearch.Config{
		Addresses: []string{address},
		Username:  username,
		Password:  password,
	}
	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		fmt.Println("new es config error:", err)
		return err
	}
	fmt.Println("connect to es success!")

	// 2. Publish the client and start consuming queued messages.
	esClient = client
	ch = make(chan *LogData, chanSize)
	go sendToEs()
	return nil
}

// SendToEsChan queues msg for indexing. It blocks while the queue is full,
// and blocks forever if InitEsConnect has not been called (the channel is
// still nil).
func SendToEsChan(msg *LogData) {
	ch <- msg
}

// sendToEs drains the queue until the channel is closed. The first time a
// topic is seen its index is checked (and created when missing); every
// message is then indexed under its topic.
func sendToEs() {
	// Topics whose index has already been checked. A topic is recorded before
	// the check runs, so a failing check is attempted only once per topic.
	checked := make(map[string]struct{})

	for msg := range ch {
		if msg == nil {
			continue // nothing to index
		}
		if _, seen := checked[msg.Topic]; !seen {
			checked[msg.Topic] = struct{}{}
			ensureIndex(msg)
		}
		indexLog(msg)
	}
}

// ensureIndex creates the index named after msg.Topic when the index listing
// does not mention it. Failures are logged and otherwise ignored.
func ensureIndex(msg *LogData) {
	// getAllIndices logs its own failures; on error the listing is empty and
	// creation is attempted anyway (best effort).
	indices, _ := getAllIndices(time.Second * 5)

	// NOTE(review): this is a substring match over the whole response text, so
	// a topic that is a substring of another index name counts as existing.
	if strings.Contains(indices, msg.Topic) {
		return
	}

	fmt.Println(msg.Topic, msg.Data)
	if err := createIndices(msg.Topic, time.Second*5); err != nil {
		fmt.Println("创建索引错误:", err)
	}
}

// indexLog serialises msg to JSON and indexes it into the index named after
// msg.Topic. Errors are logged; the success line is printed only when the
// server answers 201.
func indexLog(msg *LogData) {
	payload, err := json.Marshal(msg)
	if err != nil {
		fmt.Println("marshal json err:", err)
		return // do not index an empty payload
	}

	// No document ID is supplied (esClient.Index.WithDocumentID would set
	// one), so the ID is left to Elasticsearch.
	response, err := esClient.Index(
		msg.Topic,                // index name
		bytes.NewReader(payload), // document body
		esClient.Index.WithContext(context.Background()),
	)
	if err != nil {
		// response is nil here; it must not be dereferenced.
		fmt.Printf("insert document error: %s\n", err)
		return
	}
	defer response.Body.Close()

	fmt.Println(response.String())
	if response.StatusCode != 201 {
		fmt.Printf("insert document error: %s\n", response.Status())
		return
	}
	fmt.Printf("insert document: %#v\n", msg.Data)
}

// createIndices creates the index named indices, giving the request at most
// timeout to complete. It returns a non-nil error when the context has
// already expired, when the request fails, or when the server answers with a
// status other than 200.
func createIndices(indices string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// A non-positive timeout yields an already-expired context.
	if ctxErr := ctx.Err(); ctxErr != nil {
		fmt.Println("create index timeout ")
		return ctxErr
	}

	resp, err := esClient.Indices.Create(indices, esClient.Indices.Create.WithContext(ctx))
	if err != nil {
		fmt.Printf("create index failed, err:%v\n", err)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("create index error: %s\n", resp.Status())
		return fmt.Errorf("create index %q: unexpected status %s", indices, resp.Status())
	}
	fmt.Println("create index success")
	return nil
}

// getAllIndices requests the index listing in JSON format and returns the
// response rendered as a string (the output of resp.String()). The request is
// given at most timeout to complete. On any failure the returned string is
// empty and the error is non-nil.
func getAllIndices(timeout time.Duration) (res string, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// A non-positive timeout yields an already-expired context.
	if ctxErr := ctx.Err(); ctxErr != nil {
		fmt.Println("get index timeout ")
		return "", ctxErr
	}

	resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient)
	if err != nil {
		// resp is nil on error, so the body is only closed after this check.
		fmt.Printf("get index failed, err:%v\n", err)
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("get index error: %s\n", resp.Status())
		return "", fmt.Errorf("get index: unexpected status %s", resp.Status())
	}
	fmt.Println("get index success")
	return resp.String(), nil
}