JAVA技术 十一月 29, 2019

Redis搭配生产者消费者模型

文章字数 4.8k 阅读约需 4 mins. 阅读次数 0

前言

Redis作为一款优秀的缓存中间件,人们总是寄予他新的厚望。其列表类型的阻塞操作可以实现消息队列。

在场景中使用可以牢记以下口诀:

  • lpush + lpop = Stack(栈)

  • lpush + rpop = Queue(队列)

  • lpsh + ltrim = Capped Collection(有限集合)

  • lpush + brpop = Message Queue(消息队列)

背景

假如有这样的一个需求,现有一套「较老」的公共服务可供你的系统使用,但是目前还有其他业务单位正在使用。每次请求调用下发一个任务,接着服务会响应一个回调接口,通过这个回调接口,你可以查询这个任务的执行状态,到了什么阶段,是否完成之类的。任务从下发到结束需要一定的时间,并且任务结束总是以两种状态出现:Completed或者Error

假设这套公共服务分配给你了N个容量可以同时进行任务的调度,也就是说你系统调用下发的任务只能是N,当超过N时,就需要自定义一个队列来进行排队。当有任务完成时,任务执行池就可以释放一个空位,队列就可以pop出一个消息用于处理调用公共服务。

分析

为了充分深入Redis的列表的使用,我打算把所有的需求点和场景都交给Redis去完成。

所以这个需求看起来比较简单,就变成了也有麻烦的地方,主要是四个:

  1. 由于其他业务单位的存在,本系统需要有一个缓冲池和任务执行池Running Pool,容量为可分配的N。
  2. 业务量增加超过N时,需要有个等待队列维护出入:当目前任务执行池Running Pool满时,入队;当任务Running Pool中的任务执行完成释放空位时,则出队进入Running Pool
  3. 「实际处理」的任务并不是在本系统而是在公共服务上,也就是说任何状态只能被动地通过回调接口去查。
  4. 当存在N个正在执行的任务时,单线程肯定是效率不够的。需要开启多线程去回调接口判断任务状态是否完成/失败,用于下一步操作出队。

设计

  1. 我们可以在Redis中规划一块任务执行池Running Pool,可以是Set类型也可以是List类型,容量为N。设计这样的一个队列有个好处是如果单纯的PUSH/POP的话,当出队之后处理这个消息的过程中发生不可抗力、宕机,消息出队之后就会就会永远的丢失掉,而这样做则是消息始终持久化在Redis中,是任务结束之后;

  2. 有一个等待队列Pending Queue用于缓冲,消费者需要处理上面和这一块的业务逻辑;

  3. 一个Completed Queue用于完成消息的推送;

  4. 一个Error Queue用于任务失败消息的推送(这两者其实可以合并,作为任务结束的消息便可,具体内容可以放在value中,也通过读写DataBase。

为了不出现当任务队列中没有任务时,消费者每秒都会调用一次POP命令查看是否有新任务这种情况,需要在消费者中同时处理业务逻辑,当任务执行缓冲池出队时,把Pending Queue的消息出队,入队到Running Pool

所以我们需要一个生产者RedisProducer(部分代码)

@Transactional(rollbackFor = Exception.class)
public ProduceDTO produceTask(String orderId) {
    /* 
     * 数据处理代码段...
     * 部分业务逻辑...
     */
      
    // 判断running queue是否还有空位
    boolean condition = !redisBase.hasKey(RUNNING_QUEUE) || redisBase.lGetListSize(RUNNING_QUEUE) < RUNNING_QUEUE_SIZE;
    // 如果有空位
    if (condition) {
        redisBase.lSet(RUNNING_QUEUE, orderId, -1, TimeUnit.DAYS);
        // 开启服务
        logger.info("================================开启转码服务================================");

        /* 
         * 任务订单持久化代码段...
         * 部分业务逻辑... 
         */
         boolean isUpdate = iTaskInfoService.modifyTaskByOrderId(produceDTO);
         if (isUpdate) {
             // 调用消费者
             redisConsumer.consumerMessageThread(produceDTO);
         } else {
             throw new MatthewHanException(ServiceEnum.FIRST_UPDATE_TRANS_TASK_ERROR.getCode(), ServiceEnum.FIRST_UPDATE_TRANS_TASK_ERROR.getMessage());
         }
            return produceDTO;
    } else {
        // 没空位,则消息先进入pending queue
        redisBase.lSet(PENDING_QUEUE, orderId, -1, TimeUnit.DAYS);
    }
    return produceDTO;
}

此部分主要是先判断Redis的Running Pool是否还有空余,无空余则进入Pending Queue等待。进入了Running Pool的消息做DataBase的持久化业务逻辑。这一句redisConsumer.consumerMessageThread(produceDTO);则是主动调用消费者。

0%