Skip to content
0

RabittMQ

RabbitMQ 是一个常用的消息中间件,维护着消息队列,而消息队列就是消息传输过程中,保存消息的容器,因先进先出的特性,所以被称为队列。

AMQP 和 MQTT

两者都是应用层的协议

  • AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议)
  • MQTT,即 Message Queuing Telemetry Transport(消息队列遥测传输协议)

两者都是用于异步消息通信的协议,amqp 常用于服务端,mqtt 因其对设备性能要求低,适用于嵌入式和IoT设备场景

特点

RabittMQ 基于 AMQP 协议,最大的特点就是消费不需要保证提供方存在,实现了服务间的高度解耦

undefined
undefined

为什么要使用 RabbitMQ

  • 削峰,异步,负载均衡的功能 -> 请求削峰
  • 将同步请求转为串行请求,限流作用 -> 顺序消费
  • 生产者,服务者解耦 -> 异步通信
  • 异步下单,排队逻辑 -> 延迟消息

安装和启动

brew install rabbitmq
brew services start rabbitmq

然后访问 http://localhost:15672,初始账号和密码均为 guest

使用 NodeJS 连接

我们使用 amqplib 帮助我们在 NodeJs 中连接 rabbitmq

安装:

$ npm i amqplib

import amqplib from 'amqplib'

(async() => {
  const queue = 'tasks'
  const conn = await amqplib.connect('amqp://localhost')
  // => amqp 协议的默认端口是 5672

  const ch1 = await conn.createChannel()
  await ch1.assertQueue(queue)

  // Listener
  ch1.consume(queue, (msg) => {
    if(msg !== null) {
      console.log('Received:', msg.content.toString())
      // 处理完后要及时发送ack消息给队列,否则容易造成内存溢出
      ch1.ack(msg)
    } else {
      console.log('Consumer cancelled by server')
    }
  })

  // Sender
  const ch2 = await conn.createChannel()

  setInterval(() => {
    ch2.sendToQueue(queue, Buffer.from('Something to do.'))
  }, 1000)
})()

TIP

其实从包名可以看到,amqplib 不仅适用于 rabbitmq,而是针对 amqp 协议的

启动后,将在控制台看到 ch1 消费了 ch2 推入消息队列的信息:

$ tsx index.ts
Received: Something to do.
Received: Something to do.
Received: Something to do.
...

生产者和消费者是完全解耦的,这意味着生产和消费可以在不同的时刻,不同的进程,不同的编程语言,甚至是不同的机器上实现。

与 Kafka 区别

Kafka 是 apache 开源的消息队列顶级项目之一,在大数据场景下使用较多。Kafka 和 RabbitMQ 各有应用场景,主要区别如下:

在实际生产应用中,Kafka 主要用于消息传输的数据管道,RabbitMQ 用于交易数据的传输管道,主要的取舍依据在于是否丢数据的可能

RabbitMQ 具有较高的严谨性,在金融场景中经常使用,数据丢失的可能性更小,具有更高的实时性。Kafka 的优势主要体现在数据的吞吐量,虽然可以通过策略使数据不丢失,但是从严谨性的角度来讲,大不如 RabbitMQ

引入的问题

引入 RabbitMQ 也会有一系列问题,需要通过一些措施取解决

  • 可用性降低:MQ 挂了,系统崩溃
  • 系统复杂度提高:消息顺序、消息丢失
  • 一致性问题:数据不一致

Released under the MIT License.