消息中间件-RocketMQ技术(一)

简介: 消息中间件-RocketMQ技术(一)

一、为什么我要写RocketMQ呢?

a、公司用的原因:因为刚到新的公司,后期我会结合公司的用到的情况,把涉

及到线上的关于RocketMQ的问题的经验分享大家。

b、自己也想买了一些书关于RocketMQ的书籍,到时总结出来写成文章。

c、听说以前叮咚买菜的公司的中间件是叮咚买菜的CTO用c语言写的,虽然性能挺强,但是好像不能够通用的原因,后期给pass了。现在转为阿里出的RocketMQ。

二、如何去研究一款陌生的中间件系统

研究一款开源中间件,首先我们需要了解它的整体的架构以及如何在开发环境调试源码,从代码入手才能快速熟悉一个开源项目,只有这样才能够抽丝剥茧地理解透彻,了解作者的设计思想和实现原理。

三、如何去获取和调试RocketMQ的源代码

我这边不用eclipse开发工具来搭建RocketMQ源代码了,直接上IntelliJ IDEA。

step1:

在Intellij  IDEA  VCS 菜单中选择 Check from. Version Control,再选择Git,然后

弹出对话框,如下图所示:

step2:

我是基于maven的方式来构建源代码的,所以之后build下就可以了,然后就成了如下图中的样子:

四、调试RocketMQ源码

1、启动NameServer

step1:

需要在如下图中创建一个ROCKETMQ_HOME的RocketMQ的运行主目录:

step2:

在RocketMQ运行的主目录中创建conf,logs,store三个文件夹。

step3:

从RocketMQ distribution部署目录中将broker.conf,logback_broker.xml,logback_namesrv.xml文件复制到自己创建的conf目录下。然后把broker.conf文件内容做下修改如下:

然后另外两个文件只需要修改对应的路径为RocketMQ的运行的主目录就ok了。

2、启动Broker

step1:

需要配置-c属性指定broker配置文件路径,以及RocketMQ主目录,如下图所示:

3、分别启动nameserver和broker,nameserver看控制台,broker看日志是否报错,如下图

上面的步骤都成功的话就代表RocketMQ在本地部署好了源码

五、体验一把发送消息的生产者和消费消息的消费者的代码吧

step1:

生产者的代码如下:


/**

* 实例生产着

*/

public class Producer {

   public static void main(String[] args) throws MQClientException, InterruptedException {

       DefaultMQProducer producer = new DefaultMQProducer("pgroup");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       for (int i = 0; i < 1000; i++) {

           try {

               Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ" + i).getBytes());

               SendResult result = producer.send(msg);

               System.out.println(result);

           } catch (Exception e) {

               e.printStackTrace();

               Thread.sleep(100);

           }

       }

       producer.shutdown();

   }

}

step2:

消费者的代码如下:

/**

* 消费者

*/

public class Consumer {

   public static void main(String[] args) throws InterruptedException, MQClientException {

       DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("xx-consumer");

       consumer.setNamesrvAddr("127.0.0.1:9876");

       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("TopicTest","*");

       consumer.registerMessageListener(new MessageListenerConcurrently() {

           @Override

           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

               System.out.printf("%s Receive New " +

                      "Messages: %s %n", Thread.currentThread().getName(), msgs);

               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

           }

       });

       consumer.start();

 }

}

step3:

生产者运行的结果如下,我贴出一小部分,因为各个线程去消费的结果都是一样的结构,只是值不同。

SendResult [sendStatus=SEND_OK, msgId=2409891E92603FCD047CA03AE754E85F077618B4AAC2486B4B4A03D9, offsetMsgId=C0A82B0900002A9F000000000006DB6E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=496]

消费端运行的结果如下,这个我也贴出一小部分,因为各个线程去消费的结果都是一样的结构,只是值不同。

ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=226, queueOffset=485, sysFlag=0, bornTimestamp=1618421391164, bornHost=/192.168.43.9:50693, storeTimestamp=1618421391164, storeHost=/192.168.43.9:10911, msgId=C0A82B0900002A9F000000000006B578, commitLogOffset=439672, bodyCRC=185152384, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1618421582967, UNIQ_KEY=2409891E92603FCD047CA03AE754E85F077618B4AAC2486B4B3C03AE, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 57, 52, 50], transactionId='null'}]]

六、RocketMQ的设计理念

1、设计理念如下:

RocketMQ的核心功能:消息发送,消息存储,消息消费。整体设计追求简

单和性能非常高的理念,主要体现如下几个方面:

a、NameServer设计简单

nameserver用来实现元数据的管理,因为Topic路由信息无需在集群之间保持强一致性,最终一致性就可以了。所以nameserver集群之间互不通信。降低了nameserver的复杂度以及对网络的要求也降低了不少。

b、高效的IO存储机制

存储文件设置成文件组,组内单个文件的大小固定,方便引入内存映射机制

主题的消息是顺序写的,提升消息的写性能

引入消息队列文件和索引文件来兼顾消息消费和消息查找

c、容忍设计上的缺陷

RocketMQ设计者的难题:不能同时保证消息一定能被消息消费者消费,并且再保证只消费一次。而是只保证消息被消费者消费,但是设计上是允许消息被重复消费的。这样简化了消息中间件的内核,而且消息发送高可用变得非常简单与高效 ,消息重复问题需要在消费时实现幂等就可以了。


明天继续~~

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
204 6
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
7月前
|
SQL 大数据 数据库
RocketMQ实战—1.订单系统面临的技术挑战
本文详细分析了一个订单系统的设计与技术挑战。首先,介绍了订单系统的整体架构、业务流程及负载情况,包括电商购物流程、核心和非核心业务流程,以及真实生产中的负载压力。接着,探讨了系统面临的主要技术问题:支付后发券、发红包等操作导致性能下降;退款流程复杂且易失败;与第三方系统耦合带来的不稳定;大数据团队直接查询数据库影响性能;秒杀活动时数据库压力剧增等。最后,通过放大100倍压力的方法,梳理了高并发下的技术挑战,如核心链路优化、后台线程补偿机制、第三方系统解耦、数据获取方式改进等,为订单系统的优化提供了全面的参考。
RocketMQ实战—1.订单系统面临的技术挑战
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
550 2
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
12月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
1082 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
4月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
185 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
823 99

热门文章

最新文章