@@ -1,58 +0,0 @@ RabbitMQ进阶 - 消费端要点介绍 | 凤凰涅槃进阶之路

RabbitMQ进阶 - 消费端要点介绍

Abel sun2022年12月24日
约 1674 字大约 6 分钟

RabbitMQ进阶 - 消费端要点介绍

1. 简介

消费者客户端可以通过 推模式拉模式 来获取并消费消息,RabbitMQ 把消息推送后(或客户端主动 ACK)后,RabbitMQ 把当前消息从队列中标记清除。如果由于某些原因无法处理当前接受到的信息,可以通过 channel.basicNack 或则 channel.basicReject 来拒绝掉。

对于消费者来说,还有几点需要注意:

  • 消息分发
  • 消息顺序性
  • 弃用 QueueingConsumer

2. 消息分发

当 RabbitMQ 队列有多个消费者 时,队列收到的消息将以 轮询(round-robin) 方式分发给消费者,每条消息只会发送给订阅列表里的 一个消费者。这种方式是专门为并发程序设计的,如果程序处理不过来,只要增加更多的消费者来处理消息即可。

很多时候轮询的分发机制也有问题。默认情况下,如果有 n 个消费者,RabbitMQ 会将第 m 条消息分发给第 m%n (取余) 个消费者。RabbitMQ 不管消费者是否消费并已经确认(Basic.Ack)消息。就可能会导致:某些消费者来不及处理消息,有些处理得很快的情况。

这种情况,需要 限制信道上 的消费者所能 保持的最大未确认消息的数量,通过 channel.basicQos(int prefetchCount) 方法。

举例说明:在订阅队列之前,消费者设置 channel.basicQos(5),再订阅队列。 RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果到达了设置上限,就不会向这个消费者再发送任何消息。直到消费者确认了某条消费者之后,RabbitMQ 把对应的计数器 -1,继续分发消息。

注意要点:Basic.Qos 对拉模式无效

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  • prefetchSize:消费者所能接受未确认消息的总体大小的上限(单位为 B),设置为 0 时,表示无上限

  • prefetchCount:消费者所能接受最大未确认消息的数量

  • global:

    一个信道可以消费多个队列

    当该值大于 0 时,这个信道需要和各个队列协调,确保发送的消息都没有超过所限定的 prefetchCount。这会让 RabbitMQ 的性能降低,尤其当这些队列分散在集群中的多个 Broker 节点之中。为了解决这个性能问题,定义了 global 参数

    global 参数AMQP 0-9-1RabbitMQ
    false信道上所有的消费者都要遵从 prefetchCount 的限制信道上所有的消费者都要遵从 prefetchCount 的限制
    true当前通信链路(Connection)上所有的消费者需要遵循从 prefetchCount 的限制信道上所有的消费者都要遵从 prefetchCount 的限制(这里不知道书上是不是写错了?)

    channel.basicQos 只针对单个消费者的。对于同一个信道上的多个消费者而言,如果设置了 prefetchCount ,则都会生效。

    如下代码,各自的能接收到的未确认消息上限都是 10

    channel.basicQos(10);
    channel.basicConsume("queue1",false,consumerl1)
    channel.basicConsume("queue2",false,consumerl2)
    

    如果同时设置了 global 为 false 和 true 呢?他们两个的限制都有效果:如下面这段代码

    channel.basicQos(3, false);
    channel.basicQos(5, true);
    channel.basicConsume("queue1", false, consumerl1);
    channel.basicConsume("queue2", false, consumerl2);
    

    那么生效情况如下:

    • 每个消费者最多可收到 3 个未确认的消息
    • 两个消费者最多可收到 5 个未确认的消息

    这种设置方式,会增加 RabbitMQ 的负载,会使用更多的资源来协调完成这些限制。建议用默认值的 false。

3. 消息顺序性

指:消费者 消费到的消息 和发送者 发布的消息 顺序是一致的。

如:发布 1,2,3 那么消费的顺序也是 1,2,3

单个生产者和单个消费者的情况下,消息的有序性是能保证的,也是可验证的。在多消费者和多生产者的情况下,无法确定消息到达 Broker 的前后顺序,也无法确定客户端消费的顺序,这个其实是正常现象。分布式中本来就存在这样的现象。

有如下几种情况,消息的顺序性会被打破:但都是正常现象:

  • 使用事物机制时,发送失败,使用另一个线程补发此消息。此时消息就不能保证按照 1,2,3,4 的顺序到达 Broker 了
  • 使用不同的消息过期时间,先过期的先被消费
  • 使用优先级消息,优先级高的先被消费
  • 客户端使用 Basic.Nack/.Reject 将消息拒绝时,同时 requeue= true, 消息重入队列后,也无法保证消息顺序还和发送的时候是一致的

从以上点可以看到,在很多场景下,并不能保证消息的顺序性。

如果想要实现消息的有序性,则可以通过在消息体内增加全部有序标识,程序端自己实现逻辑判定

4. 启用 QueuingConsumer

   ...
    queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueue, true, queueingConsumer);
}

public String call(String message) throws IOException, InterruptedException {
    final String corrid = UUID.randomUUID().toString();
    final AMQP.BasicProperties properties = new AMQP.BasicProperties()
            .builder()
            .correlationId(corrid)
            .replyTo(replyQueue)
            .build();
    channel.basicPublish("", requestQueue, properties, "message".getBytes());

    // 想服务端发送后,轮询,知道回去到服务端的响应为止
    while (true) {
        final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrid)) {
            return new String(delivery.getBody());
        }
    }
}
    

前面讲解 RPC 实现open in new window 中用到过这个类,如上的代码片段。在 RabbitMQ 4.x 中被标记为 @Deprecated 了。

是因为该类有几个大缺陷:比如内存溢出问题,由于某些原因,队列中堆积了比较多的消息,可能导致消费者客户端内存溢出假死,于是发生恶性循环,队列消息不断堆积而得不到消费。

导致内存溢出的原因是:QueuingConsumer 内部使用 LinkedBlockingQueue 来缓存消息,当设置的 Basic.Qos 数量太大的时候,消息体也很大(如一个消息 200M),那么就会导致内存溢出。可通过限制 qos 的数量来解决这个问题,但是一定 要在订阅之前设置

QueuingConsumer 还包括以下缺陷(包括但不限于):

  • 会拖累同一个 Connection 下的所有通道,使其性能降低
  • 同步递归调用 QueuingConsumer 会产生死锁
  • RabbitMQ 的自动连接恢复机制(automatic Connection recovery) 不支持 QueuingConsumer 的这种形式
  • QueuingConsumer 不是事件驱动的

所以还是使用 DefaultConsumer 之类的来订阅队列。

参考文章

消费端要点介绍open in new window

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.9.1