RabbitMQ延迟队列插件

编程 / 2024-08-10

利用 RabbitMQ 官方提供的延迟队列插件实现延时任务场景比如订单超时自动取消、优惠券过期提醒、退款处理。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭 “,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
使用rabbitmq作为延迟队列有三种方式
第一种:设置一个上层队列,不对接消费者,上层队列给该队列所有消息设置一个存活时间,当存活时间到期后则落入死信队列,消费者只监听死信队列,完成延迟消息的效果。
缺点:10s的延迟建立一次队列,20s也要有一个,不方便拓展。
第二种:是给每个消息设置存活时间,发送到同一个队列里面,但这种方式并不是延迟队列,因为rabbitmq只会检查第一个消息是否过期,如果过期就会丢到死信队列,
但如果第一个消息的存货时间很长,第二个消息的很短,则第二个消息并不会被消息队列发现他已经到期了。
第三种,采用延迟队列插件的方式。

延迟队列还有其他几种方式实现:
1、Java 的 DelayQueue,本质上就是一个优先级队列,
PriorityQueue内部是一个二叉小顶堆实现,其特点就是头部元素对应的权值是队列中最小的,也就是通过poll()方法获取到的对象是最优先的。缺点
第一点加入延迟队列的消息,每次系统重启都需要从数据库获取数据初始化
第二点DelayQueue实现了BlockingQueue接口,是一个阻塞队列。但该队列只是在取对象时阻塞,作为一个公共资源,要考虑好锁竞争的风险。
第三点需要自己开发线程作为队列的监听器,需要开发定时线程对队列获取过期的消息,并且获取到后需要根据需求新增线程去对获取到的大量消息进行处理。
这里面就已经涉及到了两套要开发的线程池。
第四点,数据保持在内存中,但却不会有太多的操作,在大数据消息的情况下,要么选择将队列抽成一个微服务,要么就选择合适的中间件。
rabbimq的优点就体现出来了,消息本身与业务系统隔离,DelayQueue的多线程处理被多个消费者取代功能。
实际上延迟的时间是延迟时间+处理时间,所有要调控好。

Docker部署

docker ps -a
4.进入RabittMQ查看版本
5.下载延迟队列插件
6.导入到容器
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 容器名字:/opt/rabbitmq/plugins
7.进入
docker exec -it 容器名字 /bin/bash
8.查看插件
cd /opt/rabbitmq/plugins
8.安装
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
9.查看插件
rabbitmq-plugins list
10.重启
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

粤ICP备2022112743号 粤公网安备 44010502002407号