Redis搭配生产者消费者模型

Redis:👴说了👴只是缓存!

Posted by MatthewHan on 2019-11-29

前言

Redis作为一款优秀的缓存中间件,人们总是寄予他新的厚望。其列表类型的阻塞操作可以实现消息队列。

在场景中使用可以牢记以下口诀:

  • lpush + lpop = Stack(栈)

  • lpush + rpop = Queue(队列)

  • lpsh + ltrim = Capped Collection(有限集合)

  • lpush + brpop = Message Queue(消息队列)

背景

假如有这样的一个需求,现有一套「较老」的公共服务可供你的系统使用,但是目前还有其他业务单位正在使用。每次请求调用下发一个任务,接着服务会响应一个回调接口,通过这个回调接口,你可以查询这个任务的执行状态,到了什么阶段,是否完成之类的。任务从下发到结束需要一定的时间,并且任务结束总是以两种状态出现:Completed或者Error

假设这套公共服务分配给你了N个容量可以同时进行任务的调度,也就是说你系统调用下发的任务只能是N,当超过N时,就需要自定义一个队列来进行排队。当有任务完成时,任务执行池就可以释放一个空位,队列就可以pop出一个消息用于处理调用公共服务。

分析

为了充分深入Redis的列表的使用,我打算把所有的需求点和场景都交给Redis去完成。

所以这个需求看起来比较简单,就变成了也有麻烦的地方,主要是四个:

  1. 由于其他业务单位的存在,本系统需要有一个缓冲池和任务执行池Running Pool,容量为可分配的N。
  2. 业务量增加超过N时,需要有个等待队列维护出入:当目前任务执行池Running Pool满载时,入队;当任务Running Pool中的任务执行完成释放空位时,则出队进入Running Pool
  3. 「实际处理」的任务并不是执行在本系统而是在公共服务上,也就是说任何状态只能被动地通过公共服务的回调接口去查。
  4. 当存在N个正在执行的任务时,单线程肯定是效率不够的。需要开启多线程去回调公共服务的接口判断任务状态是否完成/失败,用于下一步操作出入队列。

设计

  1. 我们可以在Redis中规划一块任务执行池Running Pool,可以是Set类型也可以是List类型,容量为N。设计这样的一个队列有个好处是如果单纯的PUSH/POP的话,当出队之后处理这个消息的过程中发生不可抗力、宕机,消息出队之后就会永远的丢失掉,而这样做则是消息始终持久化在Redis中,是任务结束之后出队;
  2. 有一个等待队列Pending Queue用于缓冲,消费者需要处理上面和这一块的业务逻辑;

  3. 一个Completed Queue用于完成消息的推送;

  4. 一个Error Queue用于任务失败消息的推送(这两者其实可以合并,作为任务结束的消息便可,具体内容可以放在value中,也通过读写DataBase。

为了不出现当任务队列中没有任务时,消费者每秒都会调用一次POP命令查看是否有新任务这种情况,需要在消费者中同时处理业务逻辑,当任务执行缓冲池出队时,把Pending Queue的消息出队,入队到Running Pool

所以我们需要一个生产者RedisProducer(部分代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Transactional(rollbackFor = Exception.class)
public ProduceDTO produceTask(String orderId) {
/*
* 数据处理代码段...
* 部分业务逻辑...
*/

// 判断running queue是否还有空位
boolean condition = !redisBase.hasKey(RUNNING_QUEUE) || redisBase.lGetListSize(RUNNING_QUEUE) < RUNNING_QUEUE_SIZE;
// 如果有空位
if (condition) {
redisBase.lSet(RUNNING_QUEUE, orderId, -1, TimeUnit.DAYS);
// 开启服务
logger.info("================================开启转码服务================================");

/*
* 任务订单持久化代码段...
* 部分业务逻辑...
*/
boolean isUpdate = iTaskInfoService.modifyTaskByOrderId(produceDTO);
if (isUpdate) {
// 调用消费者
redisConsumer.consumerMessageThread(produceDTO);
} else {
throw new MatthewHanException(ServiceEnum.FIRST_UPDATE_TRANS_TASK_ERROR.getCode(), ServiceEnum.FIRST_UPDATE_TRANS_TASK_ERROR.getMessage());
}
return produceDTO;
} else {
// 没空位,则消息先进入pending queue
redisBase.lSet(PENDING_QUEUE, orderId, -1, TimeUnit.DAYS);
}
return produceDTO;
}

此部分主要是先判断Redis的Running Pool是否还有空余,无空余则进入Pending Queue等待。进入了Running Pool的消息做DataBase的持久化业务逻辑。这一句redisConsumer.consumerMessageThread(produceDTO);则是主动调用消费者。