RabbitMQ 消息队列
#
概览消息队列也是框架的基本标配,实际开发中也基本上离不开消息队列的使用,比如:及时队列、延迟队列、定时队列。
使用场景如:
- 新用户注册发送欢迎提醒(即时消息)
- 网上购物下订单,30分钟内未支付订单会被关闭(延迟消息)
- 在指定的时间运行任务(定时消息)
在 Eagle 框架中,分为两类消息队列,一种是偏轻量型的消息队列(主要使用redis),一种重量型一点的消息队列(RabbitMQ或Kafak),下面详细介绍, 本文主要介绍 RabbitMQ
消息队列。
#
RabbitMQ 消息队列#
特性- 支持多队列配置
- 支持延迟队列配置
#
配置# rabbitmq.yaml, 支持配置多种队列test-demo: URI: "amqp://guest:guest@localhost:5672/" AutoDeclare: true Timeout: 5s Exchange: Name: local-test-exchange Kind: direct Durable: true AutoDelete: false Internal: false NoWait: false Args: {} Queue: name: local-test-queue durable: true AutoDelete: false Exclusive: false NoWait: false Args: {} Bind: RoutingKey: local-test-routing-key NoWait: false Args: {}
test-multi: URI: "amqp://guest:guest@localhost:5672/" AutoDeclare: true Timeout: 5s Exchange: Name: local-test2-exchange Kind: direct Durable: true AutoDelete: false Internal: false NoWait: false Args: {} Queue: name: local-test2-queue durable: true AutoDelete: false Exclusive: false NoWait: false Args: {} Bind: RoutingKey: local-test2-routing-key NoWait: false Args: {}
#
使用#
生产者package main
import ( "context" "encoding/json" "log" "time"
"github.com/go-eagle/eagle/pkg/queue/rabbitmq/options"
eagle "github.com/go-eagle/eagle/pkg/app" "github.com/go-eagle/eagle/pkg/config" logger "github.com/go-eagle/eagle/pkg/log" "github.com/spf13/pflag"
"github.com/go-eagle/eagle/pkg/queue/rabbitmq")
var ( cfgDir = pflag.StringP("config dir", "c", "config", "config path.") env = pflag.StringP("env name", "e", "", "env var name."))
// 启动 rabbitmq// docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management// 访问ui: http://127.0.0.1:15672/// cd examples/queue/rabbitmq/producer// go run main.gofunc main() { c := config.New(*cfgDir, config.WithEnv(*env)) var cfg eagle.Config if err := c.Load("app", &cfg); err != nil { panic(err) } // set global eagle.Conf = &cfg
logger.Init()
rabbitmq.Load() defer rabbitmq.Close()
opts := []options.PublishOption{ options.WithPublishOptionContentType("application/json"), }
go func() { var message string for i := 0; i < 100000; i++ { message = "Hello World RabbitMQ!" + time.Now().String() msg := map[string]interface{}{ "message": message, } data, _ := json.Marshal(msg) if err := rabbitmq.Publish(context.Background(), "test-demo", data, opts...); err != nil { log.Fatalf("failed publish message: %s", err.Error()) } } }()
var message string for i := 0; i < 100000; i++ { message = "Hello World multi RabbitMQ!" + time.Now().String() msg := map[string]interface{}{ "message": message, } data, _ := json.Marshal(msg) if err := rabbitmq.Publish(context.Background(), "test-multi", data, opts...); err != nil { log.Fatalf("failed publish message: %s", err.Error()) } }
}
src: https://github.com/go-eagle/eagle/blob/master/examples/queue/rabbitmq/producer/main.go
#
消费者package main
import ( "context" "encoding/json" "os" "os/signal" "syscall"
"github.com/go-eagle/eagle/pkg/queue/rabbitmq/options"
"github.com/rabbitmq/amqp091-go"
eagle "github.com/go-eagle/eagle/pkg/app" "github.com/go-eagle/eagle/pkg/config"
"github.com/spf13/pflag"
logger "github.com/go-eagle/eagle/pkg/log"
"github.com/go-eagle/eagle/pkg/queue/rabbitmq")
var ( cfgDir = pflag.StringP("config dir", "c", "config", "config path.") env = pflag.StringP("env name", "e", "", "env var name."))
// cd examples/queue/rabbitmq/consumer/// go run main.gofunc main() { pflag.Parse()
// init config c := config.New(*cfgDir, config.WithEnv(*env)) var cfg eagle.Config if err := c.Load("app", &cfg); err != nil { panic(err) } // set global eagle.Conf = &cfg
logger.Init()
rabbitmq.Load() defer rabbitmq.Close()
stopSig := make(chan os.Signal, 1) signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM)
done := make(chan struct{}) stop := make(chan struct{}, 1)
// 自定义消息处理函数 handler := func(ctx context.Context, body amqp091.Delivery) (action rabbitmq.Action) { msg := make(map[string]interface{}) err := json.Unmarshal(body.Body, &msg) if err != nil { logger.Errorf("consumer handler unmarshal msg err: %s", err.Error()) return rabbitmq.NackDiscard } logger.Infof("consumer handler receive msg: %s", msg) return rabbitmq.Ack }
// rabbitmq consume message ctx := context.Background()
opts := []options.ConsumerOption{ options.WithConsumerOptionConcurrency(1), }
go func() { err := rabbitmq.Consume(ctx, "test-demo", handler, opts...) if err != nil { logger.Errorf("rabbitmq consume err: %s", err.Error()) } }()
for { select { case <-stopSig: logger.Info("received stop signal") stop <- struct{}{} case <-stop: logger.Info("stopping service") close(done) return case <-done: logger.Info("stopped service gracefully") return } }}
src: https://github.com/go-eagle/eagle/blob/master/examples/queue/rabbitmq/consumer/main.go
#
延迟队列package main
import ( "context" "encoding/json" "log" "time"
eagle "github.com/go-eagle/eagle/pkg/app" "github.com/go-eagle/eagle/pkg/config" logger "github.com/go-eagle/eagle/pkg/log" "github.com/go-eagle/eagle/pkg/queue/rabbitmq" "github.com/go-eagle/eagle/pkg/queue/rabbitmq/options")
// 启动 rabbitmq// docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 -v $PWD/plugins:/plugins rabbitmq:3.10-management// 访问ui: http://127.0.0.1:15672/// cd examples/queue/rabbitmq/producer// go run delay_publish.gofunc main() { c := config.New(*cfgDir, config.WithEnv(*env)) var cfg eagle.Config if err := c.Load("app", &cfg); err != nil { panic(err) } // set global eagle.Conf = &cfg
logger.Init()
rabbitmq.Load() defer rabbitmq.Close()
opts := []options.PublishOption{ options.WithPublishOptionContentType("application/json"), }
var message string for i := 0; i < 100000; i++ { message = "Hello World RabbitMQ!" + time.Now().String() msg := map[string]interface{}{ "message": message, } data, _ := json.Marshal(msg) if err := rabbitmq.PublishWithDelay(context.Background(), "test-demo", data, 10, opts...); err != nil { log.Fatalf("failed publish message: %s", err.Error()) } }}
src: https://github.com/go-eagle/eagle/blob/master/examples/queue/rabbitmq/producer/delay_publish.go
#
附录基于Docker部署RabbitMQ
docker pull bitnami/rabbitmq:latest
docker run -itd \ --hostname localhost \ --name rabbitmq-test \ -p 15672:15672 \ -p 5672:5672 \ -p 1883:1883 \ -p 15675:15675 \ -e RABBITMQ_PLUGINS=rabbitmq_top,rabbitmq_mqtt,rabbitmq_web_mqtt,rabbitmq_prometheus,rabbitmq_stomp,rabbitmq_auth_backend_http \ bitnami/rabbitmq:latest
# 查看插件列表rabbitmq-plugins list# rabbitmq_peer_discovery_consul rabbitmq-plugins --offline enable rabbitmq_peer_discovery_consul# rabbitmq_mqtt 提供与后端服务交互使用,端口1883rabbitmq-plugins enable rabbitmq_mqtt# rabbitmq_web_mqtt 提供与前端交互使用,端口15675rabbitmq-plugins enable rabbitmq_web_mqtt
管理后台: http://localhost:15672
默认账号: guest
默认密码: guest