From f8dda033a4b5c98dfe10a82b1fd582a74525facb Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 25 Jul 2024 16:57:07 +0800 Subject: [PATCH] redis nsq finished --- .idea/dataSources.xml | 7 +++++ go.mod | 3 +++ go.sum | 7 +++++ nsq/nsq-consumer.go | 58 ++++++++++++++++++++++++++++++++++++++++++ nsq/nsq-producer.go | 53 ++++++++++++++++++++++++++++++++++++++ nsq/nsq.log | 7 +++++ redis/redis-connect.go | 57 +++++++++++++++++++++++++++++++++++++++++ redis/redis.log | 7 +++++ 8 files changed, 199 insertions(+) create mode 100644 nsq/nsq-consumer.go create mode 100644 nsq/nsq-producer.go create mode 100644 nsq/nsq.log create mode 100644 redis/redis-connect.go create mode 100644 redis/redis.log diff --git a/.idea/dataSources.xml b/.idea/dataSources.xml index 44706a1..51192f9 100644 --- a/.idea/dataSources.xml +++ b/.idea/dataSources.xml @@ -8,5 +8,12 @@ jdbc:mysql://43.143.245.135:3306/wangaodev $ProjectFileDir$ + + redis + true + jdbc.RedisDriver + jdbc:redis://43.143.245.135:6379/0 + $ProjectFileDir$ + \ No newline at end of file diff --git a/go.mod b/go.mod index 801b6e0..939139e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,9 @@ go 1.22 require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/jmoiron/sqlx v1.4.0 // indirect + github.com/nsqio/go-nsq v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 320e5f2..4a55919 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,15 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= diff --git a/nsq/nsq-consumer.go b/nsq/nsq-consumer.go new file mode 100644 index 0000000..cc0bd40 --- /dev/null +++ b/nsq/nsq-consumer.go @@ -0,0 +1,58 @@ +// nsq_consumer/main.go +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/nsqio/go-nsq" +) + +// NSQ Consumer Demo + +// MyHandler 是一个消费者类型 +type MyHandler struct { + Title string +} + +// HandleMessage 是需要实现的处理消息的方法 +func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { + fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) + return +} + +// 初始化消费者 +func initConsumer(topic string, channel string, address string) (err error) { + config := nsq.NewConfig() + config.LookupdPollInterval = 15 * time.Second + c, err := nsq.NewConsumer(topic, channel, config) + if err != nil { + fmt.Printf("create consumer failed, err:%v\n", err) + return + } + consumer := &MyHandler{ + Title: "自由", + } + c.AddHandler(consumer) + + //if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD + if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询 + return err + } + return nil + +} + +func main() { + err := initConsumer("topic_demo", "first", "43.143.245.135:4161") + if err != nil { + fmt.Printf("init consumer failed, err:%v\n", err) + return + } + c := make(chan os.Signal) // 定义一个信号的通道 + signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c + <-c // 阻塞 +} diff --git a/nsq/nsq-producer.go b/nsq/nsq-producer.go new file mode 100644 index 0000000..b33bb94 --- /dev/null +++ b/nsq/nsq-producer.go @@ -0,0 +1,53 @@ +// nsq_producer/main.go +package main + +import ( + "bufio" + "fmt" + "github.com/nsqio/go-nsq" + "os" + "strings" +) + +// NSQ Producer Demo + +var producer *nsq.Producer + +// 初始化生产者 +func initProducer(str string) (err error) { + config := nsq.NewConfig() + producer, err = nsq.NewProducer(str, config) + if err != nil { + fmt.Printf("create producer failed, err:%v\n", err) + return err + } + return nil +} + +func main() { + nsqAddress := "43.143.245.135:4150" + err := initProducer(nsqAddress) + if err != nil { + fmt.Printf("init producer failed, err:%v\n", err) + return + } + + reader := bufio.NewReader(os.Stdin) // 从标准输入读取 + for { + data, err := reader.ReadString('\n') + if err != nil { + fmt.Printf("read string from stdin failed, err:%v\n", err) + continue + } + data = strings.TrimSpace(data) + if strings.ToUpper(data) == "Q" { // 输入Q退出 + break + } + // 向 'topic_demo' publish 数据 + err = producer.Publish("topic_demo", []byte(data)) + if err != nil { + fmt.Printf("publish msg to nsq failed, err:%v\n", err) + continue + } + } +} diff --git a/nsq/nsq.log b/nsq/nsq.log new file mode 100644 index 0000000..7af2407 --- /dev/null +++ b/nsq/nsq.log @@ -0,0 +1,7 @@ +nsq的适用场景 +异步处理 +参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求的响应时间。 +应用解耦 +通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性 +流量削峰 +类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。 \ No newline at end of file diff --git a/redis/redis-connect.go b/redis/redis-connect.go new file mode 100644 index 0000000..48c1bba --- /dev/null +++ b/redis/redis-connect.go @@ -0,0 +1,57 @@ +package main + +import ( + "fmt" + "github.com/go-redis/redis" +) + +// redis +// 与mysql类似,也是一个连接池对象 +var redisdb *redis.Client + +func initRedis() (err error) { + //这里不要声明为局部变量 + redisdb = redis.NewClient(&redis.Options{ + Addr: "43.143.245.135:6379", + Password: "ningzaichun", + DB: 0, + }) + pong, err := redisdb.Ping().Result() + if err != nil { + return err + } + fmt.Println("pong", pong) + return nil + +} +func main() { + err := initRedis() + if err != nil { + fmt.Println("connect redis failed", err) + return + } + fmt.Println("redis connect success") + + //zset + key := "rank" + languages := []redis.Z{ + {Score: 90.0, Member: "Golang"}, + {Score: 98.0, Member: "Java"}, + {Score: 95.0, Member: "Python"}, + {Score: 97.0, Member: "JavaScript"}, + {Score: 99.0, Member: "C/C++"}, + } + //把元素追加到key + num, err := redisdb.ZAdd(key, languages...).Result() + if err != nil { + fmt.Println("add languages failed", err) + } + fmt.Println("add languages success", num) + //给Golang加10分 + newScore, err := redisdb.ZIncrBy(key, 10, "Golang").Result() + if err != nil { + fmt.Printf("zincrby failed, err:%v\n", err) + return + } + fmt.Printf("Golang's score is %f now.\n", newScore) +} diff --git a/redis/redis.log b/redis/redis.log new file mode 100644 index 0000000..76f81cb --- /dev/null +++ b/redis/redis.log @@ -0,0 +1,7 @@ +redis的用处 +1.cache缓存 +2.简单的队列 +3.排行榜 +推荐书目: +《redis实战》 +