角色
- Connection(连接):与 RabbitMQ 服务器的物理 TCP 长连接
- Channel(通道):基于连接的轻量级逻辑连接(官方推荐:一个连接开多个通道,避免多连接开销)
- Exchange(交换机):消息路由中转站,负责将消息转发到对应队列
- Direct 交换机:精确匹配路由键,消息只投递给绑定了相同路由键的队列
- 队列:快递柜
- 生产者:只管发消息,不管消息存在哪、谁来取
- 消费者:自己创建队列,自己绑定交换机
工作模式
| 模式名称 | 交换机类型 | 核心路由规则 |
|---|---|---|
| 1. 简单模式 | 无(默认) | 生产者→队列→消费者(一对一) |
| 2. 工作队列模式 | 无(默认) | 一个队列,多个消费者争抢任务 |
| 3. 发布订阅模式 | Fanout | 广播消息(无视路由键) |
| 4. 路由模式 | Direct | 精确匹配路由键 |
| 5. 主题模式 | Topic | 通配符模糊匹配路由键 |
| 6. RPC 模式 | Direct | 双向同步通信(请求 + 响应) |
消息可靠性
持久化
队列持久化 消息持久化 交换机持久化
消息确认机制
RabbitMQ 的 Publish 是一个纯异步的发射后不管(Fire and Forget)操作,那要如何确认到底发送成功没有呢?就是Publisher Confirms登场的时候了。
- 回执单写着“妥投”(
Ack):代表服务器安全收到。 - 回执单写着“拒收/丢失”(
Nack):代表出错了。 用法:
1// 1. 打造一个实体信箱,规定里面只能装“回执单” (amqp.Confirmation)
2mailbox := make(chan amqp.Confirmation, 1)
3// 2. 把信箱的位置告诉快递员
4p.channel.NotifyPublish(mailbox)
1type Confirmation struct {
2 DeliveryTag uint64 // 消息的编号(第1封信、第2封信...)
3 Ack bool // 成功还是失败?(true = 成功, false = 失败)
4}
通常在并发场景下的使用流程:
同步确认
- 加锁 -> 发送 -> 死等回执 -> 解锁 同步阻塞确认
异步确认
- 或者使用
DeliveryTag(消息编号)实现异步确认
1type Confirmation struct {
2 DeliveryTag uint64 // 这是第几号消息的回执
3 Ack bool // 成功还是失败
4 Multiple bool // 【极其重要】是否为批量确认?
5}
Multiple 字段:为了节省网络带宽,RabbitMQ 有时候不会一条一条给你发回执,而是发一条回执说:eliveryTag = 100, Multiple = true,意思是从 1 到 100 号消息,全部处理成功了。必须在代码里处理这种批量确认。
原理是使用并发安全的字典 sync.Map:
1// 1. 发送前,获取当前的单号 (DeliveryTag)
2seqNo := p.channel.GetNextPublishSeqNo()
3// 2. 记在小本本上:Tag -> 订单号
4p.unconfirmed.Store(seqNo, bizNo)
消费者启动一个异步监听协程,用sync.map的loadanddelte,未确认ack则重发
这样处理,在发送消息就不用加锁等待select了。
防止数据库抖动、网络超时 生产者确认和消费者确认(Publisher Confirms)
消息重复咋办
如果要防止抢购系统重复下单,需要保持 幂等性:
- 消息唯一ID
- 数据库唯一索引
- redis分布式锁(成本高)
死信队列
如果消息被消费者拒绝、消息过期(TTL超时)、队列已满,会产生死信 可以配置死信交换机,路由到死信队列,由死信消费者监控死信队列