RabittMQ
RabbitMQ 是一个常用的消息中间件,维护着消息队列,而消息队列就是消息传输过程中,保存消息的容器,因先进先出的特性,所以被称为队列。
AMQP 和 MQTT
两者都是应用层的协议
- AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议)
- MQTT,即 Message Queuing Telemetry Transport(消息队列遥测传输协议)
两者都是用于异步消息通信的协议,amqp 常用于服务端,mqtt 因其对设备性能要求低,适用于嵌入式和IoT设备场景
特点
RabittMQ 基于 AMQP 协议,最大的特点就是消费不需要保证提供方存在,实现了服务间的高度解耦
undefined
undefined
为什么要使用 RabbitMQ
- 削峰,异步,负载均衡的功能 -> 请求削峰
- 将同步请求转为串行请求,限流作用 -> 顺序消费
- 生产者,服务者解耦 -> 异步通信
- 异步下单,排队逻辑 -> 延迟消息
安装和启动
brew install rabbitmq
brew services start rabbitmq然后访问 http://localhost:15672,初始账号和密码均为 guest
使用 NodeJS 连接
我们使用 amqplib 帮助我们在 NodeJs 中连接 rabbitmq。
安装:
$ npm i amqplib
import amqplib from 'amqplib'
(async() => {
const queue = 'tasks'
const conn = await amqplib.connect('amqp://localhost')
// => amqp 协议的默认端口是 5672
const ch1 = await conn.createChannel()
await ch1.assertQueue(queue)
// Listener
ch1.consume(queue, (msg) => {
if(msg !== null) {
console.log('Received:', msg.content.toString())
// 处理完后要及时发送ack消息给队列,否则容易造成内存溢出
ch1.ack(msg)
} else {
console.log('Consumer cancelled by server')
}
})
// Sender
const ch2 = await conn.createChannel()
setInterval(() => {
ch2.sendToQueue(queue, Buffer.from('Something to do.'))
}, 1000)
})()TIP
其实从包名可以看到,amqplib 不仅适用于 rabbitmq,而是针对 amqp 协议的
启动后,将在控制台看到 ch1 消费了 ch2 推入消息队列的信息:
$ tsx index.ts
Received: Something to do.
Received: Something to do.
Received: Something to do.
...生产者和消费者是完全解耦的,这意味着生产和消费可以在不同的时刻,不同的进程,不同的编程语言,甚至是不同的机器上实现。
与 Kafka 区别
Kafka 是 apache 开源的消息队列顶级项目之一,在大数据场景下使用较多。Kafka 和 RabbitMQ 各有应用场景,主要区别如下:
在实际生产应用中,Kafka 主要用于消息传输的数据管道,RabbitMQ 用于交易数据的传输管道,主要的取舍依据在于是否丢数据的可能
RabbitMQ 具有较高的严谨性,在金融场景中经常使用,数据丢失的可能性更小,具有更高的实时性。Kafka 的优势主要体现在数据的吞吐量,虽然可以通过策略使数据不丢失,但是从严谨性的角度来讲,大不如 RabbitMQ
引入的问题
引入 RabbitMQ 也会有一系列问题,需要通过一些措施取解决
- 可用性降低:MQ 挂了,系统崩溃
- 系统复杂度提高:消息顺序、消息丢失
- 一致性问题:数据不一致