Login
网站首页 > 文章中心 > 其它

用Redis实现延迟队列,我研究了两种方案,发现并不简单

作者:小编 更新时间:2023-09-30 14:16:41 浏览量:313人看过

大家好,我是三友~~

前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了.

虽然基于MQ这个方式走不通了,但是这个项目中使用到Redis,所以我就想是否能够使用Redis来代替MQ实现延迟队列的功能,于是我就查了一下有没有现成可用的方案,别说,还真给我查到了两种方案,并且我还仔细研究对比了这两个方案,发现要想很好的实现延迟队列,并不简单.

基于监听过期key的方式来实现延迟队列是我查到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收获

一谈到发布订阅模式,其实一想到的就是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图:

用Redis实现延迟队列,我研究了两种方案,发现并不简单

图中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic.

生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息.

在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身.当消费者监听这些channel时,就可以感知到Redis中数据的变化.

这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知.

这些默认的channel被分为两类:

以__keyspace@__:为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件.

举个例子,现在有个消费者监听了__keyspace@0__:sanyou这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息

以__keyevent@__:为前缀,后面跟的是消息事件类型,表示监听某个事件

同样举个例子,现在有个消费者监听了__keyevent@0__:expired这个channel,代表了监听key的过期事件.那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息.如果把expired换成del,那么监听的就是删除事件.具体支持哪些事件,可从官网查.

用Redis实现延迟队列,我研究了两种方案,发现并不简单

通过对上面的两个概念了解之后,应该就对监听过期key的实现原理一目了然了,其实就是当这个key过期之后,Redis会发布一个key过期的事件到__keyevent@__:expired这个channel,只要我们的服务监听这个channel,那么就能知道过期的Key,从而就算实现了延迟队列功能.

所以这种方式实现延迟队列就只需要两步:

发送延迟任务,key是延迟消息本身,过期时间就是延迟时间

监听__keyevent@__:expired这个channel,处理延迟任务

好了,基本概念和核心原理都说完了之后,又到了show me the code环节.

好巧不巧,Spring已经实现了监听__keyevent@*__:expired这个channel这个功能,__keyevent@*__:expired中的*代表通配符的意思,监听所有的数据库.

引入pom

org.springframework.boot

spring-boot-starter-data-redis

spring-boot-starter-web

配置类

public?class?RedisConfiguration?{

@Bean

public?RedisMessageListenerContainer?redisMessageListenerContainer(RedisConnectionFactory?connectionFactory)?{

RedisMessageListenerContainer?redisMessageListenerContainer?=?new?RedisMessageListenerContainer();

redisMessageListenerContainer.setConnectionFactory(connectionFactory);

return?redisMessageListenerContainer;

}

public?KeyExpirationEventMessageListener?redisKeyExpirationListener(RedisMessageListenerContainer?redisMessageListenerContainer)?{

return?new?KeyExpirationEventMessageListener(redisMessageListenerContainer);

KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听

用Redis实现延迟队列,我研究了两种方案,发现并不简单

当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件

用Redis实现延迟队列,我研究了两种方案,发现并不简单

所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息.

对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener

public?class?MyRedisKeyExpiredEventListener?implements?ApplicationListener?{

@Override

public?void?onApplicationEvent(RedisKeyExpiredEvent?event)?{

byte[]?body?=?event.getSource();

System.out.println("获取到延迟消息:"?◆?new?String(body));

整个工程目录也简单

用Redis实现延迟队列,我研究了两种方案,发现并不简单

代码写好,启动应用

为什么会没打印出?难道是代码写错了?正当我准备检查代码的时候,官网的一段话道出了真实原因.

用Redis实现延迟队列,我研究了两种方案,发现并不简单

我给大家翻译一下上面这段话讲的内容.

上面这段话主要讨论的是key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略,就是面试八股文常背的两种:

惰性清除.当这个key过期之后,访问时,这个Key才会被清除

定时清除.后台会定期检查一部分key,如果有key过期了,就会被清除

再后面那段话是核心,意思是说,key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布.

除了上面测试demo的时候遇到的坑之外,在我深入研究之后,还发现了一些更离谱的坑.

丢消息太频繁

Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息.

所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了.

消息消费只有广播模式

Redis的发布订阅模式消息消费只有广播模式一种.

所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息.

用Redis实现延迟队列,我研究了两种方案,发现并不简单

如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息.

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量.

接收到所有key的某个事件

这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题.

当消费者监听了以__keyevent@__:开头的消息,那么会导致所有的key发生了事件都会被通知给消费者.

举个例子,某个消费者监听了__keyevent@*__:expired这个channel,那么只要key过期了,不管这个key是张三还会李四,消费者都能收到.

所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务.

所以,综上能够得出一个非常重要的结论,那就是监听Redis过期Key这种方式实现延迟队列,不稳定,坑贼多!

那有没有比较靠谱的延迟队列的实现方案呢?这就不得不提到我研究的第二种方案了.

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能.

先来个demo,后面再来说说这种实现的原理.

org.redisson

redisson

封装了一个RedissonDelayQueue类

public?class?RedissonDelayQueue?{

private?RedissonClient?redissonClient;

private?RDelayedQueue?delayQueue;

private?RBlockingQueue?blockingQueue;

@PostConstruct

public?void?init()?{

initDelayQueue();

startDelayQueueConsumer();

private?void?initDelayQueue()?{

Config?config?=?new?Config();

SingleServerConfig?serverConfig?=?config.useSingleServer();

redissonClient?=?Redisson.create(config);

blockingQueue?=?redissonClient.getBlockingQueue("SANYOU");

delayQueue?=?redissonClient.getDelayedQueue(blockingQueue);

private?void?startDelayQueueConsumer()?{

{

while?(true)?{

try?{

String?task?=?blockingQueue.take();

log.info("接收到延迟任务:{}",?task);

}?catch?(Exception?e)?{

e.printStackTrace();

},?"SANYOU-Consumer").start();

public?void?offerTask(String?task,?long?seconds)?{

log.info("添加延迟任务:{}?延迟时间:{}s",?task,?seconds);

delayQueue.offer(task,?seconds,?TimeUnit.SECONDS);

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓.

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务.

添加任务的时候是通过RDelayedQueue的offer方法添加的.

public?class?RedissonDelayQueueController?{

@Resource

private?RedissonDelayQueue?redissonDelayQueue;

@GetMapping("/add")

public?void?addTask(@RequestParam("task")?String?task)?{

启动项目,在浏览器输入如下连接,添加任务

用Redis实现延迟队列,我研究了两种方案,发现并不简单

如下图就是上面demo中,一个延迟队列会在Redis内部使用到的channel和数据类型

用Redis实现延迟队列,我研究了两种方案,发现并不简单

SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀.

redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 ◆ 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要

redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用..

SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的

有了这些概念之后,再来看看整体的运行原理图

用Redis实现延迟队列,我研究了两种方案,发现并不简单

用Redis实现延迟队列,我研究了两种方案,发现并不简单

这段lua脚本主要干了两件事:

将到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列

获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期时间的延迟任务的到期时间戳,然后发布到redisson_delay_queue_channel:SANYOU这个channel中

此处可以等待10s,好好想想..

所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列.

这里再补充两个特殊情况,图中没有画出:

第一个就是如果redisson_delay_queue_timeout:SANYOU是新添加的任务(队列之前有或者没有任务)是队列中最早需要被执行的,也会发布消息到channel,之后就按时上面说的流程走了.

添加任务代码如下,也是通过lua脚本来的

用Redis实现延迟队列,我研究了两种方案,发现并不简单

现在来比较一下第一种方案和Redisson的这种方案,看看有没有第一种方案的那些坑.

第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟.

第二个丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据.

第四个问题是Redis内部channel发布事件的问题,跟这种方案不沾边,就更不可能存在了.

所以,通过上面的对比可以看出,Redisson这种实现方案就显得更加的靠谱了.

往期热门文章推荐

扒一扒Bean注入到Spring的那些姿势

三万字盘点Spring/Boot的那些常用扩展点

两万字盘点那些被玩烂了的设计模式

RocketMQ保姆级教程

RocketMQ消息短暂而又精彩的一生

扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题.

用Redis实现延迟队列,我研究了两种方案,发现并不简单

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章