一个将日志发送es的程序

main
Your Name 2024-09-14 21:09:56 +08:00
commit c5d2b41ed7
11 changed files with 358 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

9
.idea/logtransfer.iml Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/logtransfer.iml" filepath="$PROJECT_DIR$/.idea/logtransfer.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

24
config/cfg.go Normal file
View File

@ -0,0 +1,24 @@
package config
//LogTransfer 全局配置文件
type LogTransferCfg struct {
KafkaCfg `ini:"kafka"`
EsCfg `ini:"es"`
}
//KafkaCfg
type KafkaCfg struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
//EsCfg
type EsCfg struct {
Address string `ini:"address"`
UserName string `ini:"username"`
Password string `ini:"password"`
ChanSize int `ini:"chansize"`
}

9
config/config.ini Normal file
View File

@ -0,0 +1,9 @@
[kafka]
address=43.143.245.135:9093
topic=redis_log
[es]
address=https://es-o3log3sa.public.tencentelasticsearch.com:9200
username=elastic
password=wangao1996!
chansize=100000

155
es/es.go Normal file
View File

@ -0,0 +1,155 @@
package es
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"strings"
"sync"
"time"
)
type LogData struct {
Topic string `json:"topic"`
Data string `json:"data"`
}
var (
esClient *elasticsearch.Client
ch chan *LogData
)
//初始化es准备接受kafka发过来的数据
func InitEsConnect(address string, username string, password string, chanSize int) (err error) {
//1.初始化一个elasticsearch客户端进行链接
cfg := elasticsearch.Config{
Addresses: []string{address},
Username: username,
Password: password,
}
esClient, err = elasticsearch.NewClient(cfg)
if err != nil {
fmt.Println("new es config error:", err)
return
}
fmt.Println("connect to es success!") //链接成功
ch = make(chan *LogData, chanSize)
go sendToEs()
return
}
//发送数据到es中
func SendToEsChan(msg *LogData) {
ch <- msg
}
func sendToEs() {
var once sync.Once
for {
select {
case msg := <-ch:
once.Do(func() {
//判断是否创建了索引
indices, _ := getAllIndices(time.Second * 5)
if !strings.Contains(indices, msg.Topic) {
fmt.Println(msg.Topic, msg.Data)
err := createIndices(msg.Topic, time.Second*5)
if err != nil {
fmt.Println("创建索引错误:", err)
return
}
}
})
marshal, err := json.Marshal(msg)
if err != nil {
fmt.Println("marshal json err:", err)
}
//// 反射获取id
//idFiled := reflect.ValueOf(doc).FieldByName("Id")
//if !idFiled.IsValid() {
// return fmt.Errorf("invalid id field")
//}
//kind := idFiled.Kind()
//if kind != reflect.String {
// return fmt.Errorf("invalid id is not string type")
//}
response, err := esClient.Index(
msg.Topic,
// 输入流
bytes.NewReader(marshal),
// 上下文
esClient.Index.WithContext(context.Background()),
// 自定义Id
//cli.Index.WithDocumentID(idFiled.String()),
//esClient.Index.WithDocumentType(typeStr),
)
fmt.Println(response.String(), err)
if err != nil {
fmt.Printf("insert document error: %s\n", err)
}
if response.StatusCode != 201 {
fmt.Printf("insert document error: %s\n", response.Status())
}
fmt.Printf("insert document: %#v\n", msg.Data)
default:
time.Sleep(1 * time.Second)
}
}
}
func createIndices(indices string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
err := ctx.Err()
fmt.Println("create index timeout ")
return err
default:
withContext := esClient.Indices.Create.WithContext(ctx)
resp, err := esClient.Indices.Create(indices, withContext)
if err != nil {
fmt.Printf("create index failed, err:%v\n", err)
return err
}
if resp.StatusCode != 200 {
fmt.Printf("create index error: %s\n", resp.Status())
return err
}
fmt.Println("create index success")
}
return nil
}
func getAllIndices(timeout time.Duration) (res string, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
select {
case <-ctx.Done():
err := ctx.Err()
fmt.Println("get index timeout ")
return "", err
default:
resp, err := esapi.CatIndicesRequest{Format: "json"}.Do(ctx, esClient)
defer resp.Body.Close()
if err != nil {
fmt.Printf("get index failed, err:%v\n", err)
return "", err
}
if resp.StatusCode != 200 {
fmt.Printf("get index error: %s\n", resp.Status())
return "", err
}
fmt.Println("get index success")
res = resp.String()
return res, nil
}
}

16
go.mod Normal file
View File

@ -0,0 +1,16 @@
module logtransfer
go 1.23.0
require (
github.com/Shopify/sarama v1.19.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/go-elasticsearch/v7 v7.17.10 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

20
go.sum Normal file
View File

@ -0,0 +1,20 @@
github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
github.com/elastic/go-elasticsearch/v7 v7.17.10/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=

57
kafka/kafka.go Normal file
View File

@ -0,0 +1,57 @@
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
"logtransfer/es"
)
// 初始化kafka消费者发送给es
func InitConsumer(address []string, topic string) error {
consumer, err := sarama.NewConsumer(address, nil)
if err != nil {
fmt.Println("Error creating the consumer:", err)
return err
}
partitionList, err := consumer.Partitions(topic)
if err != nil {
fmt.Println("Error getting the list of participants:", err)
return err
}
fmt.Println("There are", partitionList)
for partition := range partitionList {
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Println("Error creating the partition consumer:", err)
return err
}
//defer pc.AsyncClose()
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
//直接发送Es
//var ld = new(LogData)
//err := json.Unmarshal(msg.Value, ld)
//fmt.Println("ld", ld)
//fmt.Println(ld)
//if err != nil {
// fmt.Println("json unmarshal Error:", err)
//}
ld := &es.LogData{
Topic: topic,
Data: string(msg.Value),
}
//err = es.SendToEs("lyrics", ld)
//if err != nil {
// fmt.Println("Error sending message:", err)
//}
//fmt.Println("send to es:", string(msg.Value))
//优化直接放到channel中
es.SendToEsChan(ld)
}
}(pc)
}
return nil
}

46
main.go Normal file
View File

@ -0,0 +1,46 @@
package main
import (
"fmt"
"gopkg.in/ini.v1"
"logtransfer/config"
"logtransfer/es"
"logtransfer/kafka"
)
var cfg = new(config.LogTransferCfg)
//log transfer
//将日志从kafka取出来发送到es
func main() {
//0.加载配置文件
err := ini.MapTo(cfg, "./config/config.ini") //改变全局变量的值,要传指针
if err != nil {
fmt.Printf("load ini config file fail, err:%v\n", err)
return
}
fmt.Printf("%#v\n", cfg)
//1.初始化es
//1.1初始化一个es链接的client
err = es.InitEsConnect(cfg.EsCfg.Address, cfg.EsCfg.UserName, cfg.EsCfg.Password, cfg.EsCfg.ChanSize)
if err != nil {
fmt.Printf("init es connect fail, err:%v\n", err)
return
}
//2.初始化kafka
//2.1链接kafka创建分区的消费者
//2.2每个分区的消费者分别取出数据通过SendToEs()将数据发往es
err = kafka.InitConsumer([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic)
if err != nil {
fmt.Printf("init kafka consumer fail, err:%v\n", err)
return
}
select {}
//1.从kafka取日志数据
//2.发往Es
}