角色

  • Connection(连接):与 RabbitMQ 服务器的物理 TCP 长连接
  • Channel(通道):基于连接的轻量级逻辑连接(官方推荐:一个连接开多个通道,避免多连接开销)
  • Exchange(交换机):消息路由中转站,负责将消息转发到对应队列
  • Direct 交换机:精确匹配路由键,消息只投递给绑定了相同路由键的队列
  • 队列:快递柜
  • 生产者:只管发消息,不管消息存在哪、谁来取
  • 消费者:自己创建队列,自己绑定交换机

工作模式

模式名称交换机类型核心路由规则
1. 简单模式无(默认)生产者→队列→消费者(一对一)
2. 工作队列模式无(默认)一个队列,多个消费者争抢任务
3. 发布订阅模式Fanout广播消息(无视路由键)
4. 路由模式Direct精确匹配路由键
5. 主题模式Topic通配符模糊匹配路由键
6. RPC 模式Direct双向同步通信(请求 + 响应)

消息可靠性

持久化

队列持久化 消息持久化 交换机持久化

消息确认机制

RabbitMQ 的 Publish 是一个纯异步的发射后不管(Fire and Forget)操作,那要如何确认到底发送成功没有呢?就是Publisher Confirms登场的时候了。

  • 回执单写着“妥投”(Ack):代表服务器安全收到。
  • 回执单写着“拒收/丢失”(Nack):代表出错了。 用法:
1// 1. 打造一个实体信箱,规定里面只能装“回执单” (amqp.Confirmation)
2mailbox := make(chan amqp.Confirmation, 1)
3// 2. 把信箱的位置告诉快递员
4p.channel.NotifyPublish(mailbox) 
1type Confirmation struct {
2    DeliveryTag uint64 // 消息的编号(第1封信、第2封信...)
3    Ack         bool   // 成功还是失败?(true = 成功, false = 失败)
4}

通常在并发场景下的使用流程:

同步确认

  • 加锁 -> 发送 -> 死等回执 -> 解锁 同步阻塞确认

异步确认

  • 或者使用DeliveryTag(消息编号)实现异步确认
1type Confirmation struct {
2    DeliveryTag uint64 // 这是第几号消息的回执
3    Ack         bool   // 成功还是失败
4    Multiple    bool   // 【极其重要】是否为批量确认?
5}

Multiple 字段:为了节省网络带宽,RabbitMQ 有时候不会一条一条给你发回执,而是发一条回执说:eliveryTag = 100, Multiple = true,意思是从 1 到 100 号消息,全部处理成功了。必须在代码里处理这种批量确认。

原理是使用并发安全的字典 sync.Map:

1// 1. 发送前,获取当前的单号 (DeliveryTag) 
2seqNo := p.channel.GetNextPublishSeqNo() 
3// 2. 记在小本本上:Tag -> 订单号 
4p.unconfirmed.Store(seqNo, bizNo)

消费者启动一个异步监听协程,用sync.map的loadanddelte,未确认ack则重发

这样处理,在发送消息就不用加锁等待select了。

防止数据库抖动、网络超时 生产者确认和消费者确认(Publisher Confirms)

消息重复咋办

如果要防止抢购系统重复下单,需要保持 幂等性:

  1. 消息唯一ID
  2. 数据库唯一索引
  3. redis分布式锁(成本高)

死信队列

如果消息被消费者拒绝、消息过期(TTL超时)、队列已满,会产生死信 可以配置死信交换机,路由到死信队列,由死信消费者监控死信队列