RabbitMQ
RabbitMQ是一个开源的消息代理(message broker)软件,主要用于实现应用程序之间的异步通信。它实现了高级消息队列协议(AMQP),同时也支持MQTT、STOMP等其他的协议。
部署
使用docker和docker-compose进行部署:
核心概念
消息(Message)
- 应用程序之间传递的数据单元。
- 包含两部分:payload(消息体,即实际传输的数据)和properties(元数据,如优先级、延迟等)。
生产者(Producer/Publisher)
- 发送消息的应用程序
- 不直接将消息发送到队列,而是发送到交换机(exchange)
消费者(Consumer)
- 接收消息的应用程序
- 监听特定队列并处理消息。
队列(Queue)
- 消息的存储缓冲区
- 多个消费者可以共享队列中的消息,但默认情况下队列会将消息轮询分发给消费者
- 队列可以持久化,确保服务器重启后仍然存在
交换机(Exchange)
- 消息到达Broker的第一站
- 根据规则将消息路由到一个或多个队列
- 四种类型:
Direct、Fanout、Topic、Headers
Direct Exchange(直连交换机)
作用:精确匹配路由键 工作原理:
- 消息的routing key必须与绑定时指定的binding key完全匹配
- 通常用于单播(unicast)路由 示例:
exchange: order_exchange (direct类型)
queue1绑定key: "food"
queue2绑定key: "electronics"
queue3绑定key: "clothing"Fanout Exchange(扇出/广播交换机)
作用:无条件广播 特点:
- 忽略routing key的存在
- 将消息发送到所有绑定的队列
- 性能最高的交换机类型 示例:
exchange: news_exchange (fanout类型)
绑定队列: weather_queue, sports_queue, stock_queueTopic Exchange(主题交换机)
作用:基于模式匹配的路由 特点:
- routing key和binding key使用
.分隔的单词。 - 支持两种通配符:
*匹配一个单词#匹配零个或多个单词 示例:
exchange: device_exchange (topic类型)
绑定模式:
queue1: "sensor.temperature.*"
queue2: "sensor.*.livingroom"
queue3: "*.humidity.#"Headers Exchange(头交换机)
作用:基于消息头部属性路由 特点:
- 完全忽略routing key
- 路由基于headers字典匹配
- 支持两种匹配条件:
all:必须匹配所有头部键值对any:只需匹配任意一个头部键值对 示例:
exchange: analytics_exchange (headers类型)
绑定配置:
queue1: {"x-match":"all", "format":"json", "priority":"high"}
queue2: {"x-match":"any", "region":"east", "region":"west"}通道(Channel)
Channel是RabbitMQ中的一个核心的抽象概念,它在AMQP协议和实际应用中扮演着关键角色,本质上是建立在TCP连接之上的轻量级逻辑连接,具有以下特性:
- 虚拟连接:单个TCP连接可以承载多个Channel
- 隔离性:每个Channel有独立的通信流和状态
- 轻量级:创建和销毁Channel的开销远小于TCP连接
绑定(Binding)
- 连接交换机和队列的规则
- 可以包含路由键(routing key)作为匹配条件
路由键(Routing Key)
- 附加在消息上的一个属性
- 交换机用它来决定如何路由消息到队列
- 类似邮件中"主题"的概念
虚拟主机(Virtual Host)
- RabbitMQ中独立的环境
- 类似网络中的"命名空间"
- 提供逻辑分组和隔离
常见疑问
多个Consumer,消息怎么分发
RabbitMQ默认执行轮询策略,无脑平均分发消息,不关注每个Consumer的实际执行效率。
消息状态
消息有几种状态:
- Ready:就绪等待分发往consumer。
- Unacked:消息被consumer处理中,并且没有ack或nack。
consumer行为和消息状态:
- consumer的prefetchCount决定了这个consumer可以同时拥有多少个Unacked的消息。
- 当consumer的Unacked消息满了之后,不会再接收Ready的消息。
- consumer断开之前或消息ack/nack之前,Unacked的消息的状态不会被取消,会一直维持。
- 当consumer中断后,短暂时间后Unacked的消息会变成Ready。
- prefetchCount设置为0,表示无线接收。