消息队列
#
概览消息队列也是框架的基本标配,实际开发中也基本上离不开消息队列的使用,比如:及时队列、延迟队列、定时队列。
使用场景如:
- 新用户注册发送欢迎提醒(即时消息)
- 网上购物下订单,30分钟内未支付订单会被关闭(延迟消息)
- 在指定的时间运行任务(定时消息)
在 Eagle 框架中,分为两类消息队列,一种是偏轻量型的消息队列(主要使用redis),一种重量型一点的消息队列(RabbitMQ或Kafak),下面详细介绍。
#
消息队列这里所谓的轻量主要是对使用底层存储的考量,redis 大家基本都在用,使用和部署都比较简单。
caution
主要是对 asynq 做了简单的封装
#
架构图#
特性- 支持即时、延迟和定时消息
- 支持多worker消费
- 支持超时、重试、过期
- 支持worker崩溃自动恢复机制
- 支持redis单机、集群和哨兵模式(Sentinels)
- 支持 web UI 查看
#
配置# config/cron.yaml# redis配置Addr: 127.0.0.1:6379Password: ""DB: 0MinIdleConn: 200DialTimeout: 60sReadTimeout: 500msWriteTimeout: 500msPoolSize: 100PoolTimeout: 240s
Concurrency: 10 # 指定worker的数量
#
定义tasktask里主要做了以下4件事情
- 定义任务类型
- 定义任务payload,即定义任务里需要使用到的数据
- 创建一个task
- 定义一个handle方法,用来编写具体处理任务的逻辑
有两种方式,手动和命令行生成
#
a.手动编写task// internal/tasks/email_welcome.gopackage tasks
import ( "context" "encoding/json" "fmt" "log"
"github.com/hibiken/asynq")
const ( // 任务类型 TypeEmailWelcome = "email:welcome")
// 定义任务payloadtype EmailWelcomePayload struct { UserID int}
// 创建任务func NewEmailWelcomeTask(userID int) (*asynq.Task, error) { payload, err := json.Marshal(EmailWelcomePayload{UserID: userID}) if err != nil { return nil, err } return asynq.NewTask(TypeEmailWelcome, payload), nil}
// 处理任务的具体逻辑func HandleEmailWelcomeTask(ctx context.Context, t *asynq.Task) error { var p EmailWelcomePayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } log.Printf("Sending Email to User: user_id=%d", p.UserID) // Email delivery code ... return nil}
#
b.命令行生成task# 生成任务eagle task add EmailWelcome
# 查看任务列表eagle task list
# 输出任务列表+---+---------------+------------------------+------------------+----------------+| # | TASK NAME | HANDLER NAME | FILE NAME | LOCATION |+---+---------------+------------------------+------------------+----------------+| 1 | email:welcome | HandleEmailWelcomeTask | email_welcome.go | internal/tasks |+---+---------------+------------------------+------------------+----------------+
info
生成结果和手动编写是一致的。
#
注册task在需要执行task的地方进行注册
// 创建任务task, err := NewEmailWelcomeTask(1)if err != nil { log.Fatalf("could not create task: %v", err)}
// 即时消息_, err := GetClient().Enqueue(task)
// 延时消息_, err := GetClient().Enqueue(task, asynq.ProcessIn(10*time.Second))
// 定时消息_, err := GetClient().Enqueue(task, asynq.ProcessAt(time.Now().Add(time.Hour)))
// 超时、重试_, err := GetClient().Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
// 优先级消息_, err := GetClient().Enqueue(task, asynq.Queue(QueueCritical))
#
注册handle在 main.go 进行注册
... // mux maps a type to a handler mux := asynq.NewServeMux() // 这里进行注册 mux.HandleFunc(tasks.TypeEmailWelcome, tasks.HandleEmailWelcomeTask) ...
#
启动servergo run cmd/cron/main.go
OK, 这样task就会按照指定的方式运行了。
#
Example详细查看具体案例