大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


消费者的基本流程

消费者的参数、参数补充

Kafka 消息发送(Message Production)

在 Kafka 中,消息发送是指生产者将数据写入 Kafka 主题的过程。生产者是负责创建和发送消息的客户端应用,它们将数据转换为 Kafka 可识别的格式并发送到指定的主题中。


消息发送的过程

消息创建:生产者创建消息,包括主题名称、键(可选)、消息体等。键用于控制消息的分区,而消息体是实际的业务数据。

序列化:在消息发送之前,生产者需要将消息键和消息体序列化为字节数组,Kafka 只能处理字节数组格式的数据。

选择分区:消息被序列化后,生产者根据某种逻辑(如默认的哈希算法或自定义逻辑)将消息分配到某个特定的分区。

发送消息:消息被发送到 Kafka 集群的指定分区。Kafka 的 Broker 接收到消息后,会将其写入相应分区的日志文件中。

发送消息的配置参数

acks:定义生产者需要等待多少个副本确认消息已经收到,才认为消息发送成功。常见的值包括 0(不等待)、1(等待 Leader 确认)、all(等待所有副本确认)。

retries:当消息发送失败时,生产者重试的次数。

batch.size:生产者在发送消息前积累的消息批次大小。批次越大,吞吐量越高,但也会增加延迟。

自定义序列化器(Custom Serializer)

在 Kafka 中,生产者发送的消息需要先经过序列化处理。Kafka 提供了默认的序列化器(如 StringSerializer、ByteArraySerializer 等),但在某些情况下,可能需要自定义序列化器以支持特定的数据格式或优化性能。


什么是序列化器

序列化器的作用:序列化器将生产者的消息对象(如字符串、Java 对象等)转换为字节数组,以便 Kafka 能够存储和传输数据。

Kafka 的默认序列化器:Kafka 提供了多种默认序列化器来处理常见的数据类型,如字符串、整数和字节数组。

自定义序列化器的场景

复杂数据结构:当你的消息是复杂的对象结构(如嵌套的 JSON 对象、ProtoBuf 等),默认的序列化器可能无法满足需求。这时可以编写自定义序列化器,来处理这些复杂的结构。

性能优化:在一些高性能场景下,默认的序列化器可能无法满足低延迟、高吞吐量的需求。通过定制化的序列化器,可以优化序列化过程的效率。

自定义分区器(Custom Partitioner)

在 Kafka 中,分区器决定了消息被发送到哪个分区。Kafka 提供了默认的分区器(通常基于消息的键进行哈希计算),但在一些场景下,你可能希望自定义分区逻辑,以实现特定的消息分布策略。


分区器的作用

控制消息的分区:分区器的主要作用是根据消息的键或其他属性来确定消息应该发送到哪个分区。默认情况下,Kafka 使用键的哈希值来确定分区。

分区的意义:通过合理分配分区,可以实现消息的负载均衡、提高系统的并行处理能力,并确保相同键的消息总是被发送到同一个分区。

自定义分区器的场景

定制化的消息分布:在某些场景下,可能需要根据业务逻辑将消息定向到特定的分区。例如,按照用户 ID 分区、按照消息类型分区等。

特殊的分区需求:某些情况下,你可能希望确保某些分区具有更高的优先级或更大的存储能力,这时可以使用自定义分区器来实现这些需求。

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。

序列化器作用就是用于序列化要发送的消息的。

Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {

    private String username;

    private String password;

    private Integer age;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, User data) {
        if (null == data) {
            return null;
        }
        int userId = data.getUserId();
        String username = data.getUsername();
        String password = data.getPassword();
        int age = data.getAge();

        int usernameLen = 0;
        byte[] usernameBytes;
        if (null != username) {
            usernameBytes = username.getBytes(StandardCharsets.UTF_8);
            usernameLen = usernameBytes.length;
        } else {
            usernameBytes = new byte[0];

        }

        int passwordLen = 0;
        byte[] passwordBytes;
        if (null != password) {
            passwordBytes = password.getBytes(StandardCharsets.UTF_8);
            passwordLen = passwordBytes.length;
        } else {
            passwordBytes = new byte[0];
        }

        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);
        byteBuffer.putInt(userId);
        byteBuffer.putInt(usernameLen);
        byteBuffer.put(usernameBytes);
        byteBuffer.putInt(passwordLen);
        byteBuffer.put(passwordBytes);
        byteBuffer.putInt(age);
        return byteBuffer.array();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

分区器

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer

可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
目录
相关文章
|
2月前
|
Java API 数据处理
Java新特性:使用Stream API重构你的数据处理
Java新特性:使用Stream API重构你的数据处理
Java API 开发者
78 0
|
4月前
|
并行计算 Java API
Java List 集合结合 Java 17 新特性与现代开发实践的深度解析及实战指南 Java List 集合
本文深入解析Java 17中List集合的现代用法,结合函数式编程、Stream API、密封类、模式匹配等新特性,通过实操案例讲解数据处理、并行计算、响应式编程等场景下的高级应用,帮助开发者提升集合操作效率与代码质量。
186 1
|
4月前
|
安全 Java 微服务
Java 最新技术和框架实操:涵盖 JDK 21 新特性与 Spring Security 6.x 安全框架搭建
本文系统整理了Java最新技术与主流框架实操内容,涵盖Java 17+新特性(如模式匹配、文本块、记录类)、Spring Boot 3微服务开发、响应式编程(WebFlux)、容器化部署(Docker+K8s)、测试与CI/CD实践,附完整代码示例和学习资源推荐,助你构建现代Java全栈开发能力。
471 0
|
4月前
|
缓存 安全 Java
Java 并发新特性实战教程之核心特性详解与项目实战
本教程深入解析Java 8至Java 19并发编程新特性,涵盖CompletableFuture异步编程、StampedLock读写锁、Flow API响应式流、VarHandle内存访问及结构化并发等核心技术。结合电商订单处理、缓存系统、实时数据流、高性能计数器与用户资料聚合等实战案例,帮助开发者高效构建高并发、低延迟、易维护的Java应用。适合中高级Java开发者提升并发编程能力。
114 0
|
4月前
|
安全 Java API
Java 17 及以上版本核心特性在现代开发实践中的深度应用与高效实践方法 Java 开发实践
本项目以“学生成绩管理系统”为例,深入实践Java 17+核心特性与现代开发技术。采用Spring Boot 3.1、WebFlux、R2DBC等构建响应式应用,结合Record类、模式匹配、Stream优化等新特性提升代码质量。涵盖容器化部署(Docker)、自动化测试、性能优化及安全加固,全面展示Java最新技术在实际项目中的应用,助力开发者掌握现代化Java开发方法。
169 1
|
4月前
|
IDE Java API
Java 17 新特性与微服务开发的实操指南
本内容涵盖Java 11至Java 17最新特性实战,包括var关键字、字符串增强、模块化系统、Stream API、异步编程、密封类等,并提供图书管理系统实战项目,帮助开发者掌握现代Java开发技巧与工具。
232 1
|
4月前
|
Java 数据库连接 API
Java 8 + 特性及 Spring Boot 与 Hibernate 等最新技术的实操内容详解
本内容涵盖Java 8+核心语法、Spring Boot与Hibernate实操,按考试考点分类整理,含技术详解与代码示例,助力掌握最新Java技术与应用。
136 2