156 lines
3.6 KiB
Go
156 lines
3.6 KiB
Go
|
package es
|
|||
|
|
|||
|
import (
|
|||
|
"bytes"
|
|||
|
"context"
|
|||
|
"encoding/json"
|
|||
|
"fmt"
|
|||
|
"github.com/elastic/go-elasticsearch/v7"
|
|||
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
|||
|
"strings"
|
|||
|
"sync"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
// LogData is a single log record queued for indexing into Elasticsearch.
type LogData struct {
	// Topic is serialized under the JSON key "topic"; sendToEs also uses it
	// as the name of the index the record is written to.
	Topic string `json:"topic"`
	// Data is the record payload, serialized under the JSON key "data".
	Data string `json:"data"`
}
|
|||
|
|
|||
|
var (
|
|||
|
esClient *elasticsearch.Client
|
|||
|
ch chan *LogData
|
|||
|
)
|
|||
|
|
|||
|
//初始化es,准备接受kafka发过来的数据
|
|||
|
|
|||
|
func InitEsConnect(address string, username string, password string, chanSize int) (err error) {
|
|||
|
//1.初始化一个elasticsearch客户端进行链接
|
|||
|
cfg := elasticsearch.Config{
|
|||
|
Addresses: []string{address},
|
|||
|
Username: username,
|
|||
|
Password: password,
|
|||
|
}
|
|||
|
esClient, err = elasticsearch.NewClient(cfg)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("new es config error:", err)
|
|||
|
return
|
|||
|
}
|
|||
|
fmt.Println("connect to es success!") //链接成功
|
|||
|
ch = make(chan *LogData, chanSize)
|
|||
|
go sendToEs()
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
//发送数据到es中
|
|||
|
|
|||
|
func SendToEsChan(msg *LogData) {
|
|||
|
ch <- msg
|
|||
|
}
|
|||
|
|
|||
|
func sendToEs() {
|
|||
|
var once sync.Once
|
|||
|
for {
|
|||
|
select {
|
|||
|
case msg := <-ch:
|
|||
|
once.Do(func() {
|
|||
|
//判断是否创建了索引
|
|||
|
indices, _ := getAllIndices(time.Second * 5)
|
|||
|
if !strings.Contains(indices, msg.Topic) {
|
|||
|
fmt.Println(msg.Topic, msg.Data)
|
|||
|
err := createIndices(msg.Topic, time.Second*5)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("创建索引错误:", err)
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
})
|
|||
|
marshal, err := json.Marshal(msg)
|
|||
|
if err != nil {
|
|||
|
fmt.Println("marshal json err:", err)
|
|||
|
}
|
|||
|
//// 反射获取id
|
|||
|
//idFiled := reflect.ValueOf(doc).FieldByName("Id")
|
|||
|
//if !idFiled.IsValid() {
|
|||
|
// return fmt.Errorf("invalid id field")
|
|||
|
//}
|
|||
|
//kind := idFiled.Kind()
|
|||
|
//if kind != reflect.String {
|
|||
|
// return fmt.Errorf("invalid id is not string type")
|
|||
|
//}
|
|||
|
response, err := esClient.Index(
|
|||
|
msg.Topic,
|
|||
|
// 输入流
|
|||
|
bytes.NewReader(marshal),
|
|||
|
// 上下文
|
|||
|
esClient.Index.WithContext(context.Background()),
|
|||
|
// 自定义Id
|
|||
|
//cli.Index.WithDocumentID(idFiled.String()),
|
|||
|
//esClient.Index.WithDocumentType(typeStr),
|
|||
|
)
|
|||
|
fmt.Println(response.String(), err)
|
|||
|
if err != nil {
|
|||
|
fmt.Printf("insert document error: %s\n", err)
|
|||
|
}
|
|||
|
if response.StatusCode != 201 {
|
|||
|
fmt.Printf("insert document error: %s\n", response.Status())
|
|||
|
}
|
|||
|
fmt.Printf("insert document: %#v\n", msg.Data)
|
|||
|
default:
|
|||
|
time.Sleep(1 * time.Second)
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func createIndices(indices string, timeout time.Duration) error {
|
|||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|||
|
defer cancel()
|
|||
|
select {
|
|||
|
case <-ctx.Done():
|
|||
|
err := ctx.Err()
|
|||
|
fmt.Println("create index timeout ")
|
|||
|
return err
|
|||
|
default:
|
|||
|
withContext := esClient.Indices.Create.WithContext(ctx)
|
|||
|
resp, err := esClient.Indices.Create(indices, withContext)
|
|||
|
if err != nil {
|
|||
|
fmt.Printf("create index failed, err:%v\n", err)
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
if resp.StatusCode != 200 {
|
|||
|
fmt.Printf("create index error: %s\n", resp.Status())
|
|||
|
return err
|
|||
|
}
|
|||
|
fmt.Println("create index success")
|
|||
|
}
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
func getAllIndices(timeout time.Duration) (res string, err error) {
|
|||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|||
|
defer cancel()
|
|||
|
select {
|
|||
|
case <-ctx.Done():
|
|||
|
err := ctx.Err()
|
|||
|
fmt.Println("get index timeout ")
|
|||
|
return "", err
|
|||
|
default:
|
|||
|
resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient)
|
|||
|
defer resp.Body.Close()
|
|||
|
if err != nil {
|
|||
|
fmt.Printf("get index failed, err:%v\n", err)
|
|||
|
return "", err
|
|||
|
}
|
|||
|
|
|||
|
if resp.StatusCode != 200 {
|
|||
|
fmt.Printf("get index error: %s\n", resp.Status())
|
|||
|
return "", err
|
|||
|
}
|
|||
|
fmt.Println("get index success")
|
|||
|
res = resp.String()
|
|||
|
return res, nil
|
|||
|
}
|
|||
|
|
|||
|
}
|