RocketMQ延时消息的原理与实现

简介: 本文分享了RocketMQ的延时消息的原理和实现,手把手带你从源码角度了解到内部实现机制。

前面给大家分享了Rocket大体架构设计和Spring快速集成RocketMQ。看了前面的文章的小伙伴把RocketMQ集成进项目以及发送消息和消费消息问题应该不大,有问题的可以私信我一起学习解决问题。

今天给大家梳理下RocketMQ的延时消息如何使用以及如何实现的,包括我的一些改进想法,是不是有点飘了居然想修改RocketMQ的源码。

延时消息基本概念

延时消息:顾名思义就是消息不是实时处理的,可以在延时设置时候后消息才能被消费者消费。

以下使用场景不一定使用延时消息是最好的方案,但延时消息是适用于以下场景的:

1、30分钟取消订单,商城用户下单后如果在设定时候后还没支付就将订单状态置为取消,并恢复商品库存

2、超时自动审批,有的系统审批流程可以设置为超过设定时间后自动执行通过或者拒绝流程

3、短信或提醒,比如注册账号后三天不登陆就发短信提醒

RocketMq如何使用延时消息

Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 设置延时消息的级别
msg.setDelayTimeLevel(2);

是不是发现了RocketMQ发送延时消息非常简单,只需要在消息上设置delayTimeLevel属性即可。但是看属性的字面意思可以发现这个属性值好像不是具体多少秒,感觉像是一个等级的意思,那这个延时时间等级是怎么样个效果呢?请接着往下看

RocketMQ延时消息等级

RocketMQ的延时消息其实并非是精确的一个时间,而是采用延时等级来定义的。在MessageStoreConfig类上有对这个延时消息等级的定义,定义了18个等级最低1s,最长2h

// RocketMQ延时消息的定义
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

我们看看RocketMQ是如何使用这18个等级来实现延时消息的。

Broker初始化流程-延时消息部分

在Broker初始化流程中有这样的逻辑,流程如下

解析18个时间等级

这里解析了18个等级为秒然后放入到delayLevelTable中存储起来。

Broker启动流程-延时消息读部分

接着来看看延时任务在Broker启动的时候怎么创建的,可以看到每个等级都创建了一个定时任务,初始设定都是秒调度执行一次

每个等级创建一个定时任务

那关键执行逻辑就是DeliverDelayedMessageTimerTask的run方法了

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

//有这样的一个代码,可以看到RocketMQ把延时消息都放到了SCHEDULE_TOPIC_XXXX队列按照level分到了不同的queue。这个Topic是内部写死的
ConsumeQueue cq =
             ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
//然后从这个对象中根据offset来取buffer,主要是用来判断当前offset的消息时间戳和当前时间戳作对比
  
  // 在executeOnTimeup这个方法的中间部分
 long now = System.currentTimeMillis();
 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 long countdown = deliverTimestamp - now;
 if (countdown <= 0) {
        // 如果时间戳比当前时间戳小证明消息到期了
   
   // 会调用这个方法把真实的Topic设置到消息上
   # DeliverDelayedMessageTimerTask#messageTimeup
   ...
   // 组装真实的topic消息
   MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
   
   // 写到真实的消息commitLog中
   PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
 } else {
   // 如果没到期则进行下一轮检测
 }

延时消息写入commitLog流程

当我们需要发送延时消息时我们会在message上设置setDelayTimeLevel(), 那从上面读取部分我们反推下写入流程,应该是在存储时判断消息体上是否有延时消息的level属性,如果有就应该把消息放在Topic为SCHEDULE_TOPIC_XXXX的队列中,我们来看看是否是我们猜想的呢?

当Broker启动时会作为服务器启动netty,当有接到消息时会触发NettyServerHandler,跟踪下发送消息的流程

org.apache.rocketmq.store.CommitLog#asyncPutMessage

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {
        
                // 这里将topic设置为SCHEDULE_TOPIC_XXXX
        topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
        // queueId设置为 msg.getDelayTimeLevel()-1
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                
        // Backup real topic, queueId
        // 将真实的topic存储起来,以便在消息到期时替换为真实的topic发送到对应的队列上去
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

小结

从上面的代码我们可以梳理出RocketMQ为我们内置了18个延时,同时在启动Broker时启动18个定时任务分别扫描各自等级的消息逻辑。当需要延时消息时可在发送时在消息上指定等级,消息到达Broker后将topic替换为SCHEDULE_TOPIC_XXXX,将真实的topic存放在propertis中。当定时器定时扫描各自等级的队列,如果根据offset取出有消息并判断时间戳消息当前时间戳就将topic替换为真实topic发送的对应topic的队列中。

从上面的逻辑我们可以了解到开源版RocketMQ的延时消息其实并不是精准延时的。商业版的RocketMQ是阿里云的ons,ons是能满足精准延时的。滴滴也基于开源版的RocketMQ开发出了精准延时的模块,有兴趣的可以去了解下 DDMQ

本次分享就到这了,关于commitLog本篇文章梳理的不是很清楚的,下次咱们专门针对RocketMQ的存储再梳理下,好了本篇关于开源版RocketMQ的延时消息实现梳理对你有帮助可以关注下我 Java极客帮

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
7月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
2412 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
7月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
345 11
RocketMQ原理—3.源码设计简单分析下
|
7月前
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
|
7月前
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
12月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
11月前
|
消息中间件 中间件 Kafka
MQ四兄弟:如何实现延时消息
本文介绍了几种常见的消息队列系统(RabbitMQ、RocketMQ、Kafka和Pulsar)实现延时消息的方式。RabbitMQ通过死信队列或延时插件实现;RocketMQ内置延时消息支持,可通过设置`delayTimeLevel`属性实现;Kafka不直接支持延时消息,但可以通过时间戳、延时Topic、Kafka Streams等方法间接实现;Pulsar自带延时消息功能,提供`deliverAfter`和`deliverAt`两种方式。每种方案各有优劣,适用于不同的应用场景。
1102 0
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。

热门文章

最新文章