// nsq_consumer/main.go package main import ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq" ) // NSQ Consumer Demo // MyHandler 是一个消费者类型 type MyHandler struct { Title string } // HandleMessage 是需要实现的处理消息的方法 func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return } // 初始化消费者 func initConsumer(topic string, channel string, address string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n", err) return } consumer := &MyHandler{ Title: "自由", } c.AddHandler(consumer) //if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询 return err } return nil } func main() { err := initConsumer("topic_demo", "first", "43.143.245.135:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定义一个信号的通道 signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c <-c // 阻塞 }