RocketMQ msgId与offsetMsgId释疑(实战篇)

简介: RocketMQ msgId与offsetMsgId释疑(实战篇)

本文将详细介绍消息发送、消息消费、RocketMQ queryMsgById 命令以及 rocketmq-console 等使用场景中究竟是用的哪一个ID。


1、抛出问题


1.1 从消息发送看消息ID


package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
    public static void main(String[] args)  {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            Message msg = new Message("TestTopic" /* Topic */,null /* Tag */, ("Hello RocketMQ test1" ).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            producer.shutdown();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}

执行效果如图所示:

ff4bf2af1697b54e3e0846625e9b5828.png

即消息发送会返回 msgId 与 offsetMsgId。


1.2 从消息消费看消息ID


package org.apache.rocketmq.example.quickstart;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.println("MessageExt msg.getMsgId():" +  msgs.get(0).getMsgId());
                System.out.println("-------------------分割线-----------------");
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

执行效果如图所示:

56320901ea29fab53077b9803e618403.jpg

不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的msgId 与直接输出msgs中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。


2、消息ID释疑


从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回 msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?


  • msgId:该 ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。
  • offsetMsgId:消息偏移ID,该 ID 记录了消息所在集群的物理地址,主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。


2.1 msgId 即全局唯一 ID 构建规则


dc0b4f3f0131426adadf39b92dcca639.jpg

从这张图可以看出,msgId确实是客户端生成的,接下来我们详细分析一下其生成算法。


MessageClientIDSetter#createUniqID

public static String createUniqID() {
    StringBuilder sb = new StringBuilder(LEN * 2);
    sb.append(FIX_STRING);    // @1
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));  // @2
    return sb.toString();
}

一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。


2.1.1 FIX_STRING


MessageClientIDSetter静态代码块

static {
    byte[] ip;
    try {
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 2 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
    tempBuffer.position(0);
    tempBuffer.put(ip);
    tempBuffer.position(ip.length);
    tempBuffer.putInt(UtilAll.getPid());
    tempBuffer.position(ip.length + 2);
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

从这里可以看出 FIX_STRING 的主要由:客户端的IP、进程ID、加载 MessageClientIDSetter 的类加载器的 hashcode。


2.1.2 唯一性算法


msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法实现。

private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}

可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。


2.2 offsetMsgId构建规则



984b1ff5bee4812606b6348b0b2594a3.jpg

在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:


MessageDecoder#createMessageId

public static String createMessageId(final ByteBuffer input ,
            final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}

首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:


  • ByteBuffer input
    用来存放 offsetMsgId 的字节缓存区( NIO 相关的基础知识)
  • ByteBuffer addr
    当前 Broker 服务器的 IP 地址与端口号,即通过解析 offsetMsgId 从而得到消息服务器的地址信息。
  • long offset
    消息的物理偏移量。
    即构成 offsetMsgId 的组成部分:Broker 服务器的 IP 与端口号、消息的物理偏移量。

温馨提示:即在 RocketMQ 中,只需要提供 offsetMsgId,可以不必知道该消息所属的 topic 信息即可查询该条消息的内容。


2.3 消息发送与消息消费返回的消息ID信息


消息发送时会在 SendSesult 中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。


在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性ID),因为该全局性 ID 非常方便实现消费端的幂等。


在本文的1.2节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?


d990da8bd559dd9b106281e673cb883a.jpg

在客户端返回的 msg 信息,其最终返回的对象是  MessageClientExt ,继承自 MessageExt。


那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。

@Override
public String getMsgId() {
    String uniqID = MessageClientIDSetter.getUniqID(this);
    if (uniqID == null) {
        return this.getOffsetMsgId();
    } else {
        return uniqID;
    }
}

原来在调用 MessageClientExt 中的 getMsgId 方法时,如果消息的属性中存在其唯一ID,则返回消息的全局唯一ID,否则返回消息的 offsetMsgId。


而 MessageClientExt 方法并没有重写 MessageExt 的 toString 方法,其实现如图所示:

39c6f29fb6549ddc2a1c9953d7e09ca6.png


故返回的是 MessageExt中 的 msgId,该 msgId 存放的是 offsetMsgId,所以才造成了困扰。


温馨提示:如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时全局 msgId 是不会发送变化的,但该消息的 offsetMsgId 会发送变化,因为其存储在服务器中的位置发生了变化。

3、实践经验


在解答了消息发送与消息消费关于msgId与offsetMsgId的困扰后,再来介绍一下如果根据 msgId 去查询消息。


想必大家对 rocketmq-console ,那在消息查找界面,展示的消息列表中返回的 msgId 又是哪一个呢?

8eedf355a618d0de65e00467aeafb03a.jpg

这里的 Message ID 返回的是消息的全局唯一ID。


其实 RokcetMQ 也提供了 queryMsgById 命令来查看消息的内容,不过这里的 msgId 是 offsetMsgId,我们首先将全局唯一ID传入命令,其执行效果如下:

e32b6db9a829aca3c6dd8e449ef81f27.jpg

发现报错,那我们将 offsetMsgId 传入其执行效果如图所示:

65dd03fedcb6483b21c5254457b33f3f.jpg

但在 rocketmq-console 的根据消息ID去查找消息,无论传入哪个msgId,下图该功能都能返回正确的结果:

df15d0a3b8c1fb737bc4adefec5b050e.png


这是因为 rocketmq-console 做了兼容,首先将传入的 msgId 用 queryMsgById 该命令去查,如果报错,则当成 uniqID(全局ID)去查,首先全局ID会存储在消息的属性中,并会创建 Hash 索引,即可用通过 indexfile 快速定位到该条消息。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
514 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
2月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
828 1
|
7月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
7月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
7月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
5月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
3656 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
7月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
|
7月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
7月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
7月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。