package main import ( "fmt" "gopkg.in/ini.v1" "logtransfer/config" "logtransfer/es" "logtransfer/kafka" ) var cfg = new(config.LogTransferCfg) //log transfer //将日志从kafka取出来发送到es func main() { //0.加载配置文件 err := ini.MapTo(cfg, "./config/config.ini") //改变全局变量的值,要传指针 if err != nil { fmt.Printf("load ini config file fail, err:%v\n", err) return } fmt.Printf("%#v\n", cfg) //1.初始化es //1.1初始化一个es链接的client err = es.InitEsConnect(cfg.EsCfg.Address, cfg.EsCfg.UserName, cfg.EsCfg.Password, cfg.EsCfg.ChanSize) if err != nil { fmt.Printf("init es connect fail, err:%v\n", err) return } //2.初始化kafka //2.1链接kafka创建分区的消费者 //2.2每个分区的消费者分别取出数据通过SendToEs()将数据发往es err = kafka.InitConsumer([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic) if err != nil { fmt.Printf("init kafka consumer fail, err:%v\n", err) return } select {} //1.从kafka取日志数据 //2.发往Es }