log-transfer/es/es.go

156 lines
3.6 KiB
Go
Raw Permalink Normal View History

2024-09-14 21:09:56 +08:00
package es
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"strings"
"sync"
"time"
)
type LogData struct {
Topic string `json:"topic"`
Data string `json:"data"`
}
var (
esClient *elasticsearch.Client
ch chan *LogData
)
//初始化es准备接受kafka发过来的数据
func InitEsConnect(address string, username string, password string, chanSize int) (err error) {
//1.初始化一个elasticsearch客户端进行链接
cfg := elasticsearch.Config{
Addresses: []string{address},
Username: username,
Password: password,
}
esClient, err = elasticsearch.NewClient(cfg)
if err != nil {
fmt.Println("new es config error:", err)
return
}
fmt.Println("connect to es success!") //链接成功
ch = make(chan *LogData, chanSize)
go sendToEs()
return
}
//发送数据到es中
func SendToEsChan(msg *LogData) {
ch <- msg
}
func sendToEs() {
var once sync.Once
for {
select {
case msg := <-ch:
once.Do(func() {
//判断是否创建了索引
indices, _ := getAllIndices(time.Second * 5)
if !strings.Contains(indices, msg.Topic) {
fmt.Println(msg.Topic, msg.Data)
err := createIndices(msg.Topic, time.Second*5)
if err != nil {
fmt.Println("创建索引错误:", err)
return
}
}
})
marshal, err := json.Marshal(msg)
if err != nil {
fmt.Println("marshal json err:", err)
}
//// 反射获取id
//idFiled := reflect.ValueOf(doc).FieldByName("Id")
//if !idFiled.IsValid() {
// return fmt.Errorf("invalid id field")
//}
//kind := idFiled.Kind()
//if kind != reflect.String {
// return fmt.Errorf("invalid id is not string type")
//}
response, err := esClient.Index(
msg.Topic,
// 输入流
bytes.NewReader(marshal),
// 上下文
esClient.Index.WithContext(context.Background()),
// 自定义Id
//cli.Index.WithDocumentID(idFiled.String()),
//esClient.Index.WithDocumentType(typeStr),
)
fmt.Println(response.String(), err)
if err != nil {
fmt.Printf("insert document error: %s\n", err)
}
if response.StatusCode != 201 {
fmt.Printf("insert document error: %s\n", response.Status())
}
fmt.Printf("insert document: %#v\n", msg.Data)
default:
time.Sleep(1 * time.Second)
}
}
}
func createIndices(indices string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
err := ctx.Err()
fmt.Println("create index timeout ")
return err
default:
withContext := esClient.Indices.Create.WithContext(ctx)
resp, err := esClient.Indices.Create(indices, withContext)
if err != nil {
fmt.Printf("create index failed, err:%v\n", err)
return err
}
if resp.StatusCode != 200 {
fmt.Printf("create index error: %s\n", resp.Status())
return err
}
fmt.Println("create index success")
}
return nil
}
func getAllIndices(timeout time.Duration) (res string, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
err := ctx.Err()
fmt.Println("get index timeout ")
return "", err
default:
resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient)
defer resp.Body.Close()
if err != nil {
fmt.Printf("get index failed, err:%v\n", err)
return "", err
}
if resp.StatusCode != 200 {
fmt.Printf("get index error: %s\n", resp.Status())
return "", err
}
fmt.Println("get index success")
res = resp.String()
return res, nil
}
}