log-transfer/es/es.go

156 lines
3.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package es
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"strings"
"sync"
"time"
)
// LogData is one log record queued for indexing. The whole struct is
// JSON-encoded and used as the document body, and Topic additionally
// selects the index the document is written to.
type LogData struct {
	// Topic names the target index.
	Topic string `json:"topic"`
	// Data carries the log payload.
	Data string `json:"data"`
}
// esClient is the package-wide Elasticsearch client; InitEsConnect assigns it.
var esClient *elasticsearch.Client

// ch buffers the messages handed to SendToEsChan until the sendToEs goroutine
// picks them up; InitEsConnect allocates it.
var ch chan *LogData
// InitEsConnect builds the package-level Elasticsearch client, allocates the
// message channel and starts the background goroutine that indexes the
// messages queued through SendToEsChan.
//
// address, username and password are passed to elasticsearch.Config as-is;
// chanSize is the buffer capacity of the internal message channel.
//
// An error is returned when chanSize is negative or when the client cannot be
// constructed. In both cases the package state is left untouched and no
// goroutine is started.
func InitEsConnect(address string, username string, password string, chanSize int) (err error) {
	// make(chan, n) panics for a negative n; report it as an error instead.
	if chanSize < 0 {
		return fmt.Errorf("invalid channel size %d: must not be negative", chanSize)
	}

	// 1. Create the Elasticsearch client used for every later request.
	cfg := elasticsearch.Config{
		Addresses: []string{address},
		Username:  username,
		Password:  password,
	}
	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		fmt.Println("new es config error:", err)
		return err
	}
	// Publish the client only once it is known to be valid, so a failed call
	// never replaces a working client with nil.
	esClient = client

	// NOTE(review): this message only shows that the client was constructed;
	// whether NewClient actually contacts the cluster should be confirmed.
	fmt.Println("connect to es success!")

	ch = make(chan *LogData, chanSize)
	go sendToEs()
	return nil
}
// SendToEsChan queues msg on the package channel for the sendToEs goroutine
// to index. The send blocks while the channel buffer is full.
//
// A nil msg is dropped (with a log line) instead of being queued, because the
// consumer reads msg.Topic and would otherwise hit a nil pointer dereference.
func SendToEsChan(msg *LogData) {
	if msg == nil {
		fmt.Println("skip nil log message")
		return
	}
	ch <- msg
}
func sendToEs() {
var once sync.Once
for {
select {
case msg := <-ch:
once.Do(func() {
//判断是否创建了索引
indices, _ := getAllIndices(time.Second * 5)
if !strings.Contains(indices, msg.Topic) {
fmt.Println(msg.Topic, msg.Data)
err := createIndices(msg.Topic, time.Second*5)
if err != nil {
fmt.Println("创建索引错误:", err)
return
}
}
})
marshal, err := json.Marshal(msg)
if err != nil {
fmt.Println("marshal json err:", err)
}
//// 反射获取id
//idFiled := reflect.ValueOf(doc).FieldByName("Id")
//if !idFiled.IsValid() {
// return fmt.Errorf("invalid id field")
//}
//kind := idFiled.Kind()
//if kind != reflect.String {
// return fmt.Errorf("invalid id is not string type")
//}
response, err := esClient.Index(
msg.Topic,
// 输入流
bytes.NewReader(marshal),
// 上下文
esClient.Index.WithContext(context.Background()),
// 自定义Id
//cli.Index.WithDocumentID(idFiled.String()),
//esClient.Index.WithDocumentType(typeStr),
)
fmt.Println(response.String(), err)
if err != nil {
fmt.Printf("insert document error: %s\n", err)
}
if response.StatusCode != 201 {
fmt.Printf("insert document error: %s\n", response.Status())
}
fmt.Printf("insert document: %#v\n", msg.Data)
default:
time.Sleep(1 * time.Second)
}
}
}
// createIndices creates the index named indices, giving the request at most
// timeout to complete.
//
// It returns the request error when the call fails (including when the
// timeout expires) and a descriptive error when the response status is not
// 200. It returns nil only when the index was created.
func createIndices(indices string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// The request carries ctx, so the timeout is enforced by the call itself.
	resp, err := esClient.Indices.Create(indices, esClient.Indices.Create.WithContext(ctx))
	if err != nil {
		fmt.Printf("create index failed, err:%v\n", err)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("create index error: %s\n", resp.Status())
		// err is nil on this path; build a real error so the caller can see
		// that the index was not created.
		return fmt.Errorf("create index %q: unexpected status %s", indices, resp.Status())
	}
	fmt.Println("create index success")
	return nil
}
// getAllIndices issues a cat-indices request in JSON format, giving it at
// most timeout to complete, and returns the string form of the response.
//
// On a request failure (including an expired timeout) or a non-200 status it
// returns an empty string and a non-nil error.
func getAllIndices(timeout time.Duration) (res string, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient)
	if err != nil {
		fmt.Printf("get index failed, err:%v\n", err)
		return "", err
	}
	// Registered only after the error check: resp is nil when Do fails.
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("get index error: %s\n", resp.Status())
		// err is nil on this path; return a real error so the caller can tell
		// a failed listing from an empty one.
		return "", fmt.Errorf("list indices: unexpected status %s", resp.Status())
	}
	fmt.Println("get index success")
	return resp.String(), nil
}