156 lines
3.6 KiB
Go
156 lines
3.6 KiB
Go
package es
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/elastic/go-elasticsearch/v7"
|
||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
type LogData struct {
|
||
Topic string `json:"topic"`
|
||
Data string `json:"data"`
|
||
}
|
||
|
||
var (
|
||
esClient *elasticsearch.Client
|
||
ch chan *LogData
|
||
)
|
||
|
||
//初始化es,准备接受kafka发过来的数据
|
||
|
||
func InitEsConnect(address string, username string, password string, chanSize int) (err error) {
|
||
//1.初始化一个elasticsearch客户端进行链接
|
||
cfg := elasticsearch.Config{
|
||
Addresses: []string{address},
|
||
Username: username,
|
||
Password: password,
|
||
}
|
||
esClient, err = elasticsearch.NewClient(cfg)
|
||
if err != nil {
|
||
fmt.Println("new es config error:", err)
|
||
return
|
||
}
|
||
fmt.Println("connect to es success!") //链接成功
|
||
ch = make(chan *LogData, chanSize)
|
||
go sendToEs()
|
||
return
|
||
}
|
||
|
||
//发送数据到es中
|
||
|
||
func SendToEsChan(msg *LogData) {
|
||
ch <- msg
|
||
}
|
||
|
||
func sendToEs() {
|
||
var once sync.Once
|
||
for {
|
||
select {
|
||
case msg := <-ch:
|
||
once.Do(func() {
|
||
//判断是否创建了索引
|
||
indices, _ := getAllIndices(time.Second * 5)
|
||
if !strings.Contains(indices, msg.Topic) {
|
||
fmt.Println(msg.Topic, msg.Data)
|
||
err := createIndices(msg.Topic, time.Second*5)
|
||
if err != nil {
|
||
fmt.Println("创建索引错误:", err)
|
||
return
|
||
}
|
||
}
|
||
})
|
||
marshal, err := json.Marshal(msg)
|
||
if err != nil {
|
||
fmt.Println("marshal json err:", err)
|
||
}
|
||
//// 反射获取id
|
||
//idFiled := reflect.ValueOf(doc).FieldByName("Id")
|
||
//if !idFiled.IsValid() {
|
||
// return fmt.Errorf("invalid id field")
|
||
//}
|
||
//kind := idFiled.Kind()
|
||
//if kind != reflect.String {
|
||
// return fmt.Errorf("invalid id is not string type")
|
||
//}
|
||
response, err := esClient.Index(
|
||
msg.Topic,
|
||
// 输入流
|
||
bytes.NewReader(marshal),
|
||
// 上下文
|
||
esClient.Index.WithContext(context.Background()),
|
||
// 自定义Id
|
||
//cli.Index.WithDocumentID(idFiled.String()),
|
||
//esClient.Index.WithDocumentType(typeStr),
|
||
)
|
||
fmt.Println(response.String(), err)
|
||
if err != nil {
|
||
fmt.Printf("insert document error: %s\n", err)
|
||
}
|
||
if response.StatusCode != 201 {
|
||
fmt.Printf("insert document error: %s\n", response.Status())
|
||
}
|
||
fmt.Printf("insert document: %#v\n", msg.Data)
|
||
default:
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
}
|
||
}
|
||
|
||
func createIndices(indices string, timeout time.Duration) error {
|
||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||
defer cancel()
|
||
select {
|
||
case <-ctx.Done():
|
||
err := ctx.Err()
|
||
fmt.Println("create index timeout ")
|
||
return err
|
||
default:
|
||
withContext := esClient.Indices.Create.WithContext(ctx)
|
||
resp, err := esClient.Indices.Create(indices, withContext)
|
||
if err != nil {
|
||
fmt.Printf("create index failed, err:%v\n", err)
|
||
return err
|
||
}
|
||
|
||
if resp.StatusCode != 200 {
|
||
fmt.Printf("create index error: %s\n", resp.Status())
|
||
return err
|
||
}
|
||
fmt.Println("create index success")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func getAllIndices(timeout time.Duration) (res string, err error) {
|
||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||
defer cancel()
|
||
select {
|
||
case <-ctx.Done():
|
||
err := ctx.Err()
|
||
fmt.Println("get index timeout ")
|
||
return "", err
|
||
default:
|
||
resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient)
|
||
defer resp.Body.Close()
|
||
if err != nil {
|
||
fmt.Printf("get index failed, err:%v\n", err)
|
||
return "", err
|
||
}
|
||
|
||
if resp.StatusCode != 200 {
|
||
fmt.Printf("get index error: %s\n", resp.Status())
|
||
return "", err
|
||
}
|
||
fmt.Println("get index success")
|
||
res = resp.String()
|
||
return res, nil
|
||
}
|
||
|
||
}
|