Spring Cloud Alibaba 七天训练营(六)分布式消息(事件)驱动

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
应用实时监控服务-应用监控,每月50GB免费额度
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: 对文档有任何问题,请在评论区留言!
文档目录

1. 简介

事件驱动架构(Event-driven 架构,简称 EDA)是软件设计领域内的一套程序设计模型。这套模型的意义是所有的操作通过事件的发送/接收来完成。举个例子,比如一个订单的创建在传统软件设计中服务端通过接口暴露创建订单的动作,然后客户端访问创建订单。在事件驱动设计里,订单的创建通过接收订单事件来完成,这个过程中有事件发送者和事件接受者这两个模块,事件发送者的作用是发送订单事件,事件接受者的作用的接收订单事件。Spring Cloud Stream 是一套基于消息的事件驱动开发框架,它提供了一套全新的消息编程模型,此模型屏蔽了底层具体消息中间件的使用方式。开发者们使用这套模型可以完成基于消息的事件驱动应用开发。

2. 学习目标

  • 掌握 Spring 对消息的编程模型封装
  • 掌握 RocketMQ 整合 Spring Cloud Stream 完成消息的发送和接收
  • 掌握 RocketMQ 整合 Spring Cloud Bus 完成远程事件的发送和接收

3. 详细内容

  • 概念理解:指导读者理解 Spring 的消息编程模型
  • 消息发送/接收:实战 Spring Cloud Steam RocketMQ Binder
  • 事件发送/接收: 实战 Spring Cloud Bus RocketMQ

4. 理解 Spring 消息编程模型

首先我们来看这个场景,不同的消息中间件发送消息的代码:

1.png


每个消息中间件都有自己的消息模型编程,他们的代码编写方式都不一致。同样地,在消息的订阅方面,也是不同的代码。这个时候如果某天想把 Kafka 切换到 RocketMQ,必须得修改大量代码。

Spring 生态里有两个消息相关的模块和项目,分别是 spring-messaging 模块和 Spring Integration 项目,它们对消息的编程模型进行了统一,不论是 Apache RocketMQ 的 Message,或者是 Apache Kafka 的 ProducerRecord,都被统一称为 org.springframework.messaging.Message 接口。

Message 接口有两个方法,分别是 getPayload 以及 getHeaders 用于获取消息体以及消息头。如图所示,这也意味着一个消息 Message 由 Header 和 Payload 组成:

2.png

Payload 是一个泛型,意味是消息体可以放任意数据类型。Header 是一个 MessageHeaders 类型的消息头。

有了消息之后,这个消息被发送到哪里呢?Spring 提供了消息通道 MessageChannel 的概念。消息可以被发送到消息通道里,然后再通过消息处理器 MessageHandler 去处理消息通道里的消息:

3.png

消息处理这里又会遇到一个问题。如果消息通道里只有 1 个消息,但是消息处理器有 N 个,这个时候要被哪个消息处理器处理呢?这里又涉及一个消息分发器的问题。UnicastingDispatcher 表示单播的处理方式,消息会通过负载均衡被分发到某一个消息处理器上,BroadcastingDispatcher 表示广播的方式,消息会被所有的消息处理器处理。

4.png

5. Spring Cloud Stream

Spring Cloud Stream 是一套基于消息的事件驱动开发框架。

Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:

5.png


如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 [4 ]()行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。

6.png

以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:

@SpringBootApplication@EnableBinding({Source.class, Sink.class})  // ①
public class SCSApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(SCSApplication.class)
            .web(WebApplicationType.NONE).run(args);
    }
    @Autowired
    Source source;  // ②
    @Bean
    public CommandLineRunner runner() {
        return (args) -> {
            source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build());  // ③
        };
    }
    @StreamListener(Sink.INPUT)  // ④
    @SendTo(Source.OUTPUT)  // ⑤
    public String receive(String msg) {
        return msg.toUpperCase();
    }
}
  1. 使用 [@EnableBinding ] 注解,注解里面有两个参数 Source 和 Sink,它们都是接口。Source 接口内部有个 MessageChannel 类型返回值的 output 方法,被 [@Output ] 注解修饰表示这是一个 Output Binding;Sink 接口内部有个 SubscribableChannel 类型返回值的 intput 方法,被 [@Input ] 注解修饰表示这是一个 Input Binding。[@EnableBinding ] 注解会针对这两个接口生成动态代理。
  2. 注入 [@EnableBinding ]注解对于 Source 接口生成的动态代理。
  3. 使用 [@EnableBinding ] 注解对于 Source 接口生成的动态代理内部的 MessageChannel 发送一条消息。最终消息会被发送到消息中间件对应的 topic 里。
  4. [@StreamListener ] 注解订阅 [@EnableBinding ]注解对于 Sink 接口生成的动态代理内部的 SubscribableChannel 中的消息,这里会订阅到消息中间件对应的topic 和 group。
  5. 消息处理结果发送到[@EnableBinding ]注解对于 Source 接口生成的动态代理内部的 MessageChannel。最终消息会被发送到消息中间件对应的topic 里。

上述代码需要配置信息:

spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka

spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq

这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。

所以这段代码的意思是以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。

当然,你也可以直接通过沙箱环境直接查看案例

相关文章
|
22天前
|
人工智能 Java Nacos
基于 Spring AI Alibaba + Nacos 的分布式 Multi-Agent 构建指南
本文将针对 Spring AI Alibaba + Nacos 的分布式多智能体构建方案展开介绍,同时结合 Demo 说明快速开发方法与实际效果。
1108 49
|
23天前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
2093 39
|
3月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
589 3
|
26天前
|
负载均衡 Java API
《深入理解Spring》Spring Cloud 构建分布式系统的微服务全家桶
Spring Cloud为微服务架构提供一站式解决方案,涵盖服务注册、配置管理、负载均衡、熔断限流等核心功能,助力开发者构建高可用、易扩展的分布式系统,并持续向云原生演进。
|
26天前
|
人工智能 监控 Java
Spring AI Alibaba实践|后台定时Agent
基于Spring AI Alibaba框架,可构建自主运行的AI Agent,突破传统Chat模式限制,支持定时任务、事件响应与人工协同,实现数据采集、分析到决策的自动化闭环,提升企业智能化效率。
Spring AI Alibaba实践|后台定时Agent
|
2月前
|
人工智能 Java 机器人
基于Spring AI Alibaba + Spring Boot + Ollama搭建本地AI对话机器人API
Spring AI Alibaba集成Ollama,基于Java构建本地大模型应用,支持流式对话、knife4j接口可视化,实现高隐私、免API密钥的离线AI服务。
1376 1
基于Spring AI Alibaba + Spring Boot + Ollama搭建本地AI对话机器人API
|
3月前
|
人工智能 Java 开发者
邀您参与 “直通乌镇” Spring AI Alibaba 开源竞技挑战赛!
邀您参与 “直通乌镇” Spring AI Alibaba 开源竞技挑战赛!
|
4月前
|
人工智能 数据可视化 Java
性能提升 10 倍, DIFY 模式迁移至 Spring AI Alibaba 模式 零改造实现
将 Dify 应用迁移至 Spring AI Alibaba,可兼顾可视化开发效率与代码工程灵活性,显著提升系统性能与扩展能力,适用于复杂 AI 业务场景。
619 0