一天连发两次事故,都是RabbitMQ惹的祸……

不一样的科技宅 2021-01-14 10:47:06

背景

 

由于大量商户反应收不到推送,我第一反应是不是推送系统挂了,导致没有进行推送。于是让运维老哥检查推送系统各节点的情况,发现都正常。于是打开RabbitMQ的管控台看了一下,人都蒙了。那天我和同事一起吃完晚饭回公司加班,然后就群里就有人@我说xxx商户说收不到推送,一开始觉得没啥。我第一反应是不是极光没注册上,就让客服通知商户,重新登录下试试。

 

这边打开极光推送的后台进行检查。后面反应收不到推送的越来越多,我就知道这事情不简单。

 

一、事故经过

 

由于大量商户反应收不到推送,我第一反应是不是推送系统挂了,导致没有进行推送。于是让运维老哥检查推送系统各节点的情况,发现都正常。于是打开RabbitMQ的管控台看了一下,人都蒙了。

 

已经有几万条消息处于ready状态,还有几百条unacked的消息。

 

我以为推送服务和MQ连接断开了,导致无法推送消息,于是让运维重启推送服务,将所有的推送服务重启完,发现unacked的消息全部变成ready,但是没过多久又有几百条unacked的消息了,这个就很明显了能消费,没有进行ack呀。

 

当时我以为是网络问题,导致mq无法接收到ack,让运维老哥检查了一下,发现网络没问题。现在看是真的是傻,网络有问题连接都连不上。由于确定的是无法ack造成的,立马将ack模式由原来的manual 改成auto紧急发布。将所有的节点升级好以后,发现推送正常了。

 

你以为这就结束了其实并没有,没过多久发现有一台MQ服务出现异常,由于生产采用了镜像队列,立即将这台有问题的MQ从集群中移除。直接进行重置,然后加入回集群。这事情算是告一段落了。此时已经接近24:00了。

 

 

时间来到第二天上午10:00,运维那边又出现报警了,说推送系统有台机器,磁盘快被写满了,并且占用率很高。我的乖乖从昨晚到现在写了快40G的日志,一看报错信息瞬间就明白问题出在哪里了。麻溜的把bug修了紧急发布。

 

吐槽一波公司的ELK,压根就没有收集到这个报错信息,导致我没有及时发现。

 

 

二、事故重现-队列阻塞

 

 
1、MQ配置

 

 

spring:

 # 消息队列

  rabbitmq:

    host: 10.0.0.53

    username: guest

    password: guest

    virtual-host: local

    port: 5672

    # 消息发送确认

    publisher-confirm-type: correlated

    # 开启发送失败退回

    publisher-returns: true

    listener:

      simple:

        # 消费端最小并发数

        concurrency: 1

        # 消费端最大并发数

        max-concurrency: 5

        # 一次请求中预处理的消息数量

        prefetch: 2

        # 手动应答

        acknowledge-mode: manual

 

 
2、问题代码

 

 

@RabbitListener(queues = ORDER_QUEUE)

public void receiveOrder(@Payload String encryptOrderDto,

                                      @Headers Map headers,

                                      Channel channel) throws Exception {

    // 解密和解析

    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);

    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

 

    try {

        // 模拟推送

        pushMsg(orderDto);

    }catch (Exception e){

        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));

    }finally {

        // 消息签收

        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);

    }

 

}

 

看起来好像没啥问题。由于和交易系统约定好,订单数据需要先转换json串,然后再使用AES进行加密,所以这边需要,先进行解密然后在进行解析,才能得到订单数据。

 

为了防止消息丢失,交易系统做了失败重发机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就导致推送系统,在解密的时候出异常,从而无法进行ack。

 

默默的吐槽一句:人在家中坐,锅从天上来。

 

 
3、模拟推送

 

推送代码

 

发送3条正常的消息

 

 

curl http://localhost:8080/sendMsg/3

 

发送1条错误的消息

 

curl http://localhost:8080/sendErrorMsg/1

 

再发送3条正常的消息

 

curl http://localhost:8080/sendMsg/3

 

 

观察日志发下,虽然有报错,但是还能正常进行推送。但是RabbitMQ已经出现了一条unacked的消息。

 

 

继续发送1条错误的消息

 

curl http://localhost:8080/sendErrorMsg/1

 

再发送3条正常的消息

 

curl http://localhost:8080/sendMsg/3

 

这个时候你会发现控制台报错,当然错误信息是解密失败,但是正常的消息却没有被消费,这个时候其实队列已经阻塞了。

 

 

 

从RabbitMQ管控台也可以看到,刚刚发送的的3条消息处于ready状态。这个时候就如果一直有消息进入,都会堆积在队里里面无法被消费。

 

再发送3条正常的消息

 

curl http://localhost:8080/sendMsg/3

 

 

 
4、分析原因

 

上面说了是由于没有进行ack导致队里阻塞。那么问题来了,这是为什么呢?其实这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。

 

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。

 

举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。

 

listener:

  simple:

    # 消费端最小并发数

    concurrency: 1

    # 消费端最大并发数

    max-concurrency: 5

    # 一次处理的消息数量

    prefetch: 2

    # 手动应答

    acknowledge-mode: manual

 

prefetch参数就是PrefetchCount。

 

通过上面的配置发现prefetch我只配置了2,并且concurrency配置的只有1,所以当我发送了2条错误消息以后,由于解密失败这2条消息一直没有被ack。将缓冲区沾满了,这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给它推送消息了,所以就造成了队列阻塞。

 

 
5、判断队列是否有阻塞的风险

 

当ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。

 

channlCount就是由concurrency,max-concurrency决定的。

 

  • min =     concurrency * prefetch * 节点数量;

  • max =     max-concurrency * prefetch * 节点数量。

 

由此可以的出结论

 

  • unacked_msg_count <     min 队列不会阻塞。但需要及时处理     unacked的消息;

  • unacked_msg_count >=     min 可能会出现堵塞;

  • unacked_msg_count >=     max 队列一定阻塞。

 

这里需要好好理解一下。

 

 
6、处理方法

 

其实处理的方法很简单,将解密和解析的方法放入try catch中就解决了这样不管解密正常与否,消息都会被签收。如果出错将会输出错误日志,让开发人员进行处理了。

 

对于这个就需要有日志监控系统,来及时告警了。

 

@RabbitListener(queues = ORDER_QUEUE)

public void receiveOrder(@Payload String encryptOrderDto,

                                      @Headers Map headers,

                                      Channel channel) throws Exception {

    try {

 

        // 解密和解析

        String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);

        OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

 

        // 模拟推送

        pushMsg(orderDto);

    }catch (Exception e){

        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);

    }finally {

        // 消息签收

        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);

    }

 

}

 

 
7、注意的点

 

unacked的消息在consumer切断连接后(重启),会自动回到队头。

 

三、事故重现-磁盘占用飙升

 

一开始我不知道代码有问题,就是以为单纯的没有进行ack所以将ack模式改成auto自动,紧急升级了,这样不管正常与否,消息都会被签收,所以在当时确实是解决了问题。

 

其实现在回想起来是非常危险的操作的,将ack模式改成auto自动,这样会使QOS不生效。会出现大量消息涌入consumer从而造成consumer宕机,可以是因为当时在晚上,交易比较少,并且推送系统有多个节点,才没出现问题。

 

 
1、问题代码

 

@RabbitListener(queues = ORDER_QUEUE)

public void receiveOrder(@Payload String encryptOrderDto,

                                      @Headers Map headers,

                                      Channel channel) throws Exception {

    // 解密和解析

    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);

    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

 

    try {

 

        // 模拟推送

        pushMsg(orderDto);

    }catch (Exception e){

        log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);

    }finally {

        // 消息签收

        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);

    }

 

}

 

配置文件

 

listener:

  simple:

    # 消费端最小并发数

    concurrency: 1

    # 消费端最大并发数

    max-concurrency: 5

    # 一次处理的消息数量

    prefetch: 2

    # 手动应答

    acknowledge-mode: auto

 

由于当时不知道交易系统的重发机制,重发时没有对订单数据加密的bug,所以还是会发出少量有误的消息。

 

发送1条错误的消息

 

curl http://localhost:8080/sendErrorMsg/1

 

 

 

 
2、原因

 

RabbitMQ消息监听程序异常时,consumer会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。

 

四、解决方法

 

将default-requeue-rejected: false即可。

 

五、总结

 

  • 个人建议,生产环境不建议使用自动ack,这样会QOS无法生效。

  • 在使用手动ack的时候,需要非常注意消息签收。

  • 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,相当于是消息丢失了。

 

try {

    // 业务逻辑。

}catch (Exception e){

    // 输出错误日志。

}finally {

    // 消息签收。

}

 

六、代码地址

 

https://gitee.com/huangxunhui/rabbitmq_accdient.git

 

七、结尾

 

如果有人告诉你遇到线上事故不要慌,除非是超级大佬久经沙场。否则就是瞎扯淡,你让他来试试,看看他会不会大脑一片空白,直冒汗。

 

>>>>

参考资料

 

  • RabbitMQ消息监听异常问题探究

 

作者丨不一样的科技宅
来源丨公众号:程序员的成长之路(ID:cxydczzl)
dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn
最新评论
访客 2023年08月20日

230721

访客 2023年08月16日

1、导入Mongo Monitor监控工具表结构(mongo_monitor…

访客 2023年08月04日

上面提到: 在问题描述的架构图中我们可以看到,Click…

访客 2023年07月19日

PMM不香吗?

访客 2023年06月20日

如今看都很棒

活动预告