commit c5d2b41ed74fc8ab3bd656da50ceb2965022e633 Author: Your Name Date: Sat Sep 14 21:09:56 2024 +0800 一个将日志发送es的程序 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/logtransfer.iml b/.idea/logtransfer.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/logtransfer.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..1eb0571 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/config/cfg.go b/config/cfg.go new file mode 100644 index 0000000..153db8d --- /dev/null +++ b/config/cfg.go @@ -0,0 +1,24 @@ +package config + +//LogTransfer 全局配置文件 + +type LogTransferCfg struct { + KafkaCfg `ini:"kafka"` + EsCfg `ini:"es"` +} + +//KafkaCfg + +type KafkaCfg struct { + Address string `ini:"address"` + Topic string `ini:"topic"` +} + +//EsCfg + +type EsCfg struct { + Address string `ini:"address"` + UserName string `ini:"username"` + Password string `ini:"password"` + ChanSize int `ini:"chansize"` +} diff --git a/config/config.ini b/config/config.ini new file mode 100644 index 0000000..fb655b6 --- /dev/null +++ b/config/config.ini @@ -0,0 +1,9 @@ +[kafka] +address=43.143.245.135:9093 +topic=redis_log + +[es] +address=https://es-o3log3sa.public.tencentelasticsearch.com:9200 +username=elastic +password=wangao1996! +chansize=100000 diff --git a/es/es.go b/es/es.go new file mode 100644 index 0000000..24817c0 --- /dev/null +++ b/es/es.go @@ -0,0 +1,155 @@ +package es + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" + "strings" + "sync" + "time" +) + +type LogData struct { + Topic string `json:"topic"` + Data string `json:"data"` +} + +var ( + esClient *elasticsearch.Client + ch chan *LogData +) + +//初始化es,准备接受kafka发过来的数据 + +func InitEsConnect(address string, username string, password string, chanSize int) (err error) { + //1.初始化一个elasticsearch客户端进行链接 + cfg := elasticsearch.Config{ + Addresses: []string{address}, + Username: username, + Password: password, + } + esClient, err = elasticsearch.NewClient(cfg) + if err != nil { + fmt.Println("new es config error:", err) + return + } + fmt.Println("connect to es success!") //链接成功 + ch = make(chan *LogData, chanSize) + go sendToEs() + return +} + +//发送数据到es中 + +func SendToEsChan(msg *LogData) { + ch <- msg +} + +func sendToEs() { + var once sync.Once + for { + select { + case msg := <-ch: + once.Do(func() { + //判断是否创建了索引 + indices, _ := getAllIndices(time.Second * 5) + if !strings.Contains(indices, msg.Topic) { + fmt.Println(msg.Topic, msg.Data) + err := createIndices(msg.Topic, time.Second*5) + if err != nil { + fmt.Println("创建索引错误:", err) + return + } + } + }) + marshal, err := json.Marshal(msg) + if err != nil { + fmt.Println("marshal json err:", err) + } + //// 反射获取id + //idFiled := reflect.ValueOf(doc).FieldByName("Id") + //if !idFiled.IsValid() { + // return fmt.Errorf("invalid id field") + //} + //kind := idFiled.Kind() + //if kind != reflect.String { + // return fmt.Errorf("invalid id is not string type") + //} + response, err := esClient.Index( + msg.Topic, + // 输入流 + bytes.NewReader(marshal), + // 上下文 + esClient.Index.WithContext(context.Background()), + // 自定义Id + //cli.Index.WithDocumentID(idFiled.String()), + //esClient.Index.WithDocumentType(typeStr), + ) + fmt.Println(response.String(), err) + if err != nil { + fmt.Printf("insert document error: %s\n", err) + } + if response.StatusCode != 201 { + fmt.Printf("insert document error: %s\n", response.Status()) + } + fmt.Printf("insert document: %#v\n", msg.Data) + default: + time.Sleep(1 * time.Second) + } + } +} + +func createIndices(indices string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + select { + case <-ctx.Done(): + err := ctx.Err() + fmt.Println("create index timeout ") + return err + default: + withContext := esClient.Indices.Create.WithContext(ctx) + resp, err := esClient.Indices.Create(indices, withContext) + if err != nil { + fmt.Printf("create index failed, err:%v\n", err) + return err + } + + if resp.StatusCode != 200 { + fmt.Printf("create index error: %s\n", resp.Status()) + return err + } + fmt.Println("create index success") + } + return nil +} + +func getAllIndices(timeout time.Duration) (res string, err error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + select { + case <-ctx.Done(): + err := ctx.Err() + fmt.Println("get index timeout ") + return "", err + default: + resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient) + defer resp.Body.Close() + if err != nil { + fmt.Printf("get index failed, err:%v\n", err) + return "", err + } + + if resp.StatusCode != 200 { + fmt.Printf("get index error: %s\n", resp.Status()) + return "", err + } + fmt.Println("get index success") + res = resp.String() + return res, nil + } + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6fe59ac --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module logtransfer + +go 1.23.0 + +require ( + github.com/Shopify/sarama v1.19.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..205ff96 --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo= +github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/kafka/kafka.go b/kafka/kafka.go new file mode 100644 index 0000000..ce0ebd5 --- /dev/null +++ b/kafka/kafka.go @@ -0,0 +1,57 @@ +package kafka + +import ( + "fmt" + "github.com/Shopify/sarama" + "logtransfer/es" +) + +// 初始化kafka消费者,发送给es + +func InitConsumer(address []string, topic string) error { + consumer, err := sarama.NewConsumer(address, nil) + if err != nil { + fmt.Println("Error creating the consumer:", err) + return err + } + partitionList, err := consumer.Partitions(topic) + if err != nil { + fmt.Println("Error getting the list of participants:", err) + return err + } + fmt.Println("There are", partitionList) + for partition := range partitionList { + pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) + if err != nil { + fmt.Println("Error creating the partition consumer:", err) + return err + } + //defer pc.AsyncClose() + go func(sarama.PartitionConsumer) { + for msg := range pc.Messages() { + fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value) + //直接发送Es + //var ld = new(LogData) + //err := json.Unmarshal(msg.Value, ld) + //fmt.Println("ld", ld) + //fmt.Println(ld) + //if err != nil { + // fmt.Println("json unmarshal Error:", err) + //} + ld := &es.LogData{ + Topic: topic, + Data: string(msg.Value), + } + //err = es.SendToEs("lyrics", ld) + //if err != nil { + // fmt.Println("Error sending message:", err) + //} + //fmt.Println("send to es:", string(msg.Value)) + //优化:直接放到channel中 + es.SendToEsChan(ld) + } + }(pc) + + } + return nil +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..3aae341 --- /dev/null +++ b/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "fmt" + "gopkg.in/ini.v1" + "logtransfer/config" + "logtransfer/es" + "logtransfer/kafka" +) + +var cfg = new(config.LogTransferCfg) + +//log transfer + +//将日志从kafka取出来发送到es + +func main() { + //0.加载配置文件 + err := ini.MapTo(cfg, "./config/config.ini") //改变全局变量的值,要传指针 + if err != nil { + fmt.Printf("load ini config file fail, err:%v\n", err) + return + } + + fmt.Printf("%#v\n", cfg) + //1.初始化es + //1.1初始化一个es链接的client + err = es.InitEsConnect(cfg.EsCfg.Address, cfg.EsCfg.UserName, cfg.EsCfg.Password, cfg.EsCfg.ChanSize) + if err != nil { + fmt.Printf("init es connect fail, err:%v\n", err) + return + } + //2.初始化kafka + //2.1链接kafka创建分区的消费者 + //2.2每个分区的消费者分别取出数据通过SendToEs()将数据发往es + err = kafka.InitConsumer([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic) + if err != nil { + fmt.Printf("init kafka consumer fail, err:%v\n", err) + return + } + + select {} + //1.从kafka取日志数据 + + //2.发往Es +}