Skip to main content

RabbitMQ 消息队列

概览#

消息队列也是框架的基本标配,实际开发中也基本上离不开消息队列的使用,比如:及时队列、延迟队列、定时队列。
使用场景如:

  • 新用户注册发送欢迎提醒(即时消息)
  • 网上购物下订单,30分钟内未支付订单会被关闭(延迟消息)
  • 在指定的时间运行任务(定时消息)

在 Eagle 框架中,分为两类消息队列,一种是偏轻量型的消息队列(主要使用redis),一种重量型一点的消息队列(RabbitMQ或Kafak),下面详细介绍, 本文主要介绍 RabbitMQ 消息队列。

RabbitMQ 消息队列#

特性#

  • 支持多队列配置
  • 支持延迟队列配置

配置#

# rabbitmq.yaml, 支持配置多种队列test-demo:  URI: "amqp://guest:guest@localhost:5672/"  AutoDeclare: true  Timeout: 5s  Exchange:    Name: local-test-exchange    Kind: direct    Durable: true    AutoDelete: false    Internal: false    NoWait: false    Args: {}  Queue:    name: local-test-queue    durable: true    AutoDelete: false    Exclusive: false    NoWait: false    Args: {}  Bind:    RoutingKey: local-test-routing-key    NoWait: false    Args: {}
test-multi:  URI: "amqp://guest:guest@localhost:5672/"  AutoDeclare: true  Timeout: 5s  Exchange:    Name: local-test2-exchange    Kind: direct    Durable: true    AutoDelete: false    Internal: false    NoWait: false    Args: {}  Queue:    name: local-test2-queue    durable: true    AutoDelete: false    Exclusive: false    NoWait: false    Args: {}  Bind:    RoutingKey: local-test2-routing-key    NoWait: false    Args: {}

使用#

生产者#

package main
import (    "context"    "encoding/json"    "log"    "time"
    "github.com/go-eagle/eagle/pkg/queue/rabbitmq/options"
    eagle "github.com/go-eagle/eagle/pkg/app"    "github.com/go-eagle/eagle/pkg/config"    logger "github.com/go-eagle/eagle/pkg/log"    "github.com/spf13/pflag"
    "github.com/go-eagle/eagle/pkg/queue/rabbitmq")
var (    cfgDir = pflag.StringP("config dir", "c", "config", "config path.")    env    = pflag.StringP("env name", "e", "", "env var name."))
// 启动 rabbitmq// docker run -it  --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management// 访问ui: http://127.0.0.1:15672/// cd examples/queue/rabbitmq/producer// go run main.gofunc main() {    c := config.New(*cfgDir, config.WithEnv(*env))    var cfg eagle.Config    if err := c.Load("app", &cfg); err != nil {        panic(err)    }    // set global    eagle.Conf = &cfg
    logger.Init()
    rabbitmq.Load()    defer rabbitmq.Close()
    opts := []options.PublishOption{        options.WithPublishOptionContentType("application/json"),    }
    go func() {        var message string        for i := 0; i < 100000; i++ {            message = "Hello World RabbitMQ!" + time.Now().String()            msg := map[string]interface{}{                "message": message,            }            data, _ := json.Marshal(msg)            if err := rabbitmq.Publish(context.Background(), "test-demo", data, opts...); err != nil {                log.Fatalf("failed publish message: %s", err.Error())            }        }    }()
    var message string    for i := 0; i < 100000; i++ {        message = "Hello World multi RabbitMQ!" + time.Now().String()        msg := map[string]interface{}{            "message": message,        }        data, _ := json.Marshal(msg)        if err := rabbitmq.Publish(context.Background(), "test-multi", data, opts...); err != nil {            log.Fatalf("failed publish message: %s", err.Error())        }    }
}

src: https://github.com/go-eagle/eagle/blob/master/examples/queue/rabbitmq/producer/main.go

消费者#

package main
import (    "context"    "encoding/json"    "os"    "os/signal"    "syscall"
    "github.com/go-eagle/eagle/pkg/queue/rabbitmq/options"
    "github.com/rabbitmq/amqp091-go"
    eagle "github.com/go-eagle/eagle/pkg/app"    "github.com/go-eagle/eagle/pkg/config"
    "github.com/spf13/pflag"
    logger "github.com/go-eagle/eagle/pkg/log"
    "github.com/go-eagle/eagle/pkg/queue/rabbitmq")
var (    cfgDir = pflag.StringP("config dir", "c", "config", "config path.")    env    = pflag.StringP("env name", "e", "", "env var name."))
// cd examples/queue/rabbitmq/consumer/// go run main.gofunc main() {    pflag.Parse()
    // init config    c := config.New(*cfgDir, config.WithEnv(*env))    var cfg eagle.Config    if err := c.Load("app", &cfg); err != nil {        panic(err)    }    // set global    eagle.Conf = &cfg
    logger.Init()
    rabbitmq.Load()    defer rabbitmq.Close()
    stopSig := make(chan os.Signal, 1)    signal.Notify(stopSig, syscall.SIGINT, syscall.SIGTERM)
    done := make(chan struct{})    stop := make(chan struct{}, 1)
    // 自定义消息处理函数    handler := func(ctx context.Context, body amqp091.Delivery) (action rabbitmq.Action) {        msg := make(map[string]interface{})        err := json.Unmarshal(body.Body, &msg)        if err != nil {            logger.Errorf("consumer handler unmarshal msg err: %s", err.Error())            return rabbitmq.NackDiscard        }        logger.Infof("consumer handler receive msg: %s", msg)        return rabbitmq.Ack    }
    // rabbitmq consume message    ctx := context.Background()
    opts := []options.ConsumerOption{        options.WithConsumerOptionConcurrency(1),    }
    go func() {        err := rabbitmq.Consume(ctx, "test-demo", handler, opts...)        if err != nil {            logger.Errorf("rabbitmq consume err: %s", err.Error())        }    }()
    for {        select {        case <-stopSig:            logger.Info("received stop signal")            stop <- struct{}{}        case <-stop:            logger.Info("stopping service")            close(done)            return        case <-done:            logger.Info("stopped service gracefully")            return        }    }}

src: https://github.com/go-eagle/eagle/blob/master/examples/queue/rabbitmq/consumer/main.go

延迟队列#

package main
import (    "context"    "encoding/json"    "log"    "time"
    eagle "github.com/go-eagle/eagle/pkg/app"    "github.com/go-eagle/eagle/pkg/config"    logger "github.com/go-eagle/eagle/pkg/log"    "github.com/go-eagle/eagle/pkg/queue/rabbitmq"    "github.com/go-eagle/eagle/pkg/queue/rabbitmq/options")
// 启动 rabbitmq// docker run -it  --name rabbitmq -p 5672:5672 -p 15672:15672 -v $PWD/plugins:/plugins rabbitmq:3.10-management// 访问ui: http://127.0.0.1:15672/// cd examples/queue/rabbitmq/producer// go run delay_publish.gofunc main() {    c := config.New(*cfgDir, config.WithEnv(*env))    var cfg eagle.Config    if err := c.Load("app", &cfg); err != nil {        panic(err)    }    // set global    eagle.Conf = &cfg
    logger.Init()
    rabbitmq.Load()    defer rabbitmq.Close()
    opts := []options.PublishOption{        options.WithPublishOptionContentType("application/json"),    }
    var message string    for i := 0; i < 100000; i++ {        message = "Hello World RabbitMQ!" + time.Now().String()        msg := map[string]interface{}{            "message": message,        }        data, _ := json.Marshal(msg)        if err := rabbitmq.PublishWithDelay(context.Background(), "test-demo", data, 10, opts...); err != nil {            log.Fatalf("failed publish message: %s", err.Error())        }    }}

src: https://github.com/go-eagle/eagle/blob/master/examples/queue/rabbitmq/producer/delay_publish.go

附录#

基于Docker部署RabbitMQ

docker pull bitnami/rabbitmq:latest
docker run -itd \    --hostname localhost \    --name rabbitmq-test \    -p 15672:15672 \    -p 5672:5672 \    -p 1883:1883 \    -p 15675:15675 \    -e RABBITMQ_PLUGINS=rabbitmq_top,rabbitmq_mqtt,rabbitmq_web_mqtt,rabbitmq_prometheus,rabbitmq_stomp,rabbitmq_auth_backend_http \    bitnami/rabbitmq:latest
# 查看插件列表rabbitmq-plugins list# rabbitmq_peer_discovery_consul rabbitmq-plugins --offline enable rabbitmq_peer_discovery_consul# rabbitmq_mqtt 提供与后端服务交互使用,端口1883rabbitmq-plugins enable rabbitmq_mqtt# rabbitmq_web_mqtt 提供与前端交互使用,端口15675rabbitmq-plugins enable rabbitmq_web_mqtt

管理后台: http://localhost:15672
默认账号: guest
默认密码: guest

Reference#